feat(distributions): add logweibull, ricegamma, and logricegamma

Add three new continuous random variables for log-domain and
linear-domain clutter modeling with compound Gamma-Rice structure.

Fix numerical stability of k_dist._logpdf and logk._log_kve via a
three-regime log(kve) asymptotic (direct / large-z Hankel / large-order
Gamma); replace quad-based k_dist._cdf with Gauss-Laguerre quadrature.

Fix fitter: use np.asarray instead of np.abs in fit(), pass fit_params
to goodness_of_fit so the observed-data statistic reuses fitted params.
Skip non-finite quantiles in QQ plots. Add plot_qq_plots_sns(); rename
histogram_with_fits_seaborn() to histogram_with_fits_sns(). Add unit
tests for logweibull and logricegamma.
This commit is contained in:
2026-05-07 11:55:33 -03:00
parent c59bc55fe5
commit 346d85c4f7
3 changed files with 1007 additions and 57 deletions

View File

@@ -8,6 +8,12 @@ from scipy.stats import rv_continuous, goodness_of_fit
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
def set_plot_style():
sns.set_style("whitegrid")
sns.set_context("paper", font_scale=1.25)
@dataclass @dataclass
class DistributionSummary: class DistributionSummary:
""" """
@@ -181,7 +187,7 @@ class Fitter:
Mapping of distribution name → summary (test_result is None Mapping of distribution name → summary (test_result is None
until validate() is called). until validate() is called).
""" """
data_flat = np.abs(data).flatten() data_flat = np.asarray(data).flatten()
self._last_data_flat = data_flat self._last_data_flat = data_flat
for dist in self.dist_list: for dist in self.dist_list:
@@ -213,8 +219,16 @@ class Fitter:
data_flat = self._last_data_flat data_flat = self._last_data_flat
for dist in self.dist_list: for dist in self.dist_list:
_summary = self._dist[dist.name] _summary = self._dist[dist.name]
# Build fit_params dict so goodness_of_fit reuses the already-fitted
# parameters for the observed-data statistic instead of re-fitting.
shape_names = [s.strip() for s in dist.shapes.split(',')] if dist.shapes else []
all_param_names = shape_names + ['loc', 'scale']
fit_params_dict = dict(zip(all_param_names, _summary.fit_result_params))
test_result = goodness_of_fit( test_result = goodness_of_fit(
dist, data_flat, statistic=_summary.statistic_method, **kwargs dist, data_flat,
statistic=_summary.statistic_method,
fit_params=fit_params_dict,
**kwargs,
) )
_summary.test_result = test_result _summary.test_result = test_result
self._dist[dist.name] = _summary self._dist[dist.name] = _summary
@@ -276,13 +290,21 @@ class Fitter:
plotting_positions, *summary.fit_result_params plotting_positions, *summary.fit_result_params
) )
# Drop NaN/inf quantiles that arise when ppf fails to converge
valid = np.isfinite(theoretical_quantiles)
if not valid.any():
print(f"Distribution '{dist_name}': all theoretical quantiles are non-finite. Skipping QQ plot.")
continue
theoretical_quantiles = theoretical_quantiles[valid]
sorted_data_plot = sorted_data[valid]
# Create QQ plot in each subplot # Create QQ plot in each subplot
row = (list(self._dist.keys()).index(dist_name) // num_cols) + 1 row = (list(self._dist.keys()).index(dist_name) // num_cols) + 1
col = (list(self._dist.keys()).index(dist_name) % num_cols) + 1 col = (list(self._dist.keys()).index(dist_name) % num_cols) + 1
fig.add_trace( fig.add_trace(
go.Scatter( go.Scatter(
x=theoretical_quantiles, x=theoretical_quantiles,
y=sorted_data, y=sorted_data_plot,
mode="markers", mode="markers",
name=dist_name, name=dist_name,
), ),
@@ -290,8 +312,8 @@ class Fitter:
col=col, col=col,
) )
# Add a reference line y=x # Add a reference line y=x
min_val = min(theoretical_quantiles.min(), sorted_data.min()) min_val = min(theoretical_quantiles.min(), sorted_data_plot.min())
max_val = max(theoretical_quantiles.max(), sorted_data.max()) max_val = max(theoretical_quantiles.max(), sorted_data_plot.max())
fig.add_trace( fig.add_trace(
go.Scatter( go.Scatter(
x=[min_val, max_val], x=[min_val, max_val],
@@ -322,6 +344,88 @@ class Fitter:
showlegend=False, showlegend=False,
autosize=True, autosize=True,
) )
return fig
def plot_qq_plots_sns(self, method: str = "hazen"):
"""
Generate QQ plots for each fitted distribution using seaborn/matplotlib.
Parameters
----------
method : str
Plotting positions formula. Either 'hazen' (default) or 'filliben'.
"""
if not hasattr(self, "_last_data_flat"):
raise RuntimeError("No data available. Call fit() first.")
if method not in ("hazen", "filliben"):
raise ValueError(f"method must be 'hazen' or 'filliben', got '{method}'.")
set_plot_style()
dist_names = list(self._dist.keys())
num_dists = len(dist_names)
num_cols = 2
num_rows = (num_dists + 1) // 2
dot_color = sns.color_palette()[0]
fig, axes = plt.subplots(num_rows, num_cols, figsize=(6 * num_cols, 5 * num_rows))
axes = np.array(axes).flatten()
sorted_data = np.sort(self._last_data_flat)
n = len(sorted_data)
i = np.arange(1, n + 1)
if method == "hazen":
plotting_positions = (i - 0.5) / n
else:
plotting_positions = (i - 0.3175) / (n + 0.365)
plotting_positions[0] = 1 - 0.5 ** (1 / n)
plotting_positions[-1] = 0.5 ** (1 / n)
for idx, (dist_name, summary) in enumerate(self):
ax = axes[idx]
if summary.test_result is None:
ax.set_title(dist_name)
ax.text(0.5, 0.5, "Not validated", ha="center", va="center",
transform=ax.transAxes)
continue
theoretical_quantiles = summary.distribution_object.ppf(
plotting_positions, *summary.fit_result_params
)
valid = np.isfinite(theoretical_quantiles)
if not valid.any():
ax.set_title(dist_name)
ax.text(0.5, 0.5, "All quantiles non-finite", ha="center", va="center",
transform=ax.transAxes)
continue
tq = theoretical_quantiles[valid]
sd = sorted_data[valid]
ax.scatter(tq, sd, color=dot_color, s=8, alpha=0.6, linewidths=0)
ref_min = min(tq.min(), sd.min())
ref_max = max(tq.max(), sd.max())
ax.plot([ref_min, ref_max], [ref_min, ref_max], "k--", linewidth=1)
pvalue = summary.pvalue
stat_color = "green" if pvalue > 0.05 else "red"
ax.text(0.95, 0.05,
f"{summary.statistic_method}={summary.gof_statistic:.4f}",
transform=ax.transAxes, ha="right", va="bottom",
fontsize=9, color=stat_color)
ax.set_title(dist_name)
ax.set_xlabel("Theoretical Quantiles")
ax.set_ylabel("Empirical Quantiles")
# Hide unused axes
for idx in range(num_dists, len(axes)):
axes[idx].set_visible(False)
fig.suptitle(f"QQ Plots of Fitted Distributions ({method})", y=1.01)
plt.tight_layout()
return fig return fig
def histogram_with_fits(self): def histogram_with_fits(self):
@@ -389,7 +493,7 @@ class Fitter:
return fig return fig
def histogram_with_fits_seaborn(self): def histogram_with_fits_sns(self):
""" """
Generate a histogram of the data with overlaid PDFs of each fitted distribution using seaborn. Generate a histogram of the data with overlaid PDFs of each fitted distribution using seaborn.
Requires that fit() has been called to populate parameters. Requires that fit() has been called to populate parameters.
@@ -422,7 +526,7 @@ class Fitter:
ax.plot( ax.plot(
x, x,
pdf_values, pdf_values,
label=f"{summary.distribution_name} --- p={summary.pvalue:.4f}", label=f"{summary.distribution_name}",
) )
# put title in top left, make it smaller, change it font to sans and put in light gray # put title in top left, make it smaller, change it font to sans and put in light gray
ax.set_title( ax.set_title(

View File

@@ -7,7 +7,7 @@ import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from tools.distributions import k_dist, logk, lognakagami, loggamma_dist, lograyleigh, logrice from tools.distributions import k_dist, logk, lognakagami, loggamma_dist, lograyleigh, logrice, logweibull, logricegamma, ricegamma
X = np.linspace(0.01, 10.0, 500) X = np.linspace(0.01, 10.0, 500)
@@ -677,6 +677,237 @@ class TestLogK:
np.testing.assert_allclose(pdf_ab, pdf_ba, rtol=1e-6) np.testing.assert_allclose(pdf_ab, pdf_ba, rtol=1e-6)
# ── logweibull unit tests ─────────────────────────────────────────────────────
Y_LOGWEIBULL = np.linspace(-10.0, 10.0, 500)
class TestLogWeibull:
def test_logpdf_is_finite_on_real_line(self):
"""logpdf must be finite for all real y."""
vals = logweibull.logpdf(Y_LOGWEIBULL, k=2.0, lam=1.0)
assert np.all(np.isfinite(vals))
def test_pdf_integrates_to_one(self):
"""Numerical integral of PDF over the real line should be ≈ 1."""
y_fine = np.linspace(-20.0, 20.0, 200_000)
integral = np.trapezoid(logweibull.pdf(y_fine, k=2.0, lam=1.0), y_fine)
assert pytest.approx(integral, abs=1e-3) == 1.0
def test_logpdf_equals_log_pdf(self):
"""logpdf must equal log(pdf) at points where pdf does not underflow to zero."""
y_bulk = np.linspace(-10.0, 20.0, 100)
k, lam = 2.0, np.exp(12.0)
pdf_vals = logweibull.pdf(y_bulk, k=k, lam=lam)
mask = pdf_vals > 0
np.testing.assert_allclose(
logweibull.logpdf(y_bulk[mask], k=k, lam=lam),
np.log(pdf_vals[mask]),
rtol=1e-6,
)
def test_change_of_variable_matches_weibull(self):
"""logweibull.pdf(y) must equal weibull_min.pdf(exp(y)) * exp(y)."""
from scipy.stats import weibull_min
y_test = np.linspace(-3.0, 3.0, 20)
k, lam = 2.0, 1.5
direct = logweibull.pdf(y_test, k=k, lam=lam)
via_w = weibull_min.pdf(np.exp(y_test), c=k, scale=lam) * np.exp(y_test)
np.testing.assert_allclose(direct, via_w, rtol=1e-6)
def test_cdf_is_monotone_increasing(self):
"""CDF must be strictly non-decreasing."""
y_grid = np.linspace(-5.0, 5.0, 50)
cdf_vals = logweibull.cdf(y_grid, k=2.0, lam=1.0)
assert np.all(np.diff(cdf_vals) >= -1e-12)
def test_cdf_matches_weibull(self):
"""logweibull.cdf(y) must equal weibull_min.cdf(exp(y))."""
from scipy.stats import weibull_min
y_test = np.array([-2.0, 0.0, 1.0, 2.0])
k, lam = 1.5, 2.0
np.testing.assert_allclose(
logweibull.cdf(y_test, k=k, lam=lam),
weibull_min.cdf(np.exp(y_test), c=k, scale=lam),
rtol=1e-6,
)
def test_sf_plus_cdf_equals_one(self):
"""sf + cdf must equal 1 everywhere."""
y_test = np.linspace(-3.0, 3.0, 20)
k, lam = 2.0, 1.0
np.testing.assert_allclose(
logweibull.cdf(y_test, k=k, lam=lam) + logweibull.sf(y_test, k=k, lam=lam),
1.0,
rtol=1e-12,
)
def test_ppf_inverts_cdf(self):
"""ppf must be the exact inverse of cdf: cdf(ppf(q)) == q."""
# Round-trip over quantiles to avoid CDF saturation at extreme y values
q_test = np.array([0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95])
k, lam = 2.0, np.exp(12.0)
np.testing.assert_allclose(
logweibull.cdf(logweibull.ppf(q_test, k=k, lam=lam), k=k, lam=lam),
q_test,
rtol=1e-8,
)
def test_stats_mean_shifts_by_log_lam(self):
"""Doubling lam shifts the mean by ln(2), leaving variance unchanged."""
k = 2.0
mean1 = float(logweibull.stats(k=k, lam=1.0, moments="m"))
mean2 = float(logweibull.stats(k=k, lam=2.0, moments="m"))
assert pytest.approx(mean2 - mean1, rel=1e-10) == np.log(2.0)
def test_stats_variance_scales_with_k(self):
"""Variance must equal psi_1(1) / k^2."""
for k in [0.5, 1.0, 2.0]:
_, var, *_ = logweibull.stats(k=k, lam=1.0, moments="mv")
expected = sc.polygamma(1, 1) / k ** 2
assert pytest.approx(float(var), rel=1e-10) == expected
def test_stats_variance_is_lam_independent(self):
"""Variance must not depend on lam."""
k = 2.0
_, var1, *_ = logweibull.stats(k=k, lam=1.0, moments="mv")
_, var2, *_ = logweibull.stats(k=k, lam=5.0, moments="mv")
assert pytest.approx(float(var1), rel=1e-10) == float(var2)
def test_argcheck_rejects_non_positive_k(self):
"""k <= 0 must not produce a valid PDF value."""
val = logweibull.pdf(0.0, k=-1.0, lam=1.0)
assert not (np.isfinite(val) and val > 0)
def test_argcheck_rejects_non_positive_lam(self):
"""lam <= 0 must not produce a valid PDF value."""
val = logweibull.pdf(0.0, k=1.0, lam=-1.0)
assert not (np.isfinite(val) and val > 0)
def test_rvs_are_finite(self):
"""Random samples must be finite real numbers."""
rng = np.random.default_rng(42)
samples = logweibull.rvs(k=2.0, lam=1.0, size=500, random_state=rng)
assert samples.shape == (500,)
assert np.all(np.isfinite(samples))
def test_rvs_sample_mean_near_analytical(self):
"""Sample mean must be close to the analytical mean."""
k, lam = 2.0, 1.5
rng = np.random.default_rng(0)
samples = logweibull.rvs(k=k, lam=lam, size=100_000, random_state=rng)
expected_mean = float(logweibull.stats(k=k, lam=lam, moments="m"))
assert pytest.approx(float(samples.mean()), rel=5e-2) == expected_mean
def test_rvs_sample_variance_near_analytical(self):
"""Sample variance must be close to the analytical variance."""
k, lam = 2.0, 1.5
rng = np.random.default_rng(1)
samples = logweibull.rvs(k=k, lam=lam, size=100_000, random_state=rng)
_, expected_var, *_ = logweibull.stats(k=k, lam=lam, moments="mv")
assert pytest.approx(float(np.var(samples)), rel=5e-2) == float(expected_var)
# ── logricegamma unit tests ───────────────────────────────────────────────────
# PDF is expensive (numerical integration per point), so grids are kept small.
Y_LRICEGAMMA = np.linspace(-5.0, 5.0, 30)
class TestLogRiceGamma:
def test_pdf_is_positive_for_valid_params(self):
"""PDF must be strictly positive for finite y and valid parameters."""
vals = logricegamma.pdf(Y_LRICEGAMMA, alpha=2.0, beta=1.0, K=1.0)
assert np.all(vals > 0)
def test_pdf_integrates_to_one(self):
"""Numerical integral of PDF over a wide domain should be ≈ 1."""
y_fine = np.linspace(-15.0, 10.0, 1000)
integral = np.trapezoid(
logricegamma.pdf(y_fine, alpha=2.0, beta=1.0, K=1.0), y_fine
)
assert pytest.approx(integral, abs=1e-2) == 1.0
def test_pdf_integrates_to_one_k_zero(self):
"""Normalisation must hold for K=0 (Rice collapses to Rayleigh)."""
y_fine = np.linspace(-15.0, 10.0, 1000)
integral = np.trapezoid(
logricegamma.pdf(y_fine, alpha=2.0, beta=1.0, K=0.0), y_fine
)
assert pytest.approx(integral, abs=1e-2) == 1.0
def test_pdf_integrates_to_one_large_K(self):
"""Normalisation must hold for large K (highly specular regime)."""
y_fine = np.linspace(-10.0, 15.0, 1000)
integral = np.trapezoid(
logricegamma.pdf(y_fine, alpha=2.0, beta=1.0, K=10.0), y_fine
)
assert pytest.approx(integral, abs=1e-2) == 1.0
def test_logpdf_equals_log_pdf(self):
"""logpdf must equal log(pdf) where pdf does not underflow."""
y_bulk = np.linspace(-3.0, 3.0, 15)
pdf_vals = logricegamma.pdf(y_bulk, alpha=2.0, beta=1.0, K=1.0)
mask = pdf_vals > 0
np.testing.assert_allclose(
logricegamma.logpdf(y_bulk[mask], alpha=2.0, beta=1.0, K=1.0),
np.log(pdf_vals[mask]),
rtol=1e-6,
)
def test_argcheck_rejects_non_positive_alpha(self):
"""alpha <= 0 must not produce a valid PDF value."""
val = logricegamma.pdf(0.0, alpha=-1.0, beta=1.0, K=1.0)
assert not (np.isfinite(val) and val > 0)
def test_argcheck_rejects_non_positive_beta(self):
"""beta <= 0 must not produce a valid PDF value."""
val = logricegamma.pdf(0.0, alpha=2.0, beta=-1.0, K=1.0)
assert not (np.isfinite(val) and val > 0)
def test_argcheck_rejects_negative_K(self):
"""K < 0 must not produce a valid PDF value."""
val = logricegamma.pdf(0.0, alpha=2.0, beta=1.0, K=-1.0)
assert not (np.isfinite(val) and val > 0)
def test_cdf_is_monotone_increasing(self):
"""CDF must be strictly non-decreasing."""
y_grid = np.linspace(-4.0, 4.0, 15)
cdf_vals = logricegamma.cdf(y_grid, alpha=2.0, beta=1.0, K=1.0)
assert np.all(np.diff(cdf_vals) >= -1e-10)
def test_rvs_samples_are_finite(self):
"""Random samples must be finite real numbers."""
rng = np.random.default_rng(42)
samples = logricegamma.rvs(alpha=2.0, beta=1.0, K=1.0, size=500, random_state=rng)
assert samples.shape == (500,)
assert np.all(np.isfinite(samples))
def test_rvs_second_moment_equals_alpha_times_beta(self):
"""E[exp(2Y)] = E[X²] must equal alpha*beta (total average power from docstring)."""
alpha, beta, K = 2.0, 1.5, 2.0
rng = np.random.default_rng(0)
samples = logricegamma.rvs(alpha=alpha, beta=beta, K=K, size=100_000, random_state=rng)
assert pytest.approx(float(np.mean(np.exp(2.0 * samples))), rel=5e-2) == alpha * beta
def test_rvs_second_moment_k_zero(self):
"""E[X²] = alpha*beta must hold for K=0."""
alpha, beta, K = 3.0, 0.5, 0.0
rng = np.random.default_rng(1)
samples = logricegamma.rvs(alpha=alpha, beta=beta, K=K, size=100_000, random_state=rng)
assert pytest.approx(float(np.mean(np.exp(2.0 * samples))), rel=5e-2) == alpha * beta
def test_rvs_sample_mean_consistent_with_pdf(self):
"""Sample mean from RVS should match the numerically integrated mean from the PDF."""
alpha, beta, K = 2.0, 1.0, 1.0
rng = np.random.default_rng(2)
samples = logricegamma.rvs(alpha=alpha, beta=beta, K=K, size=50_000, random_state=rng)
y_fine = np.linspace(-15.0, 10.0, 1000)
pdf_vals = logricegamma.pdf(y_fine, alpha=alpha, beta=beta, K=K)
numerical_mean = float(np.trapezoid(y_fine * pdf_vals, y_fine))
assert pytest.approx(float(samples.mean()), rel=1e-1) == numerical_mean
if __name__ == "__main__": if __name__ == "__main__":
plot_k_dist_varying_alpha() plot_k_dist_varying_alpha()
plot_k_dist_varying_mu() plot_k_dist_varying_mu()

View File

@@ -161,11 +161,25 @@ Priority order for overriding to improve tail accuracy:
from scipy.stats import rv_continuous, ncx2 from scipy.stats import rv_continuous, ncx2
from scipy.special import kve, gammaln from scipy.special import kve, gammaln
from scipy.integrate import quad
import scipy.special as sc import scipy.special as sc
from scipy.stats._distn_infrastructure import _ShapeInfo from scipy.stats._distn_infrastructure import _ShapeInfo
import numpy as np import numpy as np
# Gauss-Legendre nodes/weights on [-1, 1] — used by logricegamma_gen._batch_integral.
# Computed once at import time; 200 nodes give machine-precision accuracy for smooth,
# well-resolved integrands while keeping the (N, 200) batch cost negligible.
_GL_M = 200
_gl_xi, _gl_wi = np.polynomial.legendre.leggauss(_GL_M)
# Gauss-Hermite nodes/weights — used by k_dist._cdf for the large-beta branch.
# When beta > _BETA_GH_THRESH, Gamma(β,1) weights overflow float64 (Γ(β) → ∞).
# We instead approximate E_{t~Gamma(β,1)}[f(t)] via the Normal approximation:
# Gamma(β,1) ≈ Normal(β, √β) => E[f(t)] ≈ (1/√π) Σ_k w_k f(β + √(2β)·u_k)
_GH_M = 30
_gh_nodes, _gh_weights = np.polynomial.hermite.hermgauss(_GH_M)
_gh_weights_norm = _gh_weights / np.sqrt(np.pi) # unit-mass weights
_BETA_GH_THRESH = 150.0
class k_gen(rv_continuous): class k_gen(rv_continuous):
"""Generalized K distribution for radar clutter modeling. """Generalized K distribution for radar clutter modeling.
@@ -198,6 +212,43 @@ class k_gen(rv_continuous):
def _pdf(self, x, mu, alpha, beta): def _pdf(self, x, mu, alpha, beta):
return np.exp(self._logpdf(x, mu, alpha, beta)) return np.exp(self._logpdf(x, mu, alpha, beta))
@staticmethod
def _log_kve_stable(v, z):
"""Numerically stable log(kve(v, z)) = log(K_v(z)) + z.
Three regimes:
- direct: kve(v, z) > 0 (no over/underflow)
- large-z: z >> |v| — Hankel 2-term asymptotic
- large-order: |v| >> z — leading Gamma asymptotic
log(K_v(z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2)
"""
z = np.asarray(z, dtype=float)
v = np.asarray(v, dtype=float)
abs_v = np.abs(v)
kve_val = kve(v, z)
z_safe = np.where(z > 0, z, 1.0)
# Large-z Hankel asymptotic (accurate when z >> |v|)
mu_v = 4.0 * v ** 2
# Clip log1p argument to (-1, inf) to avoid domain errors when np.where
# evaluates this branch even for points where it won't be selected.
log1p_arg = np.maximum(
(mu_v - 1.0) / (8.0 * z_safe)
+ (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
-1.0 + 1e-15,
)
log_asymp_largez = (
0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
+ np.log1p(log1p_arg)
)
# Large-order asymptotic (accurate when |v| >> z > 0)
# log(kve(v,z)) = log(K_v(z)) + z ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z
log_asymp_largev = gammaln(abs_v) + abs_v * np.log(2.0 / z_safe) - np.log(2.0) + z
# kve overflows to +inf when |v|>>z, underflows to 0 when z>>|v|
kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
use_largev = (abs_v > z_safe + 2.0) & kve_bad
log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
return np.where(kve_bad, log_asymp, np.log(kve_val))
def _logpdf(self, x, mu, alpha, beta): def _logpdf(self, x, mu, alpha, beta):
half_sum = (alpha + beta) / 2.0 half_sum = (alpha + beta) / 2.0
log_ab_over_mu = np.log(alpha) + np.log(beta) - np.log(mu) log_ab_over_mu = np.log(alpha) + np.log(beta) - np.log(mu)
@@ -208,7 +259,7 @@ class k_gen(rv_continuous):
- gammaln(beta) - gammaln(beta)
+ half_sum * log_ab_over_mu + half_sum * log_ab_over_mu
+ (half_sum - 1.0) * np.log(x) + (half_sum - 1.0) * np.log(x)
+ np.log(kve(alpha - beta, z)) + self._log_kve_stable(alpha - beta, z)
- z - z
) )
@@ -235,18 +286,53 @@ class k_gen(rv_continuous):
return mean, variance, skewness, kurtosis return mean, variance, skewness, kurtosis
def _cdf(self, x, mu, alpha, beta): def _cdf(self, x, mu, alpha, beta):
# scipy broadcasts params to match x's shape before calling _cdf, # K-distribution CDF via the compound-Gamma representation:
# so mu/alpha/beta may be arrays. Pass all four to np.vectorize so # X = Gamma(alpha, tau/alpha), tau ~ Gamma(beta, mu/beta)
# each argument arrives as a scalar inside _scalar. # => F(x) = (1/Γ(β)) ∫_0^∞ gammainc(α, xαβ/(μt)) t^{β-1} e^{-t} dt
# Substitution u = sqrt(x) regularises the integrand near u=0: #
# f(u²)*2u ~ u^(alpha+beta-1), smooth for alpha,beta > 0. # For β ≤ _BETA_GH_THRESH: (β1)-order Gauss-Laguerre quadrature.
# For β > _BETA_GH_THRESH: Gamma(β,1) ≈ Normal(β,√β), so we switch to
# Gauss-Hermite: E[f(t)] ≈ (1/√π) Σ_k w_k f(β + √(2β)·u_k).
from scipy.special import gammainc, roots_genlaguerre
from scipy.special import gamma as sp_gamma
x = np.asarray(x, dtype=float)
mu = np.asarray(mu, dtype=float)
alpha = np.asarray(alpha, dtype=float)
beta = np.asarray(beta, dtype=float)
_N_PTS = 50
def _gh_branch(x_, alpha_, mu_, bi):
"""Gauss-Hermite CDF for large beta (Normal approx to Gamma(β,1))."""
t_gh = bi + np.sqrt(2.0 * bi) * _gh_nodes # shape (GH_M,)
t_gh = np.maximum(t_gh, 1e-10)
t_args = (x_ * alpha_ / mu_)[..., np.newaxis] * (bi / t_gh)
probs = gammainc(alpha_[..., np.newaxis], t_args)
return np.dot(probs, _gh_weights_norm)
# Fast path: beta is uniform across all inputs (the common case in fit).
beta_vals = np.unique(beta)
if beta_vals.size == 1:
bi = float(beta_vals[0])
if bi > _BETA_GH_THRESH:
return _gh_branch(x, alpha, mu, bi)
nodes, weights = roots_genlaguerre(_N_PTS, bi - 1.0)
t_args = (x * alpha / mu)[..., np.newaxis] * (bi / nodes)
probs = gammainc(alpha[..., np.newaxis], t_args)
return np.dot(probs, weights) / sp_gamma(bi)
# Slow path: heterogeneous beta — fall back to per-element loop.
def _scalar(xi, mui, ai, bi): def _scalar(xi, mui, ai, bi):
val, _ = quad( bi = float(bi)
lambda u: float(self._pdf(float(u * u), float(mui), float(ai), float(bi))) * 2.0 * u, xi_, ai_, mui_ = np.asarray([float(xi)]), np.asarray([float(ai)]), float(mui)
0.0, float(np.sqrt(xi)), if bi > _BETA_GH_THRESH:
limit=200, epsabs=1.49e-10, epsrel=1.49e-8, return float(_gh_branch(xi_, ai_, np.asarray([mui_]), bi)[0])
) nodes, weights = roots_genlaguerre(_N_PTS, bi - 1.0)
return val t_args = float(xi) * float(ai) * bi / (float(mui) * nodes)
probs = gammainc(float(ai), t_args)
return float(np.dot(probs, weights) / sp_gamma(bi))
return np.vectorize(_scalar)(x, mu, alpha, beta) return np.vectorize(_scalar)(x, mu, alpha, beta)
def _rvs(self, mu, alpha, beta, size=None, random_state=None): def _rvs(self, mu, alpha, beta, size=None, random_state=None):
@@ -300,27 +386,40 @@ class logk_gen(rv_continuous):
@staticmethod @staticmethod
def _log_kve(v, z): def _log_kve(v, z):
"""log(kve(v, z)) = log(Kv(z)) + z, stable for all z > 0. """Numerically stable log(kve(v, z)) = log(K_v(z)) + z.
For large z, kve(v, z) ≈ sqrt(π/(2z)) underflows to 0 in float64, Three regimes:
making log(kve) return -inf/-nan. The asymptotic expansion: - direct: kve(v, z) > 0 (no over/underflow)
- large-z: z >> |v| — Hankel 2-term asymptotic
log(kve(v,z)) ≈ 0.5*log(π/(2z)) + log1p((4v²-1)/(8z) + - large-order: |v| >> z — leading Gamma asymptotic
(4v²-1)(4v²-9)/(128z²)) log(K_v(z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2)
is used whenever the direct evaluation would underflow.
""" """
z = np.asarray(z, dtype=float) z = np.asarray(z, dtype=float)
v = np.asarray(v, dtype=float) v = np.asarray(v, dtype=float)
abs_v = np.abs(v)
kve_val = kve(v, z) kve_val = kve(v, z)
# Asymptotic (2-term Hankel expansion) — accurate to O(z^{-3}) z_safe = np.where(z > 0, z, 1.0)
# Large-z Hankel asymptotic (accurate when z >> |v|)
mu_v = 4.0 * v ** 2 mu_v = 4.0 * v ** 2
log_asymp = ( # Clip log1p argument to (-1, inf) to avoid domain errors when np.where
0.5 * (np.log(np.pi) - np.log(2.0) - np.log(np.where(z > 0, z, 1.0))) # evaluates this branch even for points where it won't be selected.
+ np.log1p((mu_v - 1.0) / (8.0 * np.where(z > 0, z, 1.0)) log1p_arg = np.maximum(
+ (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * np.where(z > 0, z**2, 1.0))) (mu_v - 1.0) / (8.0 * z_safe)
+ (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
-1.0 + 1e-15,
) )
return np.where(kve_val > 0, np.log(np.where(kve_val > 0, kve_val, 1.0)), log_asymp) log_asymp_largez = (
0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
+ np.log1p(log1p_arg)
)
# Large-order asymptotic (accurate when |v| >> z > 0)
# log(kve(v,z)) = log(K_v(z)) + z ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z
log_asymp_largev = gammaln(abs_v) + abs_v * np.log(2.0 / z_safe) - np.log(2.0) + z
# kve overflows to +inf when |v|>>z, underflows to 0 when z>>|v|
kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
use_largev = (abs_v > z_safe + 2.0) & kve_bad
log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
return np.where(kve_bad, log_asymp, np.log(kve_val))
def _pdf(self, y, mu, alpha, beta): def _pdf(self, y, mu, alpha, beta):
return np.exp(self._logpdf(y, mu, alpha, beta)) return np.exp(self._logpdf(y, mu, alpha, beta))
@@ -347,33 +446,158 @@ class logk_gen(rv_continuous):
return mean, var, g1, g2 return mean, var, g1, g2
def _cdf(self, y, mu, alpha, beta): def _cdf(self, y, mu, alpha, beta):
return k_dist.cdf(np.exp(y), mu, alpha, beta) # Clip y to avoid exp() overflow (float64 max ≈ exp(709.78)).
# For y > 709 the CDF is indistinguishable from 1.0.
y_clipped = np.minimum(y, 709.0)
return k_dist.cdf(np.exp(y_clipped), mu, alpha, beta)
def _ppf(self, q, mu, alpha, beta):
# Invert via the linear-domain ppf: Y = ln(X), X ~ K(mu, alpha, beta).
# k_dist.ppf has a finite lower bound (a=0) so its bracket search is
# well-defined, avoiding the exp-overflow problem in the default logk ppf.
x = k_dist.ppf(q, mu, alpha, beta)
return np.log(np.maximum(x, np.finfo(float).tiny))
def _rvs(self, mu, alpha, beta, size=None, random_state=None): def _rvs(self, mu, alpha, beta, size=None, random_state=None):
# Compound Gamma: tau ~ Gamma(beta, mu/beta), X|tau ~ Gamma(alpha, tau/alpha)
# Y = ln(X), avoids the ppf -> k_dist.ppf -> quad-CDF chain.
tau = random_state.gamma(beta, mu / beta, size=size) tau = random_state.gamma(beta, mu / beta, size=size)
sample = random_state.gamma(alpha, tau / alpha) x = random_state.gamma(alpha, tau / alpha)
return np.log(np.clip(sample, np.finfo(float).tiny, None)) return np.log(np.maximum(x, np.finfo(float).tiny))
def _sf(self, y, mu, alpha, beta):
y_clipped = np.minimum(y, 709.0)
return k_dist.sf(np.exp(y_clipped), mu, alpha, beta)
def _fitstart(self, data, args=None):
# Symmetric method-of-moments starting point using the first two cumulants:
# E[Y] = ln(mu) - ln(alpha) - ln(beta) + psi(alpha) + psi(beta)
# Var[Y] = psi_1(alpha) + psi_1(beta)
# Symmetric start (alpha=beta): psi_1(a) ≈ 1/a => a ≈ 2/Var
mean_d = float(np.mean(data))
var_d = max(float(np.var(data)), 1e-6)
alpha0 = float(np.clip(2.0 / var_d, 0.5, 50.0))
beta0 = alpha0
mu0 = float(np.exp(
mean_d
+ np.log(alpha0) + np.log(beta0)
- sc.digamma(alpha0) - sc.digamma(beta0)
))
return (mu0, alpha0, beta0, 0.0, 1.0)
def fit(self, data, *args, **kwds): def fit(self, data, *args, **kwds):
if ("loc" in kwds and kwds["loc"] != 0.0) or ("floc" in kwds and kwds["floc"] != 0.0): """MLE fit with bounded shape parameters; loc and scale are free by default.
raise ValueError("logk uses a fixed loc=0.")
if ("scale" in kwds and kwds["scale"] != 1.0) or ("fscale" in kwds and kwds["fscale"] != 1.0): Optimises in log-parameter space via L-BFGS-B to keep mu, alpha, beta
raise ValueError("logk uses a fixed scale=1.") and scale strictly positive.
kwds.pop("loc", None)
kwds.pop("scale", None) The K-distribution log-likelihood is unbounded when alpha or beta grows
# Supply data-driven initial guesses when none are provided so the without limit, so the search is capped at _MAX. Three starting points
# optimizer starts close to the data instead of the default (1,1,1). are tried — one symmetric (alpha=beta from variance matching) and two
# E[Y] = ln(mu) + ln(alpha) + ln(beta) - digamma(alpha) - digamma(beta) asymmetric (one with alpha>>beta and its mirror) — to escape the
# => mu0 = exp(mean(data) + ln(a0) + ln(b0) - psi(a0) - psi(b0)) symmetric local maximum when the data is skewed.
if not args:
alpha0, beta0 = 1.0, 1.0 loc/scale can be pinned by passing floc=0, fscale=1 as keyword args.
mu0 = float(np.exp( """
np.mean(data) from scipy.optimize import minimize, brentq
+ np.log(alpha0) + np.log(beta0)
- sc.digamma(alpha0) - sc.digamma(beta0) # Respect user-supplied fixed values; None means "free to optimise".
floc = kwds.pop('floc', None)
fscale = kwds.pop('fscale', None)
for k in ('loc', 'scale'):
kwds.pop(k, None)
data = np.asarray(data, dtype=float).ravel()
# Upper bound: prevents the degenerate alpha→∞ (or beta→∞) regime.
_MAX = 50.0
# ── symmetric starting point ──────────────────────────────────────────
start = self._fitstart(data)
mu0 = max(float(args[0]) if len(args) > 0 else start[0], 1e-12)
alpha0 = float(np.clip(args[1] if len(args) > 1 else start[1], 0.01, _MAX))
beta0 = float(np.clip(args[2] if len(args) > 2 else start[2], 0.01, _MAX))
# ── asymmetric starting point (all variance in beta, large alpha) ─────
mean_d = float(np.mean(data))
var_d = max(float(np.var(data)), 1e-6)
try:
beta_asym = float(brentq(
lambda b: sc.polygamma(1, b) - var_d, 0.05, 500.0, xtol=1e-8
)) ))
args = (mu0, alpha0, beta0) except Exception:
return super().fit(data, *args, floc=0.0, fscale=1.0, **kwds) beta_asym = beta0
alpha_asym = float(np.clip(5.0 * beta_asym, 1.0, _MAX))
beta_asym = float(np.clip(beta_asym, 0.01, _MAX))
mu_asym = float(np.exp(
mean_d
+ np.log(alpha_asym) + np.log(beta_asym)
- sc.digamma(alpha_asym) - sc.digamma(beta_asym)
))
# Starting values for loc / scale (used when free)
loc0 = float(floc) if floc is not None else 0.0
scale0 = float(fscale) if fscale is not None else 1.0
# ── parameter packing ─────────────────────────────────────────────────
# Vector layout: [log(mu), log(alpha), log(beta), loc?, log(scale)?]
# Slots for loc/scale are only present when they are free.
def pack(mu, alpha, beta):
v = [np.log(mu), np.log(alpha), np.log(beta)]
if floc is None: v.append(loc0)
if fscale is None: v.append(np.log(scale0))
return v
def unpack(v):
mu = np.exp(v[0]); alpha = np.exp(v[1]); beta = np.exp(v[2])
idx = 3
if floc is None:
loc = float(v[idx]); idx += 1
else:
loc = float(floc)
scale = float(np.exp(v[idx])) if fscale is None else float(fscale)
return mu, alpha, beta, loc, scale
bounds = [
(None, None), # log(mu)
(np.log(0.01), np.log(_MAX)), # log(alpha)
(np.log(0.01), np.log(_MAX)), # log(beta)
]
if floc is None: bounds.append((None, None)) # loc
if fscale is None: bounds.append((np.log(1e-4), np.log(100.0))) # log(scale)
def neg_ll(v):
mu, alpha, beta, loc, scale = unpack(v)
with np.errstate(all='ignore'):
ll = np.sum(self.logpdf(data, mu, alpha, beta, loc=loc, scale=scale))
return -ll if np.isfinite(ll) else 1e15
# Candidate starting vectors: symmetric + two asymmetric (α>>β and β>>α)
x0_candidates = [
pack(mu0, alpha0, beta0),
pack(mu_asym, alpha_asym, beta_asym),
pack(mu_asym, beta_asym, alpha_asym),
]
best_res = None
best_nll = np.inf
with np.errstate(all='ignore'):
for x0 in x0_candidates:
x0_safe = list(x0)
x0_safe[1] = float(np.clip(x0_safe[1], bounds[1][0], bounds[1][1]))
x0_safe[2] = float(np.clip(x0_safe[2], bounds[2][0], bounds[2][1]))
res = minimize(
neg_ll,
x0=x0_safe,
method='L-BFGS-B',
bounds=bounds,
options={'ftol': 1e-12, 'gtol': 1e-8, 'maxiter': 2000},
)
if res.fun < best_nll:
best_nll = res.fun
best_res = res
mu_hat, alpha_hat, beta_hat, loc_hat, scale_hat = unpack(best_res.x)
return (mu_hat, alpha_hat, beta_hat, loc_hat, scale_hat)
logk = logk_gen(name="logk", shapes="mu, alpha, beta") logk = logk_gen(name="logk", shapes="mu, alpha, beta")
@@ -708,3 +932,394 @@ class logrice_gen(rv_continuous):
logrice = logrice_gen(name='logrice', shapes="nu, sigma") logrice = logrice_gen(name='logrice', shapes="nu, sigma")
class logweibull_gen(rv_continuous):
"""Log-Weibull continuous random variable.
Y = ln(X) where X ~ Weibull(k, lam). The PDF is:
f(y; k, lam) = (k/lam) * (e^y/lam)^(k-1) * exp(-(e^y/lam)^k) * e^y
for y in (-inf, +inf), k > 0, lam > 0.
Since Y = ln(lam) + (1/k)*ln(W) where W ~ Exp(1), the moments are:
E[Y] = ln(lam) + psi(1)/k
Var[Y] = psi_1(1) / k^2
Skewness and excess kurtosis are k-independent constants (the 1/k scaling
cancels in the standardised moments) given by psi_2(1)/psi_1(1)^(3/2) and
psi_3(1)/psi_1(1)^2 respectively.
The differential entropy is lam-independent:
H(Y) = 1 - psi(1) - ln(k)
"""
def _shape_info(self):
return [
_ShapeInfo("k", False, (0, np.inf), (False, False)),
_ShapeInfo("lam", False, (0, np.inf), (False, False)),
]
def _argcheck(self, k, lam):
return (k > 0) & (lam > 0)
def _pdf(self, y, k, lam):
return np.exp(self._logpdf(y, k, lam))
def _logpdf(self, y, k, lam):
# ln f = ln(k) + k*(y - ln(lam)) - (e^y/lam)^k
return np.log(k) + k * (y - np.log(lam)) - (np.exp(y) / lam) ** k
def _cdf(self, y, k, lam):
return 1.0 - np.exp(-((np.exp(y) / lam) ** k))
def _sf(self, y, k, lam):
return np.exp(-((np.exp(y) / lam) ** k))
def _logcdf(self, y, k, lam):
return np.log1p(-np.exp(-((np.exp(y) / lam) ** k)))
def _logsf(self, y, k, lam):
return -((np.exp(y) / lam) ** k)
def _ppf(self, q, k, lam):
# q = 1 - exp(-(e^y/lam)^k) => y = ln(lam) + ln(-log1p(-q)) / k
return np.log(lam) + np.log(-np.log1p(-q)) / k
def _isf(self, q, k, lam):
# q = exp(-(e^y/lam)^k) => y = ln(lam) + ln(-ln(q)) / k
return np.log(lam) + np.log(-np.log(q)) / k
def _stats(self, k, lam):
# Y = ln(lam) + (1/k)*ln(W), W ~ Exp(1) = Gamma(1, 1)
mu = np.log(lam) + sc.digamma(1) / k
mu2 = sc.polygamma(1, 1) / k ** 2
g1 = sc.polygamma(2, 1) / sc.polygamma(1, 1) ** 1.5
g2 = sc.polygamma(3, 1) / sc.polygamma(1, 1) ** 2
return mu, mu2, g1, g2
def _entropy(self, k, lam):
# H(Y) = 1 - psi(1) - ln(k) (lam-independent: pure location shift)
return 1.0 - sc.digamma(1) - np.log(k)
def _rvs(self, k, lam, size=None, random_state=None):
# X = lam * W^(1/k), W ~ Exp(1) => Y = ln(lam) + ln(W)/k
return np.log(lam) + np.log(random_state.exponential(size=size)) / k
logweibull = logweibull_gen(name="logweibull", shapes="k, lam")
class logricegamma_gen(rv_continuous):
"""Log-RiceGamma continuous random variable.
Y = ln(X) where X has the Rice-Gamma PDF:
f(x; alpha, beta, K) =
4*x*(1+K)*exp(-K) / (Gamma(alpha)*beta^alpha)
* sum_{n=0}^inf [K*(1+K)]^n / (n!)^2
* x^{2n} * [beta*(1+K)*x^2]^{(alpha-1-n)/2}
* K_{alpha-1-n}(2*x*sqrt((1+K)/beta))
for x > 0, giving Y = ln(X) with support on all of R.
Parameters
----------
alpha : float > 0
Shape of the Gamma texture distribution.
beta : float > 0
Scale of the Gamma texture (E[Omega] = alpha*beta).
K : float >= 0
Rice K-factor (specular-to-diffuse power ratio). K=0 recovers
the Rayleigh-Gamma (K-distribution) log-envelope.
The compound representation is: Omega ~ Gamma(alpha, beta), then
X|Omega ~ Rice(nu, sigma) with sigma^2 = Omega/(2*(1+K)) and
nu = sqrt(K*Omega/(1+K)).
"""
def _shape_info(self):
return [
_ShapeInfo("alpha", False, (0, np.inf), (False, False)),
_ShapeInfo("beta", False, (0, np.inf), (False, False)),
_ShapeInfo("K", False, (0, np.inf), (True, False)),
]
def _argcheck(self, alpha, beta, K):
return (alpha > 0) & (beta > 0) & (K >= 0)
@staticmethod
def _log_kve(v, z):
"""log(kve(v, z)) = log(K_v(z)) + z, numerically stable."""
v = np.asarray(v, dtype=float)
z = np.asarray(z, dtype=float)
abs_v = np.abs(v)
with np.errstate(divide='ignore', invalid='ignore', over='ignore'):
kve_val = sc.kve(v, z)
z_safe = np.where(z > 0, z, 1.0)
mu_v = 4.0 * v ** 2
log1p_arg = np.maximum(
(mu_v - 1.0) / (8.0 * z_safe)
+ (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
-1.0 + 1e-15,
)
log_asymp_largez = (
0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
+ np.log1p(log1p_arg)
)
# log(kve(v,z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z for |v| >> z
log_asymp_largev = (
sc.gammaln(np.maximum(abs_v, 1e-300))
+ abs_v * np.log(2.0 / z_safe)
- np.log(2.0)
+ z
)
with np.errstate(divide='ignore', invalid='ignore'):
kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
use_largev = (abs_v > z_safe + 2.0) & kve_bad
log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
return np.where(kve_bad, log_asymp, np.log(kve_val))
def _pdf(self, y, alpha, beta, K):
return np.exp(self._logpdf(y, alpha, beta, K))
#: Number of terms kept in the truncated series for _logpdf.
#: Increase for large K-factors (rule of thumb: N_SERIES > 3*K + 30).
N_SERIES: int = 90
def _logpdf(self, y, alpha, beta, K):
y = np.asarray(y, dtype=float)
x = np.exp(y)
z = 2.0 * x * np.sqrt((1.0 + K) / beta)
# log-prefactor: log(4 x^2 (1+K) e^{-K} / (Γ(α) β^α)), absorbs exp(-z)
# so each series term uses kve-scaled Bessel values
log_pre = (
np.log(4.0) + 2.0 * y + np.log(1.0 + K) - K
- sc.gammaln(alpha) - alpha * np.log(beta)
- z
)
log_b1Kx2 = np.log(beta) + np.log(1.0 + K) + 2.0 * y # log[beta*(1+K)*x^2]
log1pK = np.log(1.0 + K)
logK_safe = np.log(np.where(K > 0, K, 1.0))
log_terms = []
for n in range(self.N_SERIES + 1):
# log(kve(alpha-1-n, z)) = log(K_{alpha-1-n}(z)) + z
log_kve_n = logricegamma_gen._log_kve(alpha - 1.0 - n, z)
# K^n: for K=0 only the n=0 term survives (0^0 = 1)
n_logK = 0.0 if n == 0 else np.where(K > 0, n * logK_safe, -np.inf)
log_Tn = (
n_logK
+ n * log1pK
- 2.0 * float(sc.gammaln(n + 1))
+ 2.0 * n * y # x^{2n} factor
+ (alpha - 1.0 - n) / 2.0 * log_b1Kx2
+ log_kve_n
)
log_terms.append(log_Tn)
log_arr = np.stack(log_terms, axis=0) # shape (N_MAX+1, *y.shape)
max_lt = np.max(log_arr, axis=0) # shape (*y.shape)
with np.errstate(invalid='ignore'):
shifted = log_arr - max_lt[np.newaxis, ...]
log_sum = max_lt + np.log(np.sum(np.exp(shifted), axis=0))
return log_pre + log_sum
def _fitstart(self, data, args=None):
# loc near the data mean; alpha/beta in (0.5, 1) for typical heavy-tail
# radar clutter; K=1 moderate Rice factor; scale near data std.
mean_d = float(np.mean(data))
std_d = float(np.std(data))
return (0.75, 0.75, 3.0, mean_d, max(std_d * 0.5, 0.1))
def _rvs(self, alpha, beta, K, size=None, random_state=None):
# Compound sampler: Omega ~ Gamma(alpha, beta), X|Omega ~ Rice(nu, sigma)
Omega = random_state.gamma(alpha, beta, size=size)
sigma = np.sqrt(Omega / (2.0 * (1.0 + K)))
nu = np.sqrt(K * Omega / (1.0 + K))
z1 = random_state.normal(nu, sigma, size=size)
z2 = random_state.normal(0.0, sigma, size=size)
return np.log(np.hypot(z1, z2))
def _cdf(self, y, alpha, beta, K):
# Compound representation:
# P(Y <= y) = E_Omega[ P(X <= e^y | Omega) ]
# Given Omega, sigma^2 = Omega/(2(1+K)), nu^2 = K*Omega/(1+K):
# P(X <= x | Omega) = ncx2.cdf(x^2/sigma^2, 2, nu^2/sigma^2)
# = ncx2.cdf(2(1+K)e^{2y}/Omega, 2, 2K)
# Non-centrality 2K is Omega-independent. Integrate over Omega ~ Gamma(alpha, beta)
# by substituting s = Omega/beta (so s ~ Gamma(alpha, 1)) and applying
# generalized Gauss-Laguerre quadrature of order alpha-1.
from scipy.special import roots_genlaguerre
from scipy.special import gamma as sp_gamma
y = np.asarray(y, dtype=float)
# Parameters are broadcast to y's shape by scipy; take the unique scalar.
alpha_s = float(np.ravel(alpha)[0])
beta_s = float(np.ravel(beta)[0])
K_s = float(np.ravel(K)[0])
_N_PTS = 50
nodes, weights = roots_genlaguerre(_N_PTS, alpha_s - 1.0)
# ncx2_arg shape: (*y.shape, N_PTS)
ncx2_arg = (
2.0 * (1.0 + K_s) * np.exp(2.0 * y)[..., np.newaxis]
/ (beta_s * nodes)
)
cdf_vals = ncx2.cdf(ncx2_arg, 2.0, 2.0 * K_s)
return np.dot(cdf_vals, weights) / sp_gamma(alpha_s)
logricegamma = logricegamma_gen(name='logricegamma', shapes='alpha, beta, K')
class ricegamma_gen(rv_continuous):
"""RiceGamma continuous random variable (linear-domain envelope).
The probability density function is:
f(x; alpha, beta, K) =
4*x*(1+K)*exp(-K) / (Gamma(alpha)*beta^alpha)
* sum_{n=0}^inf [K*(1+K)]^n / (n!)^2
* x^{2n} * [beta*(1+K)*x^2]^{(alpha-1-n)/2}
* K_{alpha-1-n}(2*x*sqrt((1+K)/beta))
for x > 0, alpha > 0, beta > 0, K >= 0.
This is the linear-domain counterpart of logricegamma: if X ~ ricegamma
then ln(X) ~ logricegamma (same alpha, beta, K, loc=0, scale=1).
The compound representation is: Omega ~ Gamma(alpha, beta), then
X|Omega ~ Rice(nu, sigma) with sigma^2 = Omega/(2*(1+K)) and
nu = sqrt(K*Omega/(1+K)). E[X^2] = alpha*beta.
"""
#: Number of series terms. Increase for large K (rule: N_SERIES > 3*K + 30).
N_SERIES: int = 90
def _shape_info(self):
return [
_ShapeInfo("alpha", False, (0, np.inf), (False, False)),
_ShapeInfo("beta", False, (0, np.inf), (False, False)),
_ShapeInfo("K", False, (0, np.inf), (True, False)),
]
def _argcheck(self, alpha, beta, K):
return (alpha > 0) & (beta > 0) & (K >= 0)
@staticmethod
def _log_kve(v, z):
"""log(kve(v, z)) = log(K_v(z)) + z, numerically stable."""
v = np.asarray(v, dtype=float)
z = np.asarray(z, dtype=float)
abs_v = np.abs(v)
with np.errstate(divide='ignore', invalid='ignore', over='ignore'):
kve_val = sc.kve(v, z)
z_safe = np.where(z > 0, z, 1.0)
mu_v = 4.0 * v ** 2
log1p_arg = np.maximum(
(mu_v - 1.0) / (8.0 * z_safe)
+ (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
-1.0 + 1e-15,
)
log_asymp_largez = (
0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
+ np.log1p(log1p_arg)
)
log_asymp_largev = (
sc.gammaln(np.maximum(abs_v, 1e-300))
+ abs_v * np.log(2.0 / z_safe)
- np.log(2.0)
+ z
)
with np.errstate(divide='ignore', invalid='ignore'):
kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
use_largev = (abs_v > z_safe + 2.0) & kve_bad
log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
return np.where(kve_bad, log_asymp, np.log(kve_val))
def _pdf(self, x, alpha, beta, K):
return np.exp(self._logpdf(x, alpha, beta, K))
def _logpdf(self, x, alpha, beta, K):
x = np.asarray(x, dtype=float)
z = 2.0 * x * np.sqrt((1.0 + K) / beta)
# log-prefactor: log(4 x (1+K) e^{-K} / (Γ(α) β^α)), absorbs exp(-z)
log_pre = (
np.log(4.0) + np.log(x) + np.log(1.0 + K) - K
- sc.gammaln(alpha) - alpha * np.log(beta)
- z
)
log_b1Kx2 = np.log(beta) + np.log(1.0 + K) + 2.0 * np.log(x)
log1pK = np.log(1.0 + K)
logK_safe = np.log(np.where(K > 0, K, 1.0))
log_terms = []
for n in range(self.N_SERIES + 1):
log_kve_n = ricegamma_gen._log_kve(alpha - 1.0 - n, z)
n_logK = 0.0 if n == 0 else np.where(K > 0, n * logK_safe, -np.inf)
log_Tn = (
n_logK
+ n * log1pK
- 2.0 * float(sc.gammaln(n + 1))
+ 2.0 * n * np.log(x) # x^{2n} factor
+ (alpha - 1.0 - n) / 2.0 * log_b1Kx2
+ log_kve_n
)
log_terms.append(log_Tn)
log_arr = np.stack(log_terms, axis=0)
max_lt = np.max(log_arr, axis=0)
with np.errstate(invalid='ignore'):
shifted = log_arr - max_lt[np.newaxis, ...]
log_sum = max_lt + np.log(np.sum(np.exp(shifted), axis=0))
return log_pre + log_sum
def _cdf(self, x, alpha, beta, K):
# P(X <= x | Omega) = ncx2.cdf(2*(1+K)*x^2/Omega, 2, 2K)
# Integrate out Omega ~ Gamma(alpha, beta) via generalised Gauss-Laguerre.
from scipy.special import roots_genlaguerre
from scipy.special import gamma as sp_gamma
x = np.asarray(x, dtype=float)
alpha_s = float(np.ravel(alpha)[0])
beta_s = float(np.ravel(beta)[0])
K_s = float(np.ravel(K)[0])
nodes, weights = roots_genlaguerre(50, alpha_s - 1.0)
ncx2_arg = (
2.0 * (1.0 + K_s) * (x[..., np.newaxis] ** 2)
/ (beta_s * nodes)
)
cdf_vals = ncx2.cdf(ncx2_arg, 2.0, 2.0 * K_s)
return np.dot(cdf_vals, weights) / sp_gamma(alpha_s)
def _rvs(self, alpha, beta, K, size=None, random_state=None):
# Compound sampler: Omega ~ Gamma(alpha, beta), X|Omega ~ Rice(nu, sigma)
Omega = random_state.gamma(alpha, beta, size=size)
sigma = np.sqrt(Omega / (2.0 * (1.0 + K)))
nu = np.sqrt(K * Omega / (1.0 + K))
z1 = random_state.normal(nu, sigma, size=size)
z2 = random_state.normal(0.0, sigma, size=size)
return np.hypot(z1, z2)
def _fitstart(self, data, args=None):
# Shapes at scipy's default (1.0); loc = data mean; scale = data std.
if args is None:
args = (1.0,) * self.numargs
return args + (float(np.mean(data)), float(np.std(data)))
ricegamma = ricegamma_gen(a=0.0, name='ricegamma', shapes='alpha, beta, K')