diff --git a/etc/fitting/fitter.py b/etc/fitting/fitter.py
index 7b25484..1e064d5 100644
--- a/etc/fitting/fitter.py
+++ b/etc/fitting/fitter.py
@@ -8,6 +8,12 @@
 from scipy.stats import rv_continuous, goodness_of_fit
 from plotly.subplots import make_subplots
 
+def set_plot_style():
+    sns.set_style("whitegrid")
+    sns.set_context("paper", font_scale=1.25)
+
+
+
 @dataclass
 class DistributionSummary:
     """
@@ -181,7 +187,7 @@ class Fitter:
             Mapping of distribution name → summary (test_result is None until
            validate() is called).
        """
-        data_flat = np.abs(data).flatten()
+        data_flat = np.asarray(data).flatten()
         self._last_data_flat = data_flat
 
         for dist in self.dist_list:
@@ -213,8 +219,16 @@ class Fitter:
         data_flat = self._last_data_flat
         for dist in self.dist_list:
             _summary = self._dist[dist.name]
+            # Build fit_params dict so goodness_of_fit reuses the already-fitted
+            # parameters for the observed-data statistic instead of re-fitting.
+            shape_names = [s.strip() for s in dist.shapes.split(',')] if dist.shapes else []
+            all_param_names = shape_names + ['loc', 'scale']
+            fit_params_dict = dict(zip(all_param_names, _summary.fit_result_params))
             test_result = goodness_of_fit(
-                dist, data_flat, statistic=_summary.statistic_method, **kwargs
+                dist, data_flat,
+                statistic=_summary.statistic_method,
+                fit_params=fit_params_dict,
+                **kwargs,
             )
             _summary.test_result = test_result
             self._dist[dist.name] = _summary
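
# Standalone sketch of the fit_params mechanism used in validate() above,
# shown with scipy.stats.norm rather than this repo's Fitter; parameter
# values are illustrative. With fit_params supplied, goodness_of_fit draws
# its Monte Carlo samples from the distribution at those parameters instead
# of re-fitting them to the observed data first.
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
data = rng.normal(loc=3.0, scale=2.0, size=500)

loc_hat, scale_hat = stats.norm.fit(data)          # fit once, reuse below
res = stats.goodness_of_fit(
    stats.norm, data,
    statistic="ad",                                # Anderson-Darling
    fit_params={"loc": loc_hat, "scale": scale_hat},
    n_mc_samples=999,
    random_state=rng,
)
print(res.statistic, res.pvalue)
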
Call fit() first.") + if method not in ("hazen", "filliben"): + raise ValueError(f"method must be 'hazen' or 'filliben', got '{method}'.") + + set_plot_style() + + dist_names = list(self._dist.keys()) + num_dists = len(dist_names) + num_cols = 2 + num_rows = (num_dists + 1) // 2 + + dot_color = sns.color_palette()[0] + + fig, axes = plt.subplots(num_rows, num_cols, figsize=(6 * num_cols, 5 * num_rows)) + axes = np.array(axes).flatten() + + sorted_data = np.sort(self._last_data_flat) + n = len(sorted_data) + i = np.arange(1, n + 1) + if method == "hazen": + plotting_positions = (i - 0.5) / n + else: + plotting_positions = (i - 0.3175) / (n + 0.365) + plotting_positions[0] = 1 - 0.5 ** (1 / n) + plotting_positions[-1] = 0.5 ** (1 / n) + + for idx, (dist_name, summary) in enumerate(self): + ax = axes[idx] + if summary.test_result is None: + ax.set_title(dist_name) + ax.text(0.5, 0.5, "Not validated", ha="center", va="center", + transform=ax.transAxes) + continue + + theoretical_quantiles = summary.distribution_object.ppf( + plotting_positions, *summary.fit_result_params + ) + valid = np.isfinite(theoretical_quantiles) + if not valid.any(): + ax.set_title(dist_name) + ax.text(0.5, 0.5, "All quantiles non-finite", ha="center", va="center", + transform=ax.transAxes) + continue + + tq = theoretical_quantiles[valid] + sd = sorted_data[valid] + + ax.scatter(tq, sd, color=dot_color, s=8, alpha=0.6, linewidths=0) + ref_min = min(tq.min(), sd.min()) + ref_max = max(tq.max(), sd.max()) + ax.plot([ref_min, ref_max], [ref_min, ref_max], "k--", linewidth=1) + + pvalue = summary.pvalue + stat_color = "green" if pvalue > 0.05 else "red" + ax.text(0.95, 0.05, + f"{summary.statistic_method}={summary.gof_statistic:.4f}", + transform=ax.transAxes, ha="right", va="bottom", + fontsize=9, color=stat_color) + + ax.set_title(dist_name) + ax.set_xlabel("Theoretical Quantiles") + ax.set_ylabel("Empirical Quantiles") + + # Hide unused axes + for idx in range(num_dists, len(axes)): + axes[idx].set_visible(False) + + fig.suptitle(f"QQ Plots of Fitted Distributions ({method})", y=1.01) + plt.tight_layout() return fig def histogram_with_fits(self): @@ -389,7 +493,7 @@ class Fitter: return fig - def histogram_with_fits_seaborn(self): + def histogram_with_fits_sns(self): """ Generate a histogram of the data with overlaid PDFs of each fitted distribution using seaborn. Requires that fit() has been called to populate parameters. 
@@ -389,7 +493,7 @@ class Fitter:
 
         return fig
 
-    def histogram_with_fits_seaborn(self):
+    def histogram_with_fits_sns(self):
         """
         Generate a histogram of the data with overlaid PDFs of each fitted
         distribution using seaborn. Requires that fit() has been called to
         populate parameters.
@@ -422,7 +526,7 @@ class Fitter:
             ax.plot(
                 x,
                 pdf_values,
-                label=f"{summary.distribution_name} --- p={summary.pvalue:.4f}",
+                label=f"{summary.distribution_name}",
             )
             # put title in top left, make it smaller, change it font to sans and put in light gray
             ax.set_title(
diff --git a/etc/tests/test_distributions.py b/etc/tests/test_distributions.py
index e91005b..83586e1 100644
--- a/etc/tests/test_distributions.py
+++ b/etc/tests/test_distributions.py
@@ -7,7 +7,7 @@
 import os
 
 sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
 
-from tools.distributions import k_dist, logk, lognakagami, loggamma_dist, lograyleigh, logrice
+from tools.distributions import k_dist, logk, lognakagami, loggamma_dist, lograyleigh, logrice, logweibull, logricegamma, ricegamma
 
 X = np.linspace(0.01, 10.0, 500)
 
@@ -677,6 +677,237 @@ class TestLogK:
         np.testing.assert_allclose(pdf_ab, pdf_ba, rtol=1e-6)
 
 
+# ── logweibull unit tests ─────────────────────────────────────────────────────
+
+Y_LOGWEIBULL = np.linspace(-10.0, 10.0, 500)
+
+
+class TestLogWeibull:
+    def test_logpdf_is_finite_on_real_line(self):
+        """logpdf must be finite for all real y."""
+        vals = logweibull.logpdf(Y_LOGWEIBULL, k=2.0, lam=1.0)
+        assert np.all(np.isfinite(vals))
+
+    def test_pdf_integrates_to_one(self):
+        """Numerical integral of PDF over the real line should be ≈ 1."""
+        y_fine = np.linspace(-20.0, 20.0, 200_000)
+        integral = np.trapezoid(logweibull.pdf(y_fine, k=2.0, lam=1.0), y_fine)
+        assert pytest.approx(integral, abs=1e-3) == 1.0
+
+    def test_logpdf_equals_log_pdf(self):
+        """logpdf must equal log(pdf) at points where pdf does not underflow to zero."""
+        y_bulk = np.linspace(-10.0, 20.0, 100)
+        k, lam = 2.0, np.exp(12.0)
+        pdf_vals = logweibull.pdf(y_bulk, k=k, lam=lam)
+        mask = pdf_vals > 0
+        np.testing.assert_allclose(
+            logweibull.logpdf(y_bulk[mask], k=k, lam=lam),
+            np.log(pdf_vals[mask]),
+            rtol=1e-6,
+        )
+
+    def test_change_of_variable_matches_weibull(self):
+        """logweibull.pdf(y) must equal weibull_min.pdf(exp(y)) * exp(y)."""
+        from scipy.stats import weibull_min
+        y_test = np.linspace(-3.0, 3.0, 20)
+        k, lam = 2.0, 1.5
+        direct = logweibull.pdf(y_test, k=k, lam=lam)
+        via_w = weibull_min.pdf(np.exp(y_test), c=k, scale=lam) * np.exp(y_test)
+        np.testing.assert_allclose(direct, via_w, rtol=1e-6)
+
+    def test_cdf_is_monotone_increasing(self):
+        """CDF must be strictly non-decreasing."""
+        y_grid = np.linspace(-5.0, 5.0, 50)
+        cdf_vals = logweibull.cdf(y_grid, k=2.0, lam=1.0)
+        assert np.all(np.diff(cdf_vals) >= -1e-12)
+
+    def test_cdf_matches_weibull(self):
+        """logweibull.cdf(y) must equal weibull_min.cdf(exp(y))."""
+        from scipy.stats import weibull_min
+        y_test = np.array([-2.0, 0.0, 1.0, 2.0])
+        k, lam = 1.5, 2.0
+        np.testing.assert_allclose(
+            logweibull.cdf(y_test, k=k, lam=lam),
+            weibull_min.cdf(np.exp(y_test), c=k, scale=lam),
+            rtol=1e-6,
+        )
+
+    def test_sf_plus_cdf_equals_one(self):
+        """sf + cdf must equal 1 everywhere."""
+        y_test = np.linspace(-3.0, 3.0, 20)
+        k, lam = 2.0, 1.0
+        np.testing.assert_allclose(
+            logweibull.cdf(y_test, k=k, lam=lam) + logweibull.sf(y_test, k=k, lam=lam),
+            1.0,
+            rtol=1e-12,
+        )
+
+    def test_ppf_inverts_cdf(self):
+        """ppf must be the exact inverse of cdf: cdf(ppf(q)) == q."""
+        # Round-trip over quantiles to avoid CDF saturation at extreme y values
+        q_test = np.array([0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95])
+        k, lam = 2.0, np.exp(12.0)
+        np.testing.assert_allclose(
+            logweibull.cdf(logweibull.ppf(q_test, k=k, lam=lam), k=k, lam=lam),
+            q_test,
+            rtol=1e-8,
+        )
+
+    def test_stats_mean_shifts_by_log_lam(self):
+        """Doubling lam shifts the mean by ln(2), leaving variance unchanged."""
+        k = 2.0
+        mean1 = float(logweibull.stats(k=k, lam=1.0, moments="m"))
+        mean2 = float(logweibull.stats(k=k, lam=2.0, moments="m"))
+        assert pytest.approx(mean2 - mean1, rel=1e-10) == np.log(2.0)
+
+    def test_stats_variance_scales_with_k(self):
+        """Variance must equal psi_1(1) / k^2."""
+        for k in [0.5, 1.0, 2.0]:
+            _, var, *_ = logweibull.stats(k=k, lam=1.0, moments="mv")
+            expected = sc.polygamma(1, 1) / k ** 2
+            assert pytest.approx(float(var), rel=1e-10) == expected
+
+    def test_stats_variance_is_lam_independent(self):
+        """Variance must not depend on lam."""
+        k = 2.0
+        _, var1, *_ = logweibull.stats(k=k, lam=1.0, moments="mv")
+        _, var2, *_ = logweibull.stats(k=k, lam=5.0, moments="mv")
+        assert pytest.approx(float(var1), rel=1e-10) == float(var2)
+
+    def test_argcheck_rejects_non_positive_k(self):
+        """k <= 0 must not produce a valid PDF value."""
+        val = logweibull.pdf(0.0, k=-1.0, lam=1.0)
+        assert not (np.isfinite(val) and val > 0)
+
+    def test_argcheck_rejects_non_positive_lam(self):
+        """lam <= 0 must not produce a valid PDF value."""
+        val = logweibull.pdf(0.0, k=1.0, lam=-1.0)
+        assert not (np.isfinite(val) and val > 0)
+
+    def test_rvs_are_finite(self):
+        """Random samples must be finite real numbers."""
+        rng = np.random.default_rng(42)
+        samples = logweibull.rvs(k=2.0, lam=1.0, size=500, random_state=rng)
+        assert samples.shape == (500,)
+        assert np.all(np.isfinite(samples))
+
+    def test_rvs_sample_mean_near_analytical(self):
+        """Sample mean must be close to the analytical mean."""
+        k, lam = 2.0, 1.5
+        rng = np.random.default_rng(0)
+        samples = logweibull.rvs(k=k, lam=lam, size=100_000, random_state=rng)
+        expected_mean = float(logweibull.stats(k=k, lam=lam, moments="m"))
+        assert pytest.approx(float(samples.mean()), rel=5e-2) == expected_mean
+
+    def test_rvs_sample_variance_near_analytical(self):
+        """Sample variance must be close to the analytical variance."""
+        k, lam = 2.0, 1.5
+        rng = np.random.default_rng(1)
+        samples = logweibull.rvs(k=k, lam=lam, size=100_000, random_state=rng)
+        _, expected_var, *_ = logweibull.stats(k=k, lam=lam, moments="mv")
+        assert pytest.approx(float(np.var(samples)), rel=5e-2) == float(expected_var)
+
+
+# ── logricegamma unit tests ───────────────────────────────────────────────────
+# PDF is expensive (a truncated Bessel-function series per point), so grids
+# are kept small.
+
+Y_LRICEGAMMA = np.linspace(-5.0, 5.0, 30)
+
+
+class TestLogRiceGamma:
+    def test_pdf_is_positive_for_valid_params(self):
+        """PDF must be strictly positive for finite y and valid parameters."""
+        vals = logricegamma.pdf(Y_LRICEGAMMA, alpha=2.0, beta=1.0, K=1.0)
+        assert np.all(vals > 0)
+
+    def test_pdf_integrates_to_one(self):
+        """Numerical integral of PDF over a wide domain should be ≈ 1."""
+        y_fine = np.linspace(-15.0, 10.0, 1000)
+        integral = np.trapezoid(
+            logricegamma.pdf(y_fine, alpha=2.0, beta=1.0, K=1.0), y_fine
+        )
+        assert pytest.approx(integral, abs=1e-2) == 1.0
+
+    def test_pdf_integrates_to_one_k_zero(self):
+        """Normalisation must hold for K=0 (Rice collapses to Rayleigh)."""
+        y_fine = np.linspace(-15.0, 10.0, 1000)
+        integral = np.trapezoid(
+            logricegamma.pdf(y_fine, alpha=2.0, beta=1.0, K=0.0), y_fine
+        )
+        assert pytest.approx(integral, abs=1e-2) == 1.0
+
+    def test_pdf_integrates_to_one_large_K(self):
+        """Normalisation must hold for large K (highly specular regime)."""
+        y_fine = np.linspace(-10.0, 15.0, 1000)
+        integral = np.trapezoid(
+            logricegamma.pdf(y_fine, alpha=2.0, beta=1.0, K=10.0), y_fine
+        )
+        assert pytest.approx(integral, abs=1e-2) == 1.0
+
+    def test_logpdf_equals_log_pdf(self):
+        """logpdf must equal log(pdf) where pdf does not underflow."""
+        y_bulk = np.linspace(-3.0, 3.0, 15)
+        pdf_vals = logricegamma.pdf(y_bulk, alpha=2.0, beta=1.0, K=1.0)
+        mask = pdf_vals > 0
+        np.testing.assert_allclose(
+            logricegamma.logpdf(y_bulk[mask], alpha=2.0, beta=1.0, K=1.0),
+            np.log(pdf_vals[mask]),
+            rtol=1e-6,
+        )
+
+    def test_argcheck_rejects_non_positive_alpha(self):
+        """alpha <= 0 must not produce a valid PDF value."""
+        val = logricegamma.pdf(0.0, alpha=-1.0, beta=1.0, K=1.0)
+        assert not (np.isfinite(val) and val > 0)
+
+    def test_argcheck_rejects_non_positive_beta(self):
+        """beta <= 0 must not produce a valid PDF value."""
+        val = logricegamma.pdf(0.0, alpha=2.0, beta=-1.0, K=1.0)
+        assert not (np.isfinite(val) and val > 0)
+
+    def test_argcheck_rejects_negative_K(self):
+        """K < 0 must not produce a valid PDF value."""
+        val = logricegamma.pdf(0.0, alpha=2.0, beta=1.0, K=-1.0)
+        assert not (np.isfinite(val) and val > 0)
+
+    def test_cdf_is_monotone_increasing(self):
+        """CDF must be strictly non-decreasing."""
+        y_grid = np.linspace(-4.0, 4.0, 15)
+        cdf_vals = logricegamma.cdf(y_grid, alpha=2.0, beta=1.0, K=1.0)
+        assert np.all(np.diff(cdf_vals) >= -1e-10)
+
+    def test_rvs_samples_are_finite(self):
+        """Random samples must be finite real numbers."""
+        rng = np.random.default_rng(42)
+        samples = logricegamma.rvs(alpha=2.0, beta=1.0, K=1.0, size=500, random_state=rng)
+        assert samples.shape == (500,)
+        assert np.all(np.isfinite(samples))
+
+    def test_rvs_second_moment_equals_alpha_times_beta(self):
+        """E[exp(2Y)] = E[X²] must equal alpha*beta (total average power from docstring)."""
+        alpha, beta, K = 2.0, 1.5, 2.0
+        rng = np.random.default_rng(0)
+        samples = logricegamma.rvs(alpha=alpha, beta=beta, K=K, size=100_000, random_state=rng)
+        assert pytest.approx(float(np.mean(np.exp(2.0 * samples))), rel=5e-2) == alpha * beta
+
+    def test_rvs_second_moment_k_zero(self):
+        """E[X²] = alpha*beta must hold for K=0."""
+        alpha, beta, K = 3.0, 0.5, 0.0
+        rng = np.random.default_rng(1)
+        samples = logricegamma.rvs(alpha=alpha, beta=beta, K=K, size=100_000, random_state=rng)
+        assert pytest.approx(float(np.mean(np.exp(2.0 * samples))), rel=5e-2) == alpha * beta
+
+    def test_rvs_sample_mean_consistent_with_pdf(self):
+        """Sample mean from RVS should match the numerically integrated mean from the PDF."""
+        alpha, beta, K = 2.0, 1.0, 1.0
+        rng = np.random.default_rng(2)
+        samples = logricegamma.rvs(alpha=alpha, beta=beta, K=K, size=50_000, random_state=rng)
+        y_fine = np.linspace(-15.0, 10.0, 1000)
+        pdf_vals = logricegamma.pdf(y_fine, alpha=alpha, beta=beta, K=K)
+        numerical_mean = float(np.trapezoid(y_fine * pdf_vals, y_fine))
+        assert pytest.approx(float(samples.mean()), rel=1e-1) == numerical_mean
+
+
 if __name__ == "__main__":
     plot_k_dist_varying_alpha()
     plot_k_dist_varying_mu()
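
# Standalone check of the compound representation the second-moment tests
# above rely on (from the distributions' docstrings): Omega ~ Gamma(alpha, beta),
# X|Omega ~ Rice(nu, sigma) with sigma^2 = Omega/(2(1+K)), nu^2 = K*Omega/(1+K),
# so E[X^2 | Omega] = nu^2 + 2*sigma^2 = Omega and hence E[X^2] = alpha*beta.
# Parameter values are illustrative.
import numpy as np

rng = np.random.default_rng(0)
alpha, beta, K = 2.0, 1.5, 2.0

omega = rng.gamma(shape=alpha, scale=beta, size=200_000)
sigma = np.sqrt(omega / (2.0 * (1.0 + K)))
nu = np.sqrt(K * omega / (1.0 + K))

x = np.hypot(rng.normal(nu, sigma), rng.normal(0.0, sigma))
print(np.mean(x**2), alpha * beta)   # ~3.0 vs 3.0
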
diff --git a/etc/tools/distributions.py b/etc/tools/distributions.py
index 4881c6b..9cd13ae 100644
--- a/etc/tools/distributions.py
+++ b/etc/tools/distributions.py
@@ -161,11 +161,25 @@ Priority order for overriding to improve tail accuracy:
 
 from scipy.stats import rv_continuous, ncx2
 from scipy.special import kve, gammaln
-from scipy.integrate import quad
 import scipy.special as sc
 from scipy.stats._distn_infrastructure import _ShapeInfo
 import numpy as np
 
+# Gauss-Legendre nodes/weights on [-1, 1], computed once at import time.
+# Note: not referenced by any class in this patch; retained for batched
+# quadrature helpers. 200 nodes give machine-precision accuracy for smooth,
+# well-resolved integrands while keeping the (N, 200) batch cost negligible.
+_GL_M = 200
+_gl_xi, _gl_wi = np.polynomial.legendre.leggauss(_GL_M)
+
+# Gauss-Hermite nodes/weights — used by k_dist._cdf for the large-beta branch.
+# When beta > _BETA_GH_THRESH, Gamma(β,1) weights overflow float64 (Γ(β) → ∞).
+# We instead approximate E_{t~Gamma(β,1)}[f(t)] via the Normal approximation:
+#   Gamma(β,1) ≈ Normal(β, √β)  =>  E[f(t)] ≈ (1/√π) Σ_k w_k f(β + √(2β)·u_k)
+_GH_M = 30
+_gh_nodes, _gh_weights = np.polynomial.hermite.hermgauss(_GH_M)
+_gh_weights_norm = _gh_weights / np.sqrt(np.pi)  # unit-mass weights
+_BETA_GH_THRESH = 150.0
+
 
 class k_gen(rv_continuous):
     """Generalized K distribution for radar clutter modeling.
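
# Standalone sanity check of the Normal-approximation quadrature described
# above, against the closed-form Gamma moment generating function; beta and
# the test function are illustrative. For t ~ Gamma(beta, 1),
# E[exp(-t/100)] = (1 + 1/100)**(-beta) exactly.
import numpy as np

beta = 200.0
u, w = np.polynomial.hermite.hermgauss(30)
t = beta + np.sqrt(2.0 * beta) * u            # Normal(beta, sqrt(beta)) nodes
approx = np.dot(np.exp(-t / 100.0), w) / np.sqrt(np.pi)
exact = (1.0 + 1.0 / 100.0) ** (-beta)
print(approx, exact)   # ≈ 0.13673 vs ≈ 0.13672 (within ~0.01% at beta = 200)
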
@@ -198,6 +212,43 @@ def _pdf(self, x, mu, alpha, beta):
         return np.exp(self._logpdf(x, mu, alpha, beta))
 
+    @staticmethod
+    def _log_kve_stable(v, z):
+        """Numerically stable log(kve(v, z)) = log(K_v(z)) + z.
+
+        Three regimes:
+          - direct:      kve(v, z) > 0 (no over/underflow)
+          - large-z:     z >> |v| — Hankel 2-term asymptotic
+          - large-order: |v| >> z — leading Gamma asymptotic
+                         log(K_v(z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2)
+        """
+        z = np.asarray(z, dtype=float)
+        v = np.asarray(v, dtype=float)
+        abs_v = np.abs(v)
+        kve_val = kve(v, z)
+        z_safe = np.where(z > 0, z, 1.0)
+        # Large-z Hankel asymptotic (accurate when z >> |v|)
+        mu_v = 4.0 * v ** 2
+        # Clip log1p argument to (-1, inf) to avoid domain errors when np.where
+        # evaluates this branch even for points where it won't be selected.
+        log1p_arg = np.maximum(
+            (mu_v - 1.0) / (8.0 * z_safe)
+            + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
+            -1.0 + 1e-15,
+        )
+        log_asymp_largez = (
+            0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
+            + np.log1p(log1p_arg)
+        )
+        # Large-order asymptotic (accurate when |v| >> z > 0)
+        # log(kve(v,z)) = log(K_v(z)) + z ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z
+        log_asymp_largev = gammaln(abs_v) + abs_v * np.log(2.0 / z_safe) - np.log(2.0) + z
+        # kve overflows to +inf when |v|>>z, underflows to 0 when z>>|v|
+        kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
+        use_largev = (abs_v > z_safe + 2.0) & kve_bad
+        log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
+        return np.where(kve_bad, log_asymp, np.log(kve_val))
+
     def _logpdf(self, x, mu, alpha, beta):
         half_sum = (alpha + beta) / 2.0
         log_ab_over_mu = np.log(alpha) + np.log(beta) - np.log(mu)
@@ -208,7 +259,7 @@ class k_gen(rv_continuous):
             - gammaln(beta)
             + half_sum * log_ab_over_mu
             + (half_sum - 1.0) * np.log(x)
-            + np.log(kve(alpha - beta, z))
+            + self._log_kve_stable(alpha - beta, z)
             - z
         )
@@ -235,18 +286,53 @@ class k_gen(rv_continuous):
         return mean, variance, skewness, kurtosis
 
     def _cdf(self, x, mu, alpha, beta):
-        # scipy broadcasts params to match x's shape before calling _cdf,
-        # so mu/alpha/beta may be arrays. Pass all four to np.vectorize so
-        # each argument arrives as a scalar inside _scalar.
-        # Substitution u = sqrt(x) regularises the integrand near u=0:
-        # f(u²)*2u ~ u^(alpha+beta-1), smooth for alpha,beta > 0.
+        # K-distribution CDF via the compound-Gamma representation:
+        #   X = Gamma(alpha, tau/alpha), tau ~ Gamma(beta, mu/beta)
+        #   => F(x) = (1/Γ(β)) ∫_0^∞ gammainc(α, xαβ/(μt)) t^{β-1} e^{-t} dt
+        #
+        # For β ≤ _BETA_GH_THRESH: (β−1)-order Gauss-Laguerre quadrature.
+        # For β > _BETA_GH_THRESH: Gamma(β,1) ≈ Normal(β,√β), so we switch to
+        # Gauss-Hermite: E[f(t)] ≈ (1/√π) Σ_k w_k f(β + √(2β)·u_k).
+        from scipy.special import gammainc, roots_genlaguerre
+        from scipy.special import gamma as sp_gamma
+
+        x = np.asarray(x, dtype=float)
+        mu = np.asarray(mu, dtype=float)
+        alpha = np.asarray(alpha, dtype=float)
+        beta = np.asarray(beta, dtype=float)
+
+        _N_PTS = 50
+
+        def _gh_branch(x_, alpha_, mu_, bi):
+            """Gauss-Hermite CDF for large beta (Normal approx to Gamma(β,1))."""
+            t_gh = bi + np.sqrt(2.0 * bi) * _gh_nodes   # shape (GH_M,)
+            t_gh = np.maximum(t_gh, 1e-10)
+            t_args = (x_ * alpha_ / mu_)[..., np.newaxis] * (bi / t_gh)
+            probs = gammainc(alpha_[..., np.newaxis], t_args)
+            return np.dot(probs, _gh_weights_norm)
+
+        # Fast path: beta is uniform across all inputs (the common case in fit).
+        beta_vals = np.unique(beta)
+        if beta_vals.size == 1:
+            bi = float(beta_vals[0])
+            if bi > _BETA_GH_THRESH:
+                return _gh_branch(x, alpha, mu, bi)
+            nodes, weights = roots_genlaguerre(_N_PTS, bi - 1.0)
+            t_args = (x * alpha / mu)[..., np.newaxis] * (bi / nodes)
+            probs = gammainc(alpha[..., np.newaxis], t_args)
+            return np.dot(probs, weights) / sp_gamma(bi)
+
+        # Slow path: heterogeneous beta — fall back to per-element loop.
         def _scalar(xi, mui, ai, bi):
-            val, _ = quad(
-                lambda u: float(self._pdf(float(u * u), float(mui), float(ai), float(bi))) * 2.0 * u,
-                0.0, float(np.sqrt(xi)),
-                limit=200, epsabs=1.49e-10, epsrel=1.49e-8,
-            )
-            return val
+            bi = float(bi)
+            xi_, ai_, mui_ = np.asarray([float(xi)]), np.asarray([float(ai)]), float(mui)
+            if bi > _BETA_GH_THRESH:
+                return float(_gh_branch(xi_, ai_, np.asarray([mui_]), bi)[0])
+            nodes, weights = roots_genlaguerre(_N_PTS, bi - 1.0)
+            t_args = float(xi) * float(ai) * bi / (float(mui) * nodes)
+            probs = gammainc(float(ai), t_args)
+            return float(np.dot(probs, weights) / sp_gamma(bi))
+
         return np.vectorize(_scalar)(x, mu, alpha, beta)
 
     def _rvs(self, mu, alpha, beta, size=None, random_state=None):
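
# Standalone check of the quadrature identity behind the new k_gen._cdf
# fast path: E[f(t)] for t ~ Gamma(beta, 1) is approximated by
# sum_k w_k f(t_k) / Gamma(beta), with (t_k, w_k) from generalized
# Gauss-Laguerre of order beta - 1. For f(t) = exp(-t) the exact value is
# 2**(-beta); beta below is illustrative.
import numpy as np
from scipy.special import roots_genlaguerre, gamma

beta = 7.5
nodes, weights = roots_genlaguerre(50, beta - 1.0)
approx = np.dot(np.exp(-nodes), weights) / gamma(beta)
print(approx, 2.0 ** (-beta))   # agree to near machine precision
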
@@ -300,27 +386,40 @@ class logk_gen(rv_continuous):
 
     @staticmethod
     def _log_kve(v, z):
-        """log(kve(v, z)) = log(Kv(z)) + z, stable for all z > 0.
+        """Numerically stable log(kve(v, z)) = log(K_v(z)) + z.
 
-        For large z, kve(v, z) ≈ sqrt(π/(2z)) underflows to 0 in float64,
-        making log(kve) return -inf/-nan. The asymptotic expansion:
-
-            log(kve(v,z)) ≈ 0.5*log(π/(2z)) + log1p((4v²-1)/(8z) +
-                            (4v²-1)(4v²-9)/(128z²))
-
-        is used whenever the direct evaluation would underflow.
+        Three regimes:
+          - direct:      kve(v, z) > 0 (no over/underflow)
+          - large-z:     z >> |v| — Hankel 2-term asymptotic
+          - large-order: |v| >> z — leading Gamma asymptotic
+                         log(K_v(z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2)
         """
         z = np.asarray(z, dtype=float)
         v = np.asarray(v, dtype=float)
+        abs_v = np.abs(v)
         kve_val = kve(v, z)
-        # Asymptotic (2-term Hankel expansion) — accurate to O(z^{-3})
+        z_safe = np.where(z > 0, z, 1.0)
+        # Large-z Hankel asymptotic (accurate when z >> |v|)
         mu_v = 4.0 * v ** 2
-        log_asymp = (
-            0.5 * (np.log(np.pi) - np.log(2.0) - np.log(np.where(z > 0, z, 1.0)))
-            + np.log1p((mu_v - 1.0) / (8.0 * np.where(z > 0, z, 1.0))
-                       + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * np.where(z > 0, z**2, 1.0)))
-        )
-        return np.where(kve_val > 0, np.log(np.where(kve_val > 0, kve_val, 1.0)), log_asymp)
+        # Clip log1p argument to (-1, inf) to avoid domain errors when np.where
+        # evaluates this branch even for points where it won't be selected.
+        log1p_arg = np.maximum(
+            (mu_v - 1.0) / (8.0 * z_safe)
+            + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
+            -1.0 + 1e-15,
+        )
+        log_asymp_largez = (
+            0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
+            + np.log1p(log1p_arg)
+        )
+        # Large-order asymptotic (accurate when |v| >> z > 0)
+        # log(kve(v,z)) = log(K_v(z)) + z ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z
+        log_asymp_largev = gammaln(abs_v) + abs_v * np.log(2.0 / z_safe) - np.log(2.0) + z
+        # kve overflows to +inf when |v|>>z, underflows to 0 when z>>|v|
+        kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
+        use_largev = (abs_v > z_safe + 2.0) & kve_bad
+        log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
+        return np.where(kve_bad, log_asymp, np.log(kve_val))
 
     def _pdf(self, y, mu, alpha, beta):
         return np.exp(self._logpdf(y, mu, alpha, beta))
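
# Standalone probe of the large-order regime that motivates the asymptotic
# branch above (orders are illustrative). kve(v, z) = K_v(z) * exp(z)
# overflows once Gamma(|v|) * (2/z)**|v| exceeds float64 range, while the
# log-domain asymptotic stays finite.
import numpy as np
from scipy.special import kve, gammaln

z = 1.0
for v in (50.0, 500.0):
    log_asymp = gammaln(v) + v * np.log(2.0 / z) - np.log(2.0) + z
    print(v, kve(v, z), log_asymp)
# v = 50:  kve is still finite (~1e78); log(kve) matches the asymptotic to
#          roughly 5e-3, the size of the neglected series term z**2/(4(v-1)).
# v = 500: kve overflows to inf; the asymptotic still returns ~2952.
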
@@ -347,33 +446,158 @@ class logk_gen(rv_continuous):
         return mean, var, g1, g2
 
     def _cdf(self, y, mu, alpha, beta):
-        return k_dist.cdf(np.exp(y), mu, alpha, beta)
+        # Clip y to avoid exp() overflow (float64 max ≈ exp(709.78)).
+        # For y > 709 the CDF is indistinguishable from 1.0.
+        y_clipped = np.minimum(y, 709.0)
+        return k_dist.cdf(np.exp(y_clipped), mu, alpha, beta)
+
+    def _ppf(self, q, mu, alpha, beta):
+        # Invert via the linear-domain ppf: Y = ln(X), X ~ K(mu, alpha, beta).
+        # k_dist.ppf has a finite lower bound (a=0) so its bracket search is
+        # well-defined, avoiding the exp-overflow problem in the default logk ppf.
+        x = k_dist.ppf(q, mu, alpha, beta)
+        return np.log(np.maximum(x, np.finfo(float).tiny))
 
     def _rvs(self, mu, alpha, beta, size=None, random_state=None):
+        # Compound Gamma: tau ~ Gamma(beta, mu/beta), X|tau ~ Gamma(alpha, tau/alpha)
+        # Y = ln(X), avoids the ppf -> k_dist.ppf -> quad-CDF chain.
         tau = random_state.gamma(beta, mu / beta, size=size)
-        sample = random_state.gamma(alpha, tau / alpha)
-        return np.log(np.clip(sample, np.finfo(float).tiny, None))
+        x = random_state.gamma(alpha, tau / alpha)
+        return np.log(np.maximum(x, np.finfo(float).tiny))
+
+    def _sf(self, y, mu, alpha, beta):
+        y_clipped = np.minimum(y, 709.0)
+        return k_dist.sf(np.exp(y_clipped), mu, alpha, beta)
+
+    def _fitstart(self, data, args=None):
+        # Symmetric method-of-moments starting point using the first two cumulants:
+        #   E[Y]   = ln(mu) - ln(alpha) - ln(beta) + psi(alpha) + psi(beta)
+        #   Var[Y] = psi_1(alpha) + psi_1(beta)
+        # Symmetric start (alpha=beta): psi_1(a) ≈ 1/a  =>  a ≈ 2/Var
+        mean_d = float(np.mean(data))
+        var_d = max(float(np.var(data)), 1e-6)
+        alpha0 = float(np.clip(2.0 / var_d, 0.5, 50.0))
+        beta0 = alpha0
+        mu0 = float(np.exp(
+            mean_d
+            + np.log(alpha0) + np.log(beta0)
+            - sc.digamma(alpha0) - sc.digamma(beta0)
+        ))
+        return (mu0, alpha0, beta0, 0.0, 1.0)
 
     def fit(self, data, *args, **kwds):
-        if ("loc" in kwds and kwds["loc"] != 0.0) or ("floc" in kwds and kwds["floc"] != 0.0):
-            raise ValueError("logk uses a fixed loc=0.")
-        if ("scale" in kwds and kwds["scale"] != 1.0) or ("fscale" in kwds and kwds["fscale"] != 1.0):
-            raise ValueError("logk uses a fixed scale=1.")
-        kwds.pop("loc", None)
-        kwds.pop("scale", None)
-        # Supply data-driven initial guesses when none are provided so the
-        # optimizer starts close to the data instead of the default (1,1,1).
-        # E[Y] = ln(mu) + ln(alpha) + ln(beta) - digamma(alpha) - digamma(beta)
-        # => mu0 = exp(mean(data) + ln(a0) + ln(b0) - psi(a0) - psi(b0))
-        if not args:
-            alpha0, beta0 = 1.0, 1.0
-            mu0 = float(np.exp(
-                np.mean(data)
-                + np.log(alpha0) + np.log(beta0)
-                - sc.digamma(alpha0) - sc.digamma(beta0)
-            ))
-            args = (mu0, alpha0, beta0)
-        return super().fit(data, *args, floc=0.0, fscale=1.0, **kwds)
+        """MLE fit with bounded shape parameters; loc and scale are free by default.
+
+        Optimises in log-parameter space via L-BFGS-B to keep mu, alpha, beta
+        and scale strictly positive.
+
+        The K-distribution log-likelihood is unbounded when alpha or beta grows
+        without limit, so the search is capped at _MAX. Three starting points
+        are tried — one symmetric (alpha=beta from variance matching) and two
+        asymmetric (one with alpha>>beta and its mirror) — to escape the
+        symmetric local maximum when the data is skewed.
+
+        loc/scale can be pinned by passing floc=0, fscale=1 as keyword args.
+        """
+        from scipy.optimize import minimize, brentq
+
+        # Respect user-supplied fixed values; None means "free to optimise".
+        floc = kwds.pop('floc', None)
+        fscale = kwds.pop('fscale', None)
+        for k in ('loc', 'scale'):
+            kwds.pop(k, None)
+        data = np.asarray(data, dtype=float).ravel()
+
+        # Upper bound: prevents the degenerate alpha→∞ (or beta→∞) regime.
+        _MAX = 50.0
+
+        # ── symmetric starting point ──────────────────────────────────────────
+        start = self._fitstart(data)
+        mu0 = max(float(args[0]) if len(args) > 0 else start[0], 1e-12)
+        alpha0 = float(np.clip(args[1] if len(args) > 1 else start[1], 0.01, _MAX))
+        beta0 = float(np.clip(args[2] if len(args) > 2 else start[2], 0.01, _MAX))
+
+        # ── asymmetric starting point (all variance in beta, large alpha) ─────
+        mean_d = float(np.mean(data))
+        var_d = max(float(np.var(data)), 1e-6)
+        try:
+            beta_asym = float(brentq(
+                lambda b: sc.polygamma(1, b) - var_d, 0.05, 500.0, xtol=1e-8
+            ))
+        except Exception:
+            beta_asym = beta0
+        alpha_asym = float(np.clip(5.0 * beta_asym, 1.0, _MAX))
+        beta_asym = float(np.clip(beta_asym, 0.01, _MAX))
+        mu_asym = float(np.exp(
+            mean_d
+            + np.log(alpha_asym) + np.log(beta_asym)
+            - sc.digamma(alpha_asym) - sc.digamma(beta_asym)
+        ))
+
+        # Starting values for loc / scale (used when free)
+        loc0 = float(floc) if floc is not None else 0.0
+        scale0 = float(fscale) if fscale is not None else 1.0
+
+        # ── parameter packing ─────────────────────────────────────────────────
+        # Vector layout: [log(mu), log(alpha), log(beta), loc?, log(scale)?]
+        # Slots for loc/scale are only present when they are free.
+        def pack(mu, alpha, beta):
+            v = [np.log(mu), np.log(alpha), np.log(beta)]
+            if floc is None: v.append(loc0)
+            if fscale is None: v.append(np.log(scale0))
+            return v
+
+        def unpack(v):
+            mu = np.exp(v[0]); alpha = np.exp(v[1]); beta = np.exp(v[2])
+            idx = 3
+            if floc is None:
+                loc = float(v[idx]); idx += 1
+            else:
+                loc = float(floc)
+            scale = float(np.exp(v[idx])) if fscale is None else float(fscale)
+            return mu, alpha, beta, loc, scale
+
+        bounds = [
+            (None, None),                    # log(mu)
+            (np.log(0.01), np.log(_MAX)),    # log(alpha)
+            (np.log(0.01), np.log(_MAX)),    # log(beta)
+        ]
+        if floc is None: bounds.append((None, None))                     # loc
+        if fscale is None: bounds.append((np.log(1e-4), np.log(100.0)))  # log(scale)
+
+        def neg_ll(v):
+            mu, alpha, beta, loc, scale = unpack(v)
+            with np.errstate(all='ignore'):
+                ll = np.sum(self.logpdf(data, mu, alpha, beta, loc=loc, scale=scale))
+            return -ll if np.isfinite(ll) else 1e15
+
+        # Candidate starting vectors: symmetric + two asymmetric (α>>β and β>>α)
+        x0_candidates = [
+            pack(mu0, alpha0, beta0),
+            pack(mu_asym, alpha_asym, beta_asym),
+            pack(mu_asym, beta_asym, alpha_asym),
+        ]
+
+        best_res = None
+        best_nll = np.inf
+        with np.errstate(all='ignore'):
+            for x0 in x0_candidates:
+                x0_safe = list(x0)
+                x0_safe[1] = float(np.clip(x0_safe[1], bounds[1][0], bounds[1][1]))
+                x0_safe[2] = float(np.clip(x0_safe[2], bounds[2][0], bounds[2][1]))
+                res = minimize(
+                    neg_ll,
+                    x0=x0_safe,
+                    method='L-BFGS-B',
+                    bounds=bounds,
+                    options={'ftol': 1e-12, 'gtol': 1e-8, 'maxiter': 2000},
+                )
+                if res.fun < best_nll:
+                    best_nll = res.fun
+                    best_res = res
+
+        mu_hat, alpha_hat, beta_hat, loc_hat, scale_hat = unpack(best_res.x)
+        return (mu_hat, alpha_hat, beta_hat, loc_hat, scale_hat)
 
 
 logk = logk_gen(name="logk", shapes="mu, alpha, beta")
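
# Minimal sketch of the log-space trick logk_gen.fit uses with L-BFGS-B:
# optimise v = log(theta) so theta = exp(v) stays strictly positive, with
# box bounds expressed in log space. Toy Gamma-MLE objective, not the
# fitter's actual likelihood; values are illustrative.
import numpy as np
from scipy.optimize import minimize
from scipy.stats import gamma

rng = np.random.default_rng(0)
data = rng.gamma(shape=3.0, scale=2.0, size=2_000)

def neg_ll(v):
    shape, scale = np.exp(v)                       # always > 0 by construction
    ll = np.sum(gamma.logpdf(data, shape, scale=scale))
    return -ll if np.isfinite(ll) else 1e15

res = minimize(
    neg_ll,
    x0=[np.log(1.0), np.log(1.0)],
    method="L-BFGS-B",
    bounds=[(np.log(0.01), np.log(50.0))] * 2,     # bounds on log(params)
)
print(np.exp(res.x))    # ≈ [3.0, 2.0]
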
@@ -707,4 +931,395 @@ class logrice_gen(rv_continuous):
         return np.log(np.hypot(z1, z2))
 
 
-logrice = logrice_gen(name='logrice', shapes="nu, sigma")
\ No newline at end of file
+logrice = logrice_gen(name='logrice', shapes="nu, sigma")
+
+
+class logweibull_gen(rv_continuous):
+    """Log-Weibull continuous random variable.
+
+    Y = ln(X) where X ~ Weibull(k, lam). The PDF is:
+
+        f(y; k, lam) = (k/lam) * (e^y/lam)^(k-1) * exp(-(e^y/lam)^k) * e^y
+
+    for y in (-inf, +inf), k > 0, lam > 0.
+
+    Since Y = ln(lam) + (1/k)*ln(W) where W ~ Exp(1), the moments are:
+
+        E[Y]   = ln(lam) + psi(1)/k
+        Var[Y] = psi_1(1) / k^2
+
+    Skewness and excess kurtosis are k-independent constants (the 1/k scaling
+    cancels in the standardised moments) given by psi_2(1)/psi_1(1)^(3/2) and
+    psi_3(1)/psi_1(1)^2 respectively.
+
+    The differential entropy is lam-independent:
+
+        H(Y) = 1 - psi(1) - ln(k)
+    """
+
+    def _shape_info(self):
+        return [
+            _ShapeInfo("k", False, (0, np.inf), (False, False)),
+            _ShapeInfo("lam", False, (0, np.inf), (False, False)),
+        ]
+
+    def _argcheck(self, k, lam):
+        return (k > 0) & (lam > 0)
+
+    def _pdf(self, y, k, lam):
+        return np.exp(self._logpdf(y, k, lam))
+
+    def _logpdf(self, y, k, lam):
+        # ln f = ln(k) + k*(y - ln(lam)) - (e^y/lam)^k
+        return np.log(k) + k * (y - np.log(lam)) - (np.exp(y) / lam) ** k
+
+    def _cdf(self, y, k, lam):
+        return 1.0 - np.exp(-((np.exp(y) / lam) ** k))
+
+    def _sf(self, y, k, lam):
+        return np.exp(-((np.exp(y) / lam) ** k))
+
+    def _logcdf(self, y, k, lam):
+        return np.log1p(-np.exp(-((np.exp(y) / lam) ** k)))
+
+    def _logsf(self, y, k, lam):
+        return -((np.exp(y) / lam) ** k)
+
+    def _ppf(self, q, k, lam):
+        # q = 1 - exp(-(e^y/lam)^k)  =>  y = ln(lam) + ln(-log1p(-q)) / k
+        return np.log(lam) + np.log(-np.log1p(-q)) / k
+
+    def _isf(self, q, k, lam):
+        # q = exp(-(e^y/lam)^k)  =>  y = ln(lam) + ln(-ln(q)) / k
+        return np.log(lam) + np.log(-np.log(q)) / k
+
+    def _stats(self, k, lam):
+        # Y = ln(lam) + (1/k)*ln(W), W ~ Exp(1) = Gamma(1, 1)
+        mu = np.log(lam) + sc.digamma(1) / k
+        mu2 = sc.polygamma(1, 1) / k ** 2
+        g1 = sc.polygamma(2, 1) / sc.polygamma(1, 1) ** 1.5
+        g2 = sc.polygamma(3, 1) / sc.polygamma(1, 1) ** 2
+        return mu, mu2, g1, g2
+
+    def _entropy(self, k, lam):
+        # H(Y) = 1 - psi(1) - ln(k)   (lam-independent: pure location shift)
+        return 1.0 - sc.digamma(1) - np.log(k)
+
+    def _rvs(self, k, lam, size=None, random_state=None):
+        # X = lam * W^(1/k), W ~ Exp(1)  =>  Y = ln(lam) + ln(W)/k
+        return np.log(lam) + np.log(random_state.exponential(size=size)) / k
+
+
+logweibull = logweibull_gen(name="logweibull", shapes="k, lam")
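
# Spot-check of the closed forms in the logweibull docstring against
# weibull_min in the linear domain: if X ~ weibull_min(c=k, scale=lam),
# then Y = ln(X) should reproduce the stated mean, variance, and quantiles.
# Parameter values are illustrative.
import numpy as np
from scipy.special import digamma, polygamma
from scipy.stats import weibull_min

k, lam = 2.0, 1.5
rng = np.random.default_rng(0)
y = np.log(weibull_min.rvs(c=k, scale=lam, size=200_000, random_state=rng))

print(y.mean(), np.log(lam) + digamma(1) / k)    # both ≈ 0.117
print(y.var(), polygamma(1, 1) / k**2)           # both ≈ 0.411
q = 0.9
print(np.quantile(y, q), np.log(lam) + np.log(-np.log1p(-q)) / k)  # both ≈ 0.822
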
+ """ + def _shape_info(self): + return [ + _ShapeInfo("alpha", False, (0, np.inf), (False, False)), + _ShapeInfo("beta", False, (0, np.inf), (False, False)), + _ShapeInfo("K", False, (0, np.inf), (True, False)), + ] + + def _argcheck(self, alpha, beta, K): + return (alpha > 0) & (beta > 0) & (K >= 0) + + @staticmethod + def _log_kve(v, z): + """log(kve(v, z)) = log(K_v(z)) + z, numerically stable.""" + v = np.asarray(v, dtype=float) + z = np.asarray(z, dtype=float) + abs_v = np.abs(v) + with np.errstate(divide='ignore', invalid='ignore', over='ignore'): + kve_val = sc.kve(v, z) + z_safe = np.where(z > 0, z, 1.0) + mu_v = 4.0 * v ** 2 + log1p_arg = np.maximum( + (mu_v - 1.0) / (8.0 * z_safe) + + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2), + -1.0 + 1e-15, + ) + log_asymp_largez = ( + 0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe)) + + np.log1p(log1p_arg) + ) + # log(kve(v,z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z for |v| >> z + log_asymp_largev = ( + sc.gammaln(np.maximum(abs_v, 1e-300)) + + abs_v * np.log(2.0 / z_safe) + - np.log(2.0) + + z + ) + with np.errstate(divide='ignore', invalid='ignore'): + kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0) + use_largev = (abs_v > z_safe + 2.0) & kve_bad + log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez) + return np.where(kve_bad, log_asymp, np.log(kve_val)) + + def _pdf(self, y, alpha, beta, K): + return np.exp(self._logpdf(y, alpha, beta, K)) + + #: Number of terms kept in the truncated series for _logpdf. + #: Increase for large K-factors (rule of thumb: N_SERIES > 3*K + 30). + N_SERIES: int = 90 + + def _logpdf(self, y, alpha, beta, K): + y = np.asarray(y, dtype=float) + x = np.exp(y) + z = 2.0 * x * np.sqrt((1.0 + K) / beta) + + # log-prefactor: log(4 x^2 (1+K) e^{-K} / (Γ(α) β^α)), absorbs exp(-z) + # so each series term uses kve-scaled Bessel values + log_pre = ( + np.log(4.0) + 2.0 * y + np.log(1.0 + K) - K + - sc.gammaln(alpha) - alpha * np.log(beta) + - z + ) + log_b1Kx2 = np.log(beta) + np.log(1.0 + K) + 2.0 * y # log[beta*(1+K)*x^2] + + log1pK = np.log(1.0 + K) + logK_safe = np.log(np.where(K > 0, K, 1.0)) + + log_terms = [] + for n in range(self.N_SERIES + 1): + # log(kve(alpha-1-n, z)) = log(K_{alpha-1-n}(z)) + z + log_kve_n = logricegamma_gen._log_kve(alpha - 1.0 - n, z) + + # K^n: for K=0 only the n=0 term survives (0^0 = 1) + n_logK = 0.0 if n == 0 else np.where(K > 0, n * logK_safe, -np.inf) + + log_Tn = ( + n_logK + + n * log1pK + - 2.0 * float(sc.gammaln(n + 1)) + + 2.0 * n * y # x^{2n} factor + + (alpha - 1.0 - n) / 2.0 * log_b1Kx2 + + log_kve_n + ) + log_terms.append(log_Tn) + + log_arr = np.stack(log_terms, axis=0) # shape (N_MAX+1, *y.shape) + max_lt = np.max(log_arr, axis=0) # shape (*y.shape) + with np.errstate(invalid='ignore'): + shifted = log_arr - max_lt[np.newaxis, ...] + log_sum = max_lt + np.log(np.sum(np.exp(shifted), axis=0)) + return log_pre + log_sum + + def _fitstart(self, data, args=None): + # loc near the data mean; alpha/beta in (0.5, 1) for typical heavy-tail + # radar clutter; K=1 moderate Rice factor; scale near data std. 
+ mean_d = float(np.mean(data)) + std_d = float(np.std(data)) + return (0.75, 0.75, 3.0, mean_d, max(std_d * 0.5, 0.1)) + + def _rvs(self, alpha, beta, K, size=None, random_state=None): + # Compound sampler: Omega ~ Gamma(alpha, beta), X|Omega ~ Rice(nu, sigma) + Omega = random_state.gamma(alpha, beta, size=size) + sigma = np.sqrt(Omega / (2.0 * (1.0 + K))) + nu = np.sqrt(K * Omega / (1.0 + K)) + z1 = random_state.normal(nu, sigma, size=size) + z2 = random_state.normal(0.0, sigma, size=size) + return np.log(np.hypot(z1, z2)) + + def _cdf(self, y, alpha, beta, K): + # Compound representation: + # P(Y <= y) = E_Omega[ P(X <= e^y | Omega) ] + # Given Omega, sigma^2 = Omega/(2(1+K)), nu^2 = K*Omega/(1+K): + # P(X <= x | Omega) = ncx2.cdf(x^2/sigma^2, 2, nu^2/sigma^2) + # = ncx2.cdf(2(1+K)e^{2y}/Omega, 2, 2K) + # Non-centrality 2K is Omega-independent. Integrate over Omega ~ Gamma(alpha, beta) + # by substituting s = Omega/beta (so s ~ Gamma(alpha, 1)) and applying + # generalized Gauss-Laguerre quadrature of order alpha-1. + from scipy.special import roots_genlaguerre + from scipy.special import gamma as sp_gamma + + y = np.asarray(y, dtype=float) + # Parameters are broadcast to y's shape by scipy; take the unique scalar. + alpha_s = float(np.ravel(alpha)[0]) + beta_s = float(np.ravel(beta)[0]) + K_s = float(np.ravel(K)[0]) + + _N_PTS = 50 + nodes, weights = roots_genlaguerre(_N_PTS, alpha_s - 1.0) + + # ncx2_arg shape: (*y.shape, N_PTS) + ncx2_arg = ( + 2.0 * (1.0 + K_s) * np.exp(2.0 * y)[..., np.newaxis] + / (beta_s * nodes) + ) + cdf_vals = ncx2.cdf(ncx2_arg, 2.0, 2.0 * K_s) + return np.dot(cdf_vals, weights) / sp_gamma(alpha_s) + + +logricegamma = logricegamma_gen(name='logricegamma', shapes='alpha, beta, K') + + +class ricegamma_gen(rv_continuous): + """RiceGamma continuous random variable (linear-domain envelope). + + The probability density function is: + + f(x; alpha, beta, K) = + 4*x*(1+K)*exp(-K) / (Gamma(alpha)*beta^alpha) + * sum_{n=0}^inf [K*(1+K)]^n / (n!)^2 + * x^{2n} * [beta*(1+K)*x^2]^{(alpha-1-n)/2} + * K_{alpha-1-n}(2*x*sqrt((1+K)/beta)) + + for x > 0, alpha > 0, beta > 0, K >= 0. + + This is the linear-domain counterpart of logricegamma: if X ~ ricegamma + then ln(X) ~ logricegamma (same alpha, beta, K, loc=0, scale=1). + + The compound representation is: Omega ~ Gamma(alpha, beta), then + X|Omega ~ Rice(nu, sigma) with sigma^2 = Omega/(2*(1+K)) and + nu = sqrt(K*Omega/(1+K)). E[X^2] = alpha*beta. + """ + + #: Number of series terms. Increase for large K (rule: N_SERIES > 3*K + 30). 
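
# Standalone check of the identity behind logricegamma._cdf: conditioned on
# Omega, the Rice CDF is a 2-dof noncentral chi-square CDF,
#   P(X <= x | Omega) = ncx2.cdf(x**2 / sigma**2, df=2, nc=nu**2 / sigma**2)
# with nu**2/sigma**2 = 2K and x**2/sigma**2 = 2(1+K)x**2/Omega.
# Parameter values are illustrative.
import numpy as np
from scipy.stats import ncx2, rice

K, omega = 2.0, 1.5
sigma = np.sqrt(omega / (2.0 * (1.0 + K)))
x = np.linspace(0.1, 3.0, 5)

via_ncx2 = ncx2.cdf(2.0 * (1.0 + K) * x**2 / omega, 2.0, 2.0 * K)
via_rice = rice.cdf(x, b=np.sqrt(2.0 * K), scale=sigma)
print(np.max(np.abs(via_ncx2 - via_rice)))   # ~1e-15
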
+class ricegamma_gen(rv_continuous):
+    """RiceGamma continuous random variable (linear-domain envelope).
+
+    The probability density function is:
+
+        f(x; alpha, beta, K) =
+            4*x*(1+K)*exp(-K) / (Gamma(alpha)*beta^alpha)
+            * sum_{n=0}^inf [K*(1+K)]^n / (n!)^2
+            * x^{2n} * [beta*(1+K)*x^2]^{(alpha-1-n)/2}
+            * K_{alpha-1-n}(2*x*sqrt((1+K)/beta))
+
+    for x > 0, alpha > 0, beta > 0, K >= 0.
+
+    This is the linear-domain counterpart of logricegamma: if X ~ ricegamma
+    then ln(X) ~ logricegamma (same alpha, beta, K, loc=0, scale=1).
+
+    The compound representation is: Omega ~ Gamma(alpha, beta), then
+    X|Omega ~ Rice(nu, sigma) with sigma^2 = Omega/(2*(1+K)) and
+    nu = sqrt(K*Omega/(1+K)). E[X^2] = alpha*beta.
+    """
+
+    #: Number of series terms. Increase for large K (rule: N_SERIES > 3*K + 30).
+    N_SERIES: int = 90
+
+    def _shape_info(self):
+        return [
+            _ShapeInfo("alpha", False, (0, np.inf), (False, False)),
+            _ShapeInfo("beta", False, (0, np.inf), (False, False)),
+            _ShapeInfo("K", False, (0, np.inf), (True, False)),
+        ]
+
+    def _argcheck(self, alpha, beta, K):
+        return (alpha > 0) & (beta > 0) & (K >= 0)
+
+    @staticmethod
+    def _log_kve(v, z):
+        """log(kve(v, z)) = log(K_v(z)) + z, numerically stable."""
+        v = np.asarray(v, dtype=float)
+        z = np.asarray(z, dtype=float)
+        abs_v = np.abs(v)
+        with np.errstate(divide='ignore', invalid='ignore', over='ignore'):
+            kve_val = sc.kve(v, z)
+        z_safe = np.where(z > 0, z, 1.0)
+        mu_v = 4.0 * v ** 2
+        log1p_arg = np.maximum(
+            (mu_v - 1.0) / (8.0 * z_safe)
+            + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
+            -1.0 + 1e-15,
+        )
+        log_asymp_largez = (
+            0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
+            + np.log1p(log1p_arg)
+        )
+        log_asymp_largev = (
+            sc.gammaln(np.maximum(abs_v, 1e-300))
+            + abs_v * np.log(2.0 / z_safe)
+            - np.log(2.0)
+            + z
+        )
+        with np.errstate(divide='ignore', invalid='ignore'):
+            kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
+        use_largev = (abs_v > z_safe + 2.0) & kve_bad
+        log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
+        return np.where(kve_bad, log_asymp, np.log(kve_val))
+
+    def _pdf(self, x, alpha, beta, K):
+        return np.exp(self._logpdf(x, alpha, beta, K))
+
+    def _logpdf(self, x, alpha, beta, K):
+        x = np.asarray(x, dtype=float)
+        z = 2.0 * x * np.sqrt((1.0 + K) / beta)
+
+        # log-prefactor: log(4 x (1+K) e^{-K} / (Γ(α) β^α)), absorbs exp(-z)
+        log_pre = (
+            np.log(4.0) + np.log(x) + np.log(1.0 + K) - K
+            - sc.gammaln(alpha) - alpha * np.log(beta)
+            - z
+        )
+        log_b1Kx2 = np.log(beta) + np.log(1.0 + K) + 2.0 * np.log(x)
+
+        log1pK = np.log(1.0 + K)
+        logK_safe = np.log(np.where(K > 0, K, 1.0))
+
+        log_terms = []
+        for n in range(self.N_SERIES + 1):
+            log_kve_n = ricegamma_gen._log_kve(alpha - 1.0 - n, z)
+
+            n_logK = 0.0 if n == 0 else np.where(K > 0, n * logK_safe, -np.inf)
+
+            log_Tn = (
+                n_logK
+                + n * log1pK
+                - 2.0 * float(sc.gammaln(n + 1))
+                + 2.0 * n * np.log(x)                # x^{2n} factor
+                + (alpha - 1.0 - n) / 2.0 * log_b1Kx2
+                + log_kve_n
+            )
+            log_terms.append(log_Tn)
+
+        log_arr = np.stack(log_terms, axis=0)
+        max_lt = np.max(log_arr, axis=0)
+        with np.errstate(invalid='ignore'):
+            shifted = log_arr - max_lt[np.newaxis, ...]
+        log_sum = max_lt + np.log(np.sum(np.exp(shifted), axis=0))
+        return log_pre + log_sum
+
+    def _cdf(self, x, alpha, beta, K):
+        # P(X <= x | Omega) = ncx2.cdf(2*(1+K)*x^2/Omega, 2, 2K)
+        # Integrate out Omega ~ Gamma(alpha, beta) via generalised Gauss-Laguerre.
+        from scipy.special import roots_genlaguerre
+        from scipy.special import gamma as sp_gamma
+
+        x = np.asarray(x, dtype=float)
+        alpha_s = float(np.ravel(alpha)[0])
+        beta_s = float(np.ravel(beta)[0])
+        K_s = float(np.ravel(K)[0])
+
+        nodes, weights = roots_genlaguerre(50, alpha_s - 1.0)
+
+        ncx2_arg = (
+            2.0 * (1.0 + K_s) * (x[..., np.newaxis] ** 2)
+            / (beta_s * nodes)
+        )
+        cdf_vals = ncx2.cdf(ncx2_arg, 2.0, 2.0 * K_s)
+        return np.dot(cdf_vals, weights) / sp_gamma(alpha_s)
+
+    def _rvs(self, alpha, beta, K, size=None, random_state=None):
+        # Compound sampler: Omega ~ Gamma(alpha, beta), X|Omega ~ Rice(nu, sigma)
+        Omega = random_state.gamma(alpha, beta, size=size)
+        sigma = np.sqrt(Omega / (2.0 * (1.0 + K)))
+        nu = np.sqrt(K * Omega / (1.0 + K))
+        z1 = random_state.normal(nu, sigma, size=size)
+        z2 = random_state.normal(0.0, sigma, size=size)
+        return np.hypot(z1, z2)
+
+    def _fitstart(self, data, args=None):
+        # Shapes at scipy's default (1.0); loc = data mean; scale = data std.
+        if args is None:
+            args = (1.0,) * self.numargs
+        return args + (float(np.mean(data)), float(np.std(data)))
+
+
+ricegamma = ricegamma_gen(a=0.0, name='ricegamma', shapes='alpha, beta, K')
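
# The series summation in the _logpdf methods above uses the standard
# max-shift ("log-sum-exp") stabilisation. Equivalent one-liner via scipy,
# shown on synthetic log-terms whose naive exponentials underflow to zero:
import numpy as np
from scipy.special import logsumexp

log_terms = np.array([-760.0, -761.5, -765.0])     # exp() underflows to 0.0
m = log_terms.max()
manual = m + np.log(np.sum(np.exp(log_terms - m)))
print(manual, logsumexp(log_terms))                 # both ≈ -759.79
print(np.log(np.sum(np.exp(log_terms))))            # naive version: -inf
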