""" Custom continuous probability distributions for radar clutter modelling. All classes inherit from ``scipy.stats.rv_continuous``. After instantiation each distribution object exposes the full scipy public API summarised below. Parameters ``loc`` and ``scale`` shift and rescale the support; shape parameters (e.g. *mu*, *alpha*, *beta*) are passed as keyword arguments. Public methods (from rv_continuous) ------------------------------------ rvs(*args, loc=0, scale=1, size=1, random_state=None) Draw random variates. pdf(x, *args, loc=0, scale=1) Probability density function evaluated at *x*. logpdf(x, *args, loc=0, scale=1) Natural logarithm of the PDF; numerically preferred when values are small. cdf(x, *args, loc=0, scale=1) Cumulative distribution function: P(X <= x). logcdf(x, *args, loc=0, scale=1) Log of the CDF; avoids underflow in the far left tail. sf(x, *args, loc=0, scale=1) Survival function: 1 - CDF, i.e. P(X > x). logsf(x, *args, loc=0, scale=1) Log of the survival function; avoids underflow in the far right tail. ppf(q, *args, loc=0, scale=1) Percent-point function (quantile): inverse of the CDF. isf(q, *args, loc=0, scale=1) Inverse survival function: inverse of sf. moment(order, *args, loc=0, scale=1) Non-central raw moment of the specified integer order. stats(*args, loc=0, scale=1, moments='mv') Summary statistics selected by *moments* string: 'm' mean, 'v' variance, 's' skewness, 'k' excess kurtosis. entropy(*args, loc=0, scale=1) Differential (Shannon) entropy of the distribution. expect(func, args, loc=0, scale=1, lb=None, ub=None, conditional=False) Expected value of *func(x)* computed by numerical integration. median(*args, loc=0, scale=1) Median of the distribution. mean(*args, loc=0, scale=1) Mean of the distribution. std(*args, loc=0, scale=1) Standard deviation of the distribution. var(*args, loc=0, scale=1) Variance of the distribution. 
interval(confidence, *args, loc=0, scale=1) Confidence interval with equal probability mass on each side of the median. __call__(*args, loc=0, scale=1) Freeze the distribution — returns a frozen instance with fixed parameters, so methods can be called without repeating shape arguments. fit(data, *args, **kwds) Maximum-likelihood estimates of shape, loc, and scale from *data*. fit_loc_scale(data, *args) Quick loc and scale estimates via method of moments (mean and variance). nnlf(theta, x) Negative log-likelihood for parameter vector *theta* and observations *x*. support(*args, loc=0, scale=1) Lower and upper endpoints of the distribution's support. Overrideable private methods ----------------------------- Subclasses customise behaviour by implementing the private counterparts listed below. When a private method is *not* overridden scipy falls back to a default implementation — often a slower or less precise one. Override to gain speed or numerical accuracy. _argcheck(*args) Validate shape parameters; return a boolean array. Default: always ``True``. Should be overridden to reject invalid values. _get_support(*args) Return ``(lower, upper)`` endpoints of the support as a function of the shape parameters. Default: returns the ``(a, b)`` constants passed at construction time. _pdf(x, *args) Core of the density. No default — must be implemented (or ``_logpdf``). _logpdf(x, *args) Log-density. Default: ``log(_pdf(x))``, which loses precision when ``_pdf`` underflows to zero. Override whenever a stable closed form exists. _cdf(x, *args) Core of the cumulative distribution function. Default: numerical integration of ``_pdf`` from the lower support boundary — slow. _sf(x, *args) Survival function P(X > x). Default: ``1 - _cdf(x)``, which loses significant digits when ``_cdf(x)`` is close to 1 (far right tail). Override with a direct formula whenever one exists. _logcdf(x, *args) Log of the CDF. Default: ``log(_cdf(x))``. _logsf(x, *args) Log of the survival function. 
Default: ``log(_sf(x))`` = ``log(1 - _cdf(x))``, which is catastrophically inaccurate in the far right tail. Override to avoid cancellation, e.g. via ``log1p(-_cdf(x))`` or a complementary special function. _ppf(q, *args) Percent-point (quantile) function. Default: numerical inversion of ``_cdf`` — slow. Override with a closed-form inverse when available. _isf(q, *args) Inverse survival function. Default: ``_ppf(1 - q)``, which loses precision for small *q* (extreme quantiles). Override when the complementary special function provides a direct inverse. _stats(*args, moments='mv') Return ``(mean, variance, skewness, excess_kurtosis)``; use ``None`` for moments you do not compute. Default: numerical integration — override with analytical expressions for speed and accuracy. _munp(n, *args) Non-central raw moment of integer order *n*. Default: numerical integration. Implement when closed-form moments are available and ``_stats`` is not sufficient. _entropy(*args) Differential entropy. Default: numerical integration of ``-f log f``. Override with a closed-form expression when one exists. _rvs(*args, size=None, random_state=None) Random variate sampler. Default: CDF-inversion via ``_ppf`` — slow for distributions without a closed-form quantile function. Override with a direct simulation algorithm (e.g. a compound-distribution sampler). Numerical accuracy summary ~~~~~~~~~~~~~~~~~~~~~~~~~~ Priority order for overriding to improve tail accuracy: 1. ``_logsf`` / ``_logcdf`` — first line of defence against underflow. 2. ``_sf`` — avoids ``1 - cdf`` cancellation. 3. ``_isf`` — avoids ``ppf(1 - q)`` cancellation. """ from scipy.stats import rv_continuous, ncx2 from scipy.special import kve, gammaln import scipy.special as sc from scipy.stats._distn_infrastructure import _ShapeInfo import numpy as np # Gauss-Legendre nodes/weights on [-1, 1] — used by logricegamma_gen._batch_integral. 
# Computed once at import time; 200 nodes give machine-precision accuracy for smooth,
# well-resolved integrands while keeping the (N, 200) batch cost negligible.
_GL_M = 200
_gl_xi, _gl_wi = np.polynomial.legendre.leggauss(_GL_M)

# Gauss-Hermite nodes/weights — used by k_dist._cdf for the large-beta branch.
# When beta > _BETA_GH_THRESH, Gamma(β,1) weights overflow float64 (Γ(β) → ∞).
# We instead approximate E_{t~Gamma(β,1)}[f(t)] via the Normal approximation:
#   Gamma(β,1) ≈ Normal(β, √β)  =>  E[f(t)] ≈ (1/√π) Σ_k w_k f(β + √(2β)·u_k)
_GH_M = 30
_gh_nodes, _gh_weights = np.polynomial.hermite.hermgauss(_GH_M)
_gh_weights_norm = _gh_weights / np.sqrt(np.pi)  # unit-mass weights
_BETA_GH_THRESH = 150.0


class k_gen(rv_continuous):
    """Generalized K distribution for radar clutter modeling.

    The probability density function is:

        f(x; mu, alpha, beta) = 2 / (Gamma(alpha) * Gamma(beta))
            * (alpha*beta/mu)^((alpha+beta)/2) * x^((alpha+beta)/2 - 1)
            * K_{alpha-beta}(2*sqrt(alpha*beta*x/mu))

    for x > 0, where K_{alpha-beta} is the modified Bessel function of the
    second kind of order alpha-beta, mu > 0 is the mean, and alpha > 0,
    beta > 0 are shape parameters.

    The standard (2-parameter) K distribution is the special case beta=1,
    with nu=alpha and b=alpha/mu.
    """

    def _shape_info(self):
        return [
            _ShapeInfo("mu", domain=(0, np.inf), inclusive=(False, True)),
            _ShapeInfo("alpha", domain=(0, np.inf), inclusive=(True, True)),
            _ShapeInfo("beta", domain=(0, np.inf), inclusive=(True, True)),
        ]

    def _argcheck(self, mu, alpha, beta):
        # All three parameters must be strictly positive.
        return (mu > 0) & (alpha > 0) & (beta > 0)

    def _pdf(self, x, mu, alpha, beta):
        # Evaluate through the log-density for numerical stability.
        return np.exp(self._logpdf(x, mu, alpha, beta))

    @staticmethod
    def _log_kve_stable(v, z):
        """Numerically stable log(kve(v, z)) = log(K_v(z)) + z.

        Three regimes:
          - direct:      kve(v, z) > 0 (no over/underflow)
          - large-z:     z >> |v| — Hankel 2-term asymptotic
          - large-order: |v| >> z — leading Gamma asymptotic
            log(K_v(z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2)
        """
        z = np.asarray(z, dtype=float)
        v = np.asarray(v, dtype=float)
        abs_v = np.abs(v)
        kve_val = kve(v, z)
        z_safe = np.where(z > 0, z, 1.0)
        # Large-z Hankel asymptotic (accurate when z >> |v|)
        mu_v = 4.0 * v ** 2
        # Clip log1p argument to (-1, inf) to avoid domain errors when np.where
        # evaluates this branch even for points where it won't be selected.
        log1p_arg = np.maximum(
            (mu_v - 1.0) / (8.0 * z_safe)
            + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
            -1.0 + 1e-15,
        )
        log_asymp_largez = (
            0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
            + np.log1p(log1p_arg)
        )
        # Large-order asymptotic (accurate when |v| >> z > 0)
        # log(kve(v,z)) = log(K_v(z)) + z ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z
        log_asymp_largev = gammaln(abs_v) + abs_v * np.log(2.0 / z_safe) - np.log(2.0) + z
        # kve overflows to +inf when |v|>>z, underflows to 0 when z>>|v|
        kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
        use_largev = (abs_v > z_safe + 2.0) & kve_bad
        log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
        return np.where(kve_bad, log_asymp, np.log(kve_val))

    def _logpdf(self, x, mu, alpha, beta):
        half_sum = (alpha + beta) / 2.0
        log_ab_over_mu = np.log(alpha) + np.log(beta) - np.log(mu)
        z = 2.0 * np.sqrt(alpha * beta * x / mu)
        return (
            np.log(2.0)
            - gammaln(alpha)
            - gammaln(beta)
            + half_sum * log_ab_over_mu
            + (half_sum - 1.0) * np.log(x)
            + self._log_kve_stable(alpha - beta, z)
            - z
        )

    def _stats(self, mu, alpha, beta, moments="mv"):
        # Raw moments: E[X^n] = mu^n * poch(alpha, n) * poch(beta, n) / (alpha*beta)^n.
        mean = mu if "m" in moments or "v" in moments or "s" in moments or "k" in moments else None
        variance = None
        if "v" in moments or "s" in moments or "k" in moments:
            second_raw = (mu ** 2) * sc.poch(alpha, 2) * sc.poch(beta, 2) / ((alpha * beta) ** 2)
            variance = second_raw - mean ** 2
        skewness = None
        if "s" in moments or "k" in moments:
            third_raw = (mu ** 3) * sc.poch(alpha, 3) * sc.poch(beta, 3) / ((alpha * beta) ** 3)
            third_central = third_raw - 3.0 * mean * second_raw + 2.0 * mean ** 3
            skewness = third_central / np.power(variance, 1.5)
        kurtosis = None
        if "k" in moments:
            fourth_raw = (mu ** 4) * sc.poch(alpha, 4) * sc.poch(beta, 4) / ((alpha * beta) ** 4)
            fourth_central = (
                fourth_raw
                - 4.0 * mean * third_raw
                + 6.0 * mean ** 2 * second_raw
                - 3.0 * mean ** 4
            )
            kurtosis = fourth_central / (variance ** 2) - 3.0
        return mean, variance, skewness, kurtosis

    def _cdf(self, x, mu, alpha, beta):
        # K-distribution CDF via the compound-Gamma representation:
        #   X = Gamma(alpha, tau/alpha), tau ~ Gamma(beta, mu/beta)
        #   => F(x) = (1/Γ(β)) ∫_0^∞ gammainc(α, xαβ/(μt)) t^{β-1} e^{-t} dt
        #
        # For β ≤ _BETA_GH_THRESH: (β−1)-order Gauss-Laguerre quadrature.
        # For β > _BETA_GH_THRESH: Gamma(β,1) ≈ Normal(β,√β), so we switch to
        # Gauss-Hermite: E[f(t)] ≈ (1/√π) Σ_k w_k f(β + √(2β)·u_k).
        from scipy.special import gammainc, roots_genlaguerre
        from scipy.special import gamma as sp_gamma

        x = np.asarray(x, dtype=float)
        mu = np.asarray(mu, dtype=float)
        alpha = np.asarray(alpha, dtype=float)
        beta = np.asarray(beta, dtype=float)
        _N_PTS = 50

        def _gh_branch(x_, alpha_, mu_, bi):
            """Gauss-Hermite CDF for large beta (Normal approx to Gamma(β,1))."""
            t_gh = bi + np.sqrt(2.0 * bi) * _gh_nodes  # shape (GH_M,)
            t_gh = np.maximum(t_gh, 1e-10)
            t_args = (x_ * alpha_ / mu_)[..., np.newaxis] * (bi / t_gh)
            probs = gammainc(alpha_[..., np.newaxis], t_args)
            return np.dot(probs, _gh_weights_norm)

        # Fast path: beta is uniform across all inputs (the common case in fit).
        beta_vals = np.unique(beta)
        if beta_vals.size == 1:
            bi = float(beta_vals[0])
            if bi > _BETA_GH_THRESH:
                return _gh_branch(x, alpha, mu, bi)
            nodes, weights = roots_genlaguerre(_N_PTS, bi - 1.0)
            t_args = (x * alpha / mu)[..., np.newaxis] * (bi / nodes)
            probs = gammainc(alpha[..., np.newaxis], t_args)
            return np.dot(probs, weights) / sp_gamma(bi)

        # Slow path: heterogeneous beta — fall back to per-element loop.
        def _scalar(xi, mui, ai, bi):
            bi = float(bi)
            xi_, ai_, mui_ = np.asarray([float(xi)]), np.asarray([float(ai)]), float(mui)
            if bi > _BETA_GH_THRESH:
                return float(_gh_branch(xi_, ai_, np.asarray([mui_]), bi)[0])
            nodes, weights = roots_genlaguerre(_N_PTS, bi - 1.0)
            t_args = float(xi) * float(ai) * bi / (float(mui) * nodes)
            probs = gammainc(float(ai), t_args)
            return float(np.dot(probs, weights) / sp_gamma(bi))

        return np.vectorize(_scalar)(x, mu, alpha, beta)

    def _rvs(self, mu, alpha, beta, size=None, random_state=None):
        # Compound gamma: tau ~ Gamma(beta, mu/beta), X|tau ~ Gamma(alpha, tau/alpha)
        tau = random_state.gamma(beta, mu / beta, size=size)
        return random_state.gamma(alpha, tau / alpha)

    def fit(self, data, *args, **kwds):
        """MLE fit with loc pinned to 0 and scale pinned to 1.

        The K distribution is parameterised entirely by (mu, alpha, beta);
        loc/scale are redundant with mu, so they are always fixed. Explicitly
        passing loc=0/floc=0 and scale=1/fscale=1 is tolerated; any other
        value raises ValueError.
        """
        if ("loc" in kwds and kwds["loc"] != 0.0) or ("floc" in kwds and kwds["floc"] != 0.0):
            raise ValueError("k_distribution uses a fixed loc=0; use mu to control the mean/scale.")
        if ("scale" in kwds and kwds["scale"] != 1.0) or ("fscale" in kwds and kwds["fscale"] != 1.0):
            raise ValueError("k_distribution uses a fixed scale=1; use mu to control the mean/scale.")
        # Drop all four aliases: floc/fscale are re-supplied below, and leaving a
        # user-provided floc=0.0/fscale=1.0 in kwds would make super().fit raise
        # TypeError ("got multiple values for keyword argument").
        kwds.pop("loc", None)
        kwds.pop("scale", None)
        kwds.pop("floc", None)
        kwds.pop("fscale", None)
        return super().fit(data, *args, floc=0.0, fscale=1.0, **kwds)


k_dist = k_gen(a=0.0, name="k_distribution", shapes="mu, alpha, beta")


class logk_gen(rv_continuous):
    """Log-K continuous random variable.

    Y = ln(X) where X ~ K(mu, alpha, beta). The PDF is:

        f(y; mu, alpha, beta) = 2 / (Gamma(alpha) * Gamma(beta))
            * (alpha*beta/mu)^((alpha+beta)/2) * exp((alpha+beta)/2 * y)
            * K_{alpha-beta}(2*sqrt(alpha*beta*exp(y)/mu))

    for y in (-inf, +inf), mu > 0, alpha > 0, beta > 0.

    The cumulants of Y follow from the CGF
        K(t) = t*ln(mu/(alpha*beta)) + lnGamma(alpha+t) - lnGamma(alpha)
             + lnGamma(beta+t) - lnGamma(beta),
    giving:
        E[Y]   = ln(mu) - ln(alpha) - ln(beta) + psi(alpha) + psi(beta)
        Var[Y] = psi_1(alpha) + psi_1(beta)
    """

    def _shape_info(self):
        return [
            _ShapeInfo("mu", domain=(0, np.inf), inclusive=(False, True)),
            _ShapeInfo("alpha", domain=(0, np.inf), inclusive=(True, True)),
            _ShapeInfo("beta", domain=(0, np.inf), inclusive=(True, True)),
        ]

    def _argcheck(self, mu, alpha, beta):
        return (mu > 0) & (alpha > 0) & (beta > 0)

    @staticmethod
    def _log_kve(v, z):
        """Numerically stable log(kve(v, z)) = log(K_v(z)) + z.

        Three regimes:
          - direct:      kve(v, z) > 0 (no over/underflow)
          - large-z:     z >> |v| — Hankel 2-term asymptotic
          - large-order: |v| >> z — leading Gamma asymptotic
            log(K_v(z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2)
        """
        z = np.asarray(z, dtype=float)
        v = np.asarray(v, dtype=float)
        abs_v = np.abs(v)
        kve_val = kve(v, z)
        z_safe = np.where(z > 0, z, 1.0)
        # Large-z Hankel asymptotic (accurate when z >> |v|)
        mu_v = 4.0 * v ** 2
        # Clip log1p argument to (-1, inf) to avoid domain errors when np.where
        # evaluates this branch even for points where it won't be selected.
        log1p_arg = np.maximum(
            (mu_v - 1.0) / (8.0 * z_safe)
            + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
            -1.0 + 1e-15,
        )
        log_asymp_largez = (
            0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
            + np.log1p(log1p_arg)
        )
        # Large-order asymptotic (accurate when |v| >> z > 0)
        # log(kve(v,z)) = log(K_v(z)) + z ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z
        log_asymp_largev = gammaln(abs_v) + abs_v * np.log(2.0 / z_safe) - np.log(2.0) + z
        # kve overflows to +inf when |v|>>z, underflows to 0 when z>>|v|
        kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
        use_largev = (abs_v > z_safe + 2.0) & kve_bad
        log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
        return np.where(kve_bad, log_asymp, np.log(kve_val))

    def _pdf(self, y, mu, alpha, beta):
        return np.exp(self._logpdf(y, mu, alpha, beta))

    def _logpdf(self, y, mu, alpha, beta):
        half_sum = (alpha + beta) / 2.0
        log_ab_over_mu = np.log(alpha) + np.log(beta) - np.log(mu)
        z = 2.0 * np.sqrt(alpha * beta * np.exp(y) / mu)
        return (
            np.log(2.0)
            - gammaln(alpha)
            - gammaln(beta)
            + half_sum * log_ab_over_mu
            + half_sum * y
            + self._log_kve(alpha - beta, z)
            - z
        )

    def _stats(self, mu, alpha, beta):
        # Closed forms from the cumulant generating function (see class docstring).
        mean = np.log(mu) - np.log(alpha) - np.log(beta) + sc.digamma(alpha) + sc.digamma(beta)
        var = sc.polygamma(1, alpha) + sc.polygamma(1, beta)
        g1 = (sc.polygamma(2, alpha) + sc.polygamma(2, beta)) / var**1.5
        g2 = (sc.polygamma(3, alpha) + sc.polygamma(3, beta)) / var**2
        return mean, var, g1, g2

    def _cdf(self, y, mu, alpha, beta):
        # Clip y to avoid exp() overflow (float64 max ≈ exp(709.78)).
        # For y > 709 the CDF is indistinguishable from 1.0.
        y_clipped = np.minimum(y, 709.0)
        return k_dist.cdf(np.exp(y_clipped), mu, alpha, beta)

    def _ppf(self, q, mu, alpha, beta):
        # Invert via the linear-domain ppf: Y = ln(X), X ~ K(mu, alpha, beta).
        # k_dist.ppf has a finite lower bound (a=0) so its bracket search is
        # well-defined, avoiding the exp-overflow problem in the default logk ppf.
        x = k_dist.ppf(q, mu, alpha, beta)
        return np.log(np.maximum(x, np.finfo(float).tiny))

    def _rvs(self, mu, alpha, beta, size=None, random_state=None):
        # Compound Gamma: tau ~ Gamma(beta, mu/beta), X|tau ~ Gamma(alpha, tau/alpha)
        # Y = ln(X), avoids the ppf -> k_dist.ppf -> quad-CDF chain.
        tau = random_state.gamma(beta, mu / beta, size=size)
        x = random_state.gamma(alpha, tau / alpha)
        return np.log(np.maximum(x, np.finfo(float).tiny))

    def _sf(self, y, mu, alpha, beta):
        y_clipped = np.minimum(y, 709.0)
        return k_dist.sf(np.exp(y_clipped), mu, alpha, beta)

    def _fitstart(self, data, args=None):
        # Symmetric method-of-moments starting point using the first two cumulants:
        #   E[Y]   = ln(mu) - ln(alpha) - ln(beta) + psi(alpha) + psi(beta)
        #   Var[Y] = psi_1(alpha) + psi_1(beta)
        # Symmetric start (alpha=beta): psi_1(a) ≈ 1/a  =>  a ≈ 2/Var
        mean_d = float(np.mean(data))
        var_d = max(float(np.var(data)), 1e-6)
        alpha0 = float(np.clip(2.0 / var_d, 0.5, 50.0))
        beta0 = alpha0
        mu0 = float(np.exp(
            mean_d + np.log(alpha0) + np.log(beta0)
            - sc.digamma(alpha0) - sc.digamma(beta0)
        ))
        return (mu0, alpha0, beta0, 0.0, 1.0)

    def fit(self, data, *args, **kwds):
        """MLE fit with bounded shape parameters; loc and scale are free by default.

        Optimises in log-parameter space via L-BFGS-B to keep mu, alpha, beta
        and scale strictly positive. The K-distribution log-likelihood is
        unbounded when alpha or beta grows without limit, so the search is
        capped at _MAX. Three starting points are tried — one symmetric
        (alpha=beta from variance matching) and two asymmetric (one with
        alpha>>beta and its mirror) — to escape the symmetric local maximum
        when the data is skewed. loc/scale can be pinned by passing floc=0,
        fscale=1 as keyword args.
        """
        from scipy.optimize import minimize, brentq

        # Respect user-supplied fixed values; None means "free to optimise".
        floc = kwds.pop('floc', None)
        fscale = kwds.pop('fscale', None)
        for k in ('loc', 'scale'):
            kwds.pop(k, None)
        data = np.asarray(data, dtype=float).ravel()

        # Upper bound: prevents the degenerate alpha→∞ (or beta→∞) regime.
        _MAX = 50.0

        # ── symmetric starting point ──────────────────────────────────────────
        start = self._fitstart(data)
        mu0 = max(float(args[0]) if len(args) > 0 else start[0], 1e-12)
        alpha0 = float(np.clip(args[1] if len(args) > 1 else start[1], 0.01, _MAX))
        beta0 = float(np.clip(args[2] if len(args) > 2 else start[2], 0.01, _MAX))

        # ── asymmetric starting point (all variance in beta, large alpha) ─────
        mean_d = float(np.mean(data))
        var_d = max(float(np.var(data)), 1e-6)
        try:
            beta_asym = float(brentq(
                lambda b: sc.polygamma(1, b) - var_d, 0.05, 500.0, xtol=1e-8
            ))
        except Exception:
            beta_asym = beta0
        alpha_asym = float(np.clip(5.0 * beta_asym, 1.0, _MAX))
        beta_asym = float(np.clip(beta_asym, 0.01, _MAX))
        mu_asym = float(np.exp(
            mean_d + np.log(alpha_asym) + np.log(beta_asym)
            - sc.digamma(alpha_asym) - sc.digamma(beta_asym)
        ))

        # Starting values for loc / scale (used when free)
        loc0 = float(floc) if floc is not None else 0.0
        scale0 = float(fscale) if fscale is not None else 1.0

        # ── parameter packing ─────────────────────────────────────────────────
        # Vector layout: [log(mu), log(alpha), log(beta), loc?, log(scale)?]
        # Slots for loc/scale are only present when they are free.
        def pack(mu, alpha, beta):
            v = [np.log(mu), np.log(alpha), np.log(beta)]
            if floc is None:
                v.append(loc0)
            if fscale is None:
                v.append(np.log(scale0))
            return v

        def unpack(v):
            mu = np.exp(v[0]); alpha = np.exp(v[1]); beta = np.exp(v[2])
            idx = 3
            if floc is None:
                loc = float(v[idx]); idx += 1
            else:
                loc = float(floc)
            scale = float(np.exp(v[idx])) if fscale is None else float(fscale)
            return mu, alpha, beta, loc, scale

        bounds = [
            (None, None),                   # log(mu)
            (np.log(0.01), np.log(_MAX)),   # log(alpha)
            (np.log(0.01), np.log(_MAX)),   # log(beta)
        ]
        if floc is None:
            bounds.append((None, None))     # loc
        if fscale is None:
            bounds.append((np.log(1e-4), np.log(100.0)))  # log(scale)

        def neg_ll(v):
            mu, alpha, beta, loc, scale = unpack(v)
            with np.errstate(all='ignore'):
                ll = np.sum(self.logpdf(data, mu, alpha, beta, loc=loc, scale=scale))
            return -ll if np.isfinite(ll) else 1e15

        # Candidate starting vectors: symmetric + two asymmetric (α>>β and β>>α)
        x0_candidates = [
            pack(mu0, alpha0, beta0),
            pack(mu_asym, alpha_asym, beta_asym),
            pack(mu_asym, beta_asym, alpha_asym),
        ]
        best_res = None
        best_nll = np.inf
        with np.errstate(all='ignore'):
            for x0 in x0_candidates:
                x0_safe = list(x0)
                x0_safe[1] = float(np.clip(x0_safe[1], bounds[1][0], bounds[1][1]))
                x0_safe[2] = float(np.clip(x0_safe[2], bounds[2][0], bounds[2][1]))
                res = minimize(
                    neg_ll,
                    x0=x0_safe,
                    method='L-BFGS-B',
                    bounds=bounds,
                    options={'ftol': 1e-12, 'gtol': 1e-8, 'maxiter': 2000},
                )
                if res.fun < best_nll:
                    best_nll = res.fun
                    best_res = res
        if best_res is None:
            # Every start produced a non-finite/NaN objective; fail loudly
            # instead of dereferencing None below.
            raise RuntimeError("logk.fit failed from every starting point")
        mu_hat, alpha_hat, beta_hat, loc_hat, scale_hat = unpack(best_res.x)
        return (mu_hat, alpha_hat, beta_hat, loc_hat, scale_hat)


logk = logk_gen(name="logk", shapes="mu, alpha, beta")


class lognakagami_gen(rv_continuous):
    """Log-Nakagami continuous random variable.

    The probability density function is:

        f(y; m, Omega) = 2*m^m / (Gamma(m)*Omega^m) * exp(2*m*y)
                         * exp(-(m/Omega)*exp(2*y))

    for y in (-inf, +inf), m >= 0.5, Omega > 0.

    Derived via the change of variable Y = ln X on a Nakagami(m, Omega)
    variate; the Jacobian e^y combines with x^(2m-1) to give the exp(2*m*y)
    term. The mean and variance are:

        E[Y]   = 0.5 * (psi(m) - ln(m) + ln(Omega))
        Var[Y] = 0.25 * psi_1(m)

    where psi and psi_1 are the digamma and trigamma functions. The spread
    Omega shifts the mean by 0.5*ln(Omega) but leaves all higher moments
    unchanged; entropy is also Omega-independent.

    `lognakagami` takes ``m`` and ``Omega`` as shape parameters.
    """

    def _shape_info(self):
        return [
            _ShapeInfo("m", False, (0, np.inf), (False, False)),
            _ShapeInfo("Omega", False, (0, np.inf), (False, False)),
        ]

    def _argcheck(self, m, Omega):
        return (m >= 0.5) & (Omega > 0)

    def _pdf(self, y, m, Omega):
        return np.exp(self._logpdf(y, m, Omega))

    def _logpdf(self, y, m, Omega):
        # ln f = ln2 + m*ln(m) - ln Γ(m) - m*ln(Ω) + 2m*y - (m/Ω)*exp(2y)
        return (np.log(2.0) + sc.xlogy(m, m) - sc.gammaln(m) - m * np.log(Omega)
                + 2.0 * m * y - (m / Omega) * np.exp(2.0 * y))

    def _cdf(self, y, m, Omega):
        # P(Y <= y) = P(X <= e^y) = gammainc(m, (m/Omega) * e^{2y})
        return sc.gammainc(m, (m / Omega) * np.exp(2.0 * y))

    def _sf(self, y, m, Omega):
        return sc.gammaincc(m, (m / Omega) * np.exp(2.0 * y))

    def _logcdf(self, y, m, Omega):
        return np.log(sc.gammainc(m, (m / Omega) * np.exp(2.0 * y)))

    def _logsf(self, y, m, Omega):
        return np.log(sc.gammaincc(m, (m / Omega) * np.exp(2.0 * y)))

    def _ppf(self, q, m, Omega):
        # q = gammainc(m, (m/Omega)*e^{2y})
        #   => (m/Omega)*e^{2y} = gammaincinv(m, q)
        #   => y = 0.5 * ln(Omega * gammaincinv(m, q) / m)
        return 0.5 * np.log(Omega * sc.gammaincinv(m, q) / m)

    def _isf(self, q, m, Omega):
        return 0.5 * np.log(Omega * sc.gammainccinv(m, q) / m)

    def _stats(self, m, Omega):
        # E[Y] = 0.5*(psi(m) - ln(m) + ln(Omega))
        mu = 0.5 * (sc.digamma(m) - np.log(m) + np.log(Omega))
        # Var[Y] = 0.25 * psi_1(m)  (Omega cancels — pure shift)
        mu2 = 0.25 * sc.polygamma(1, m)
        g1 = sc.polygamma(2, m) / sc.polygamma(1, m) ** 1.5
        g2 = sc.polygamma(3, m) / sc.polygamma(1, m) ** 2
        return mu, mu2, g1, g2

    def _entropy(self, m, Omega):
        # H = -E[ln f(Y)] = -ln2 + ln Γ(m) - m*psi(m) + m
        # Omega cancels exactly (translation invariance of differential entropy)
        return -np.log(2.0) + sc.gammaln(m) - m * sc.digamma(m) + m

    def _rvs(self, m, Omega, size=None, random_state=None):
        # X ~ Nakagami(m, Omega)  =>  X^2 ~ Gamma(m, Omega/m)
        g = random_state.gamma(m, Omega / m, size=size)
        return 0.5 * np.log(g)


lognakagami = lognakagami_gen(name='lognakagami', shapes="m, Omega")


class loggamma_gen(rv_continuous):
    """Log-Gamma continuous random variable.

    The probability density function is:

        f(y; a) = 1 / Gamma(a) * exp(a*y - exp(y))

    for y in (-inf, +inf), a > 0.

    Derived via the change of variable Y = ln X on a Gamma(a, 1) variate;
    the Jacobian e^y combines with x^(a-1) to give the exp(a*y) term.
    The mean and variance are:

        E[Y]   = psi(a)
        Var[Y] = psi_1(a)

    where psi and psi_1 are the digamma and trigamma functions.

    The non-unit scale beta is recovered through scipy's ``loc`` parameter:
    loc = ln(beta), since ln(beta * X) = ln(beta) + ln(X).

    `loggamma_dist` takes ``a`` as a shape parameter.
    """

    def _shape_info(self):
        return [_ShapeInfo("a", False, (0, np.inf), (False, False))]

    def _argcheck(self, a):
        return a > 0

    def _pdf(self, y, a):
        return np.exp(self._logpdf(y, a))

    def _logpdf(self, y, a):
        # ln f = a*y - e^y - ln Γ(a)
        return a * y - np.exp(y) - sc.gammaln(a)

    def _cdf(self, y, a):
        # P(Y <= y) = P(X <= e^y) = gammainc(a, e^y)
        return sc.gammainc(a, np.exp(y))

    def _sf(self, y, a):
        return sc.gammaincc(a, np.exp(y))

    def _logcdf(self, y, a):
        return np.log(sc.gammainc(a, np.exp(y)))

    def _logsf(self, y, a):
        return np.log(sc.gammaincc(a, np.exp(y)))

    def _ppf(self, q, a):
        # Invert CDF: q = gammainc(a, e^y)
        #   => e^y = gammaincinv(a, q)
        #   => y = ln(gammaincinv(a, q))
        return np.log(sc.gammaincinv(a, q))

    def _isf(self, q, a):
        return np.log(sc.gammainccinv(a, q))

    def _stats(self, a):
        # Y = ln(X), X ~ Gamma(a, 1)
        #   E[Y] = psi(a) (digamma)
        #   Var[Y] = psi_1(a) (trigamma)
        #   skew = psi_2(a) / psi_1(a)^{3/2}
        #   excess kurtosis = psi_3(a) / psi_1(a)^2
        mu = sc.digamma(a)
        mu2 = sc.polygamma(1, a)
        g1 = sc.polygamma(2, a) / mu2 ** 1.5
        g2 = sc.polygamma(3, a) / mu2 ** 2
        return mu, mu2, g1, g2

    def _entropy(self, a):
        # H(Y) = -E[ln f(Y)] = -E[a*Y - e^Y - ln Γ(a)]
        #      = ln Γ(a) - a*psi(a) + E[e^Y]
        # E[e^Y] = E[X] = a (for Gamma(a, 1))
        return sc.gammaln(a) - a * sc.digamma(a) + a

    def _rvs(self, a, size=None, random_state=None):
        # Generate Gamma(a, 1) variates, then take log.
        # For small a, use Gamma(a) ~ Gamma(a+1) * U^(1/a)
        # so ln Gamma(a) ~ ln Gamma(a+1) + ln(U)/a
        # This avoids precision loss when a << 1.
        return (np.log(random_state.gamma(a + 1, size=size))
                + np.log(random_state.uniform(size=size)) / a)


loggamma_dist = loggamma_gen(name='loggamma_dist')


class lograyleigh_gen(rv_continuous):
    """Log-Rayleigh continuous random variable.

    The probability density function is:

        f(y; sigma) = (e^{2y} / sigma^2) * exp(-e^{2y} / (2*sigma^2))

    for y in (-inf, +inf), sigma > 0.

    Derived via the change of variable Y = ln X on a Rayleigh(sigma)
    variate; the Jacobian e^y combines with x to give the e^{2y} term.
    The mean and variance are:

        E[Y]   = 0.5 * (ln(2*sigma^2) + psi(1))
        Var[Y] = pi^2 / 24

    where psi is the digamma function. The scale sigma shifts the mean by
    ln(sigma) but leaves all higher central moments unchanged; entropy is
    also sigma-independent.

    `lograyleigh` takes ``sigma`` as a shape parameter.
    """

    def _shape_info(self):
        return [_ShapeInfo("sigma", False, (0, np.inf), (False, False))]

    def _argcheck(self, sigma):
        return sigma > 0

    def _pdf(self, y, sigma):
        return np.exp(self._logpdf(y, sigma))

    def _logpdf(self, y, sigma):
        # ln f = 2y - 2*ln(sigma) - e^{2y} / (2*sigma^2)
        return 2.0 * y - 2.0 * np.log(sigma) - np.exp(2.0 * y) / (2.0 * sigma**2)

    def _cdf(self, y, sigma):
        return 1.0 - np.exp(-np.exp(2.0 * y) / (2.0 * sigma**2))

    def _sf(self, y, sigma):
        return np.exp(-np.exp(2.0 * y) / (2.0 * sigma**2))

    def _logcdf(self, y, sigma):
        return np.log1p(-np.exp(-np.exp(2.0 * y) / (2.0 * sigma**2)))

    def _logsf(self, y, sigma):
        return -np.exp(2.0 * y) / (2.0 * sigma**2)

    def _ppf(self, q, sigma):
        # q = 1 - exp(-e^{2y}/(2*sigma^2))
        #   => e^{2y} = -2*sigma^2 * ln(1-q)
        #   => y = 0.5 * ln(-2*sigma^2 * ln(1-q))
        return 0.5 * np.log(-2.0 * sigma**2 * np.log1p(-q))

    def _isf(self, q, sigma):
        # q = exp(-e^{2y}/(2*sigma^2))  =>  y = 0.5 * ln(-2*sigma^2 * ln(q))
        return 0.5 * np.log(-2.0 * sigma**2 * np.log(q))

    def _stats(self, sigma):
        # Y = 0.5*ln(2) + 0.5*ln(W) + ln(sigma), W ~ Exp(1) = Gamma(1, 1)
        #   E[Y] = 0.5*(ln(2*sigma^2) + psi(1))
        #   Var[Y] = 0.25*psi_1(1) = pi^2/24
        # Standardised skewness/kurtosis are sigma-independent (pure shift)
        mu = 0.5 * (np.log(2.0 * sigma**2) + sc.digamma(1))
        mu2 = 0.25 * sc.polygamma(1, 1)
        g1 = sc.polygamma(2, 1) / sc.polygamma(1, 1)**1.5
        g2 = sc.polygamma(3, 1) / sc.polygamma(1, 1)**2
        return mu, mu2, g1, g2

    def _entropy(self, sigma):
        # H(Y) = 1 - ln(2) - psi(1)  (sigma-independent; derived via H(X) - E[ln X])
        return 1.0 - np.log(2.0) - sc.digamma(1)

    def _rvs(self, sigma, size=None, random_state=None):
        # X ~ Rayleigh(sigma) = sigma*sqrt(-2*ln(U)), U ~ Uniform(0,1)
        u = random_state.uniform(size=size)
        return np.log(sigma) + 0.5 * np.log(-2.0 * np.log(u))


lograyleigh = lograyleigh_gen(name='lograyleigh', shapes="sigma")


class logrice_gen(rv_continuous):
    """Log-Rice continuous random variable.

    The probability density function is:

        f(y; nu, sigma) = (e^{2y} / sigma^2)
            * exp(-(e^{2y} + nu^2) / (2*sigma^2))
            * I_0(e^y * nu / sigma^2)

    for y in (-inf, +inf), nu >= 0, sigma > 0.

    Derived via the change of variable Y = ln X on a Rice(nu, sigma)
    variate; the Jacobian e^y combines with x to give the e^{2y} factor.
    I_0 is the modified Bessel function of the first kind of order zero.

    The CDF involves the Marcum Q-function of order 1:

        F(y; nu, sigma) = 1 - Q_1(nu/sigma, e^y/sigma)

    When nu = 0, the distribution reduces to Log-Rayleigh.

    `logrice` takes ``nu`` and ``sigma`` as shape parameters.
    """

    def _shape_info(self):
        return [
            _ShapeInfo("nu", False, (0, np.inf), (True, False)),
            _ShapeInfo("sigma", False, (0, np.inf), (False, False)),
        ]

    def _argcheck(self, nu, sigma):
        return (nu >= 0) & (sigma > 0)

    def _pdf(self, y, nu, sigma):
        return np.exp(self._logpdf(y, nu, sigma))

    def _logpdf(self, y, nu, sigma):
        # ln f = 2y - 2*ln(sigma) - (e^{2y} + nu^2)/(2*sigma^2) + ln(I_0(z))
        # Use ive(0, z)*exp(z) = I_0(z) for stability: ln(I_0(z)) = ln(ive(0,z)) + z
        z = np.exp(y) * nu / sigma**2
        return (2.0 * y - 2.0 * np.log(sigma)
                - (np.exp(2.0 * y) + nu**2) / (2.0 * sigma**2)
                + np.log(sc.ive(0, z)) + z)

    def _cdf(self, y, nu, sigma):
        # P(Y <= y) = 1 - Q_1(nu/sigma, e^y/sigma) = ncx2.cdf(e^{2y}/sigma^2, 2, nu^2/sigma^2)
        return ncx2.cdf(np.exp(2.0 * y) / sigma**2, 2, (nu / sigma)**2)

    def _sf(self, y, nu, sigma):
        return ncx2.sf(np.exp(2.0 * y) / sigma**2, 2, (nu / sigma)**2)

    def _logcdf(self, y, nu, sigma):
        return ncx2.logcdf(np.exp(2.0 * y) / sigma**2, 2, (nu / sigma)**2)

    def _logsf(self, y, nu, sigma):
        return ncx2.logsf(np.exp(2.0 * y) / sigma**2, 2, (nu / sigma)**2)

    def _rvs(self, nu, sigma, size=None, random_state=None):
        # X ~ Rice(nu, sigma): norm of 2D Gaussian with mean (nu, 0) and std sigma
        z1 = random_state.normal(nu, sigma, size=size)
        z2 = random_state.normal(0.0, sigma, size=size)
        return np.log(np.hypot(z1, z2))


logrice = logrice_gen(name='logrice', shapes="nu, sigma")


class logweibull_gen(rv_continuous):
    """Log-Weibull continuous random variable.

    Y = ln(X) where X ~ Weibull(k, lam). The PDF is:

        f(y; k, lam) = (k/lam) * (e^y/lam)^(k-1) * exp(-(e^y/lam)^k) * e^y

    for y in (-inf, +inf), k > 0, lam > 0.

    Since Y = ln(lam) + (1/k)*ln(W) where W ~ Exp(1), the moments are:

        E[Y]   = ln(lam) + psi(1)/k
        Var[Y] = psi_1(1) / k^2

    Skewness and excess kurtosis are k-independent constants (the 1/k
    scaling cancels in the standardised moments) given by
    psi_2(1)/psi_1(1)^(3/2) and psi_3(1)/psi_1(1)^2 respectively.
    The differential entropy is lam-independent:

        H(Y) = 1 - psi(1) - ln(k)
    """

    def _shape_info(self):
        return [
            _ShapeInfo("k", False, (0, np.inf), (False, False)),
            _ShapeInfo("lam", False, (0, np.inf), (False, False)),
        ]

    def _argcheck(self, k, lam):
        return (k > 0) & (lam > 0)

    def _pdf(self, y, k, lam):
        return np.exp(self._logpdf(y, k, lam))

    def _logpdf(self, y, k, lam):
        # ln f = ln(k) + k*(y - ln(lam)) - (e^y/lam)^k
        return np.log(k) + k * (y - np.log(lam)) - (np.exp(y) / lam) ** k

    def _cdf(self, y, k, lam):
        return 1.0 - np.exp(-((np.exp(y) / lam) ** k))

    def _sf(self, y, k, lam):
        return np.exp(-((np.exp(y) / lam) ** k))

    def _logcdf(self, y, k, lam):
        return np.log1p(-np.exp(-((np.exp(y) / lam) ** k)))

    def _logsf(self, y, k, lam):
        return -((np.exp(y) / lam) ** k)

    def _ppf(self, q, k, lam):
        # q = 1 - exp(-(e^y/lam)^k)  =>  y = ln(lam) + ln(-log1p(-q)) / k
        return np.log(lam) + np.log(-np.log1p(-q)) / k

    def _isf(self, q, k, lam):
        # q = exp(-(e^y/lam)^k)  =>  y = ln(lam) + ln(-ln(q)) / k
        return np.log(lam) + np.log(-np.log(q)) / k

    def _stats(self, k, lam):
        # Y = ln(lam) + (1/k)*ln(W), W ~ Exp(1) = Gamma(1, 1)
        mu = np.log(lam) + sc.digamma(1) / k
        mu2 = sc.polygamma(1, 1) / k ** 2
        g1 = sc.polygamma(2, 1) / sc.polygamma(1, 1) ** 1.5
        g2 = sc.polygamma(3, 1) / sc.polygamma(1, 1) ** 2
        return mu, mu2, g1, g2

    def _entropy(self, k, lam):
        # H(Y) = 1 - psi(1) - ln(k)  (lam-independent: pure location shift)
        return 1.0 - sc.digamma(1) - np.log(k)

    def _rvs(self, k, lam, size=None, random_state=None):
        # X = lam * W^(1/k), W ~ Exp(1)  =>  Y = ln(lam) + ln(W)/k
        return np.log(lam) + np.log(random_state.exponential(size=size)) / k


logweibull = logweibull_gen(name="logweibull", shapes="k, lam")


class logricegamma_gen(rv_continuous):
    """Log-RiceGamma continuous random variable.

    Y = ln(X) where X has the Rice-Gamma PDF:

        f(x; alpha, beta, K) = 4*x*(1+K)*exp(-K) / (Gamma(alpha)*beta^alpha)
            * sum_{n=0}^inf [K*(1+K)]^n / (n!)^2 * x^{2n}
            * [beta*(1+K)*x^2]^{(alpha-1-n)/2}
            * K_{alpha-1-n}(2*x*sqrt((1+K)/beta))

    for x > 0, giving Y = ln(X) with support on all of R.

    Parameters
    ----------
    alpha : float > 0
        Shape of the Gamma texture distribution.
    beta : float > 0
        Scale of the Gamma texture (E[Omega] = alpha*beta).
    K : float >= 0
        Rice K-factor (specular-to-diffuse power ratio). K=0 recovers the
        Rayleigh-Gamma (K-distribution) log-envelope.

    The compound representation is: Omega ~ Gamma(alpha, beta), then
    X|Omega ~ Rice(nu, sigma) with sigma^2 = Omega/(2*(1+K)) and
    nu = sqrt(K*Omega/(1+K)).
    """

    def _shape_info(self):
        return [
            _ShapeInfo("alpha", False, (0, np.inf), (False, False)),
            _ShapeInfo("beta", False, (0, np.inf), (False, False)),
            _ShapeInfo("K", False, (0, np.inf), (True, False)),
        ]

    def _argcheck(self, alpha, beta, K):
        return (alpha > 0) & (beta > 0) & (K >= 0)

    @staticmethod
    def _log_kve(v, z):
        """log(kve(v, z)) = log(K_v(z)) + z, numerically stable."""
        v = np.asarray(v, dtype=float)
        z = np.asarray(z, dtype=float)
        abs_v = np.abs(v)
        with np.errstate(divide='ignore', invalid='ignore', over='ignore'):
            kve_val = sc.kve(v, z)
        z_safe = np.where(z > 0, z, 1.0)
        mu_v = 4.0 * v ** 2
        log1p_arg = np.maximum(
            (mu_v - 1.0) / (8.0 * z_safe)
            + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2),
            -1.0 + 1e-15,
        )
        log_asymp_largez = (
            0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe))
            + np.log1p(log1p_arg)
        )
        # log(kve(v,z)) ≈ gammaln(|v|) + |v|*log(2/z) - log(2) + z for |v| >> z
        log_asymp_largev = (
            sc.gammaln(np.maximum(abs_v, 1e-300))
            + abs_v * np.log(2.0 / z_safe) - np.log(2.0) + z
        )
        with np.errstate(divide='ignore', invalid='ignore'):
            kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0)
        use_largev = (abs_v > z_safe + 2.0) & kve_bad
        log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez)
        return np.where(kve_bad, log_asymp, np.log(kve_val))

    def _pdf(self, y, alpha, beta, K):
        return np.exp(self._logpdf(y, alpha, beta, K))

    #: Number of terms kept in the truncated series for _logpdf.
    #: Increase for large K-factors (rule of thumb: N_SERIES > 3*K + 30).
N_SERIES: int = 90 def _logpdf(self, y, alpha, beta, K): y = np.asarray(y, dtype=float) x = np.exp(y) z = 2.0 * x * np.sqrt((1.0 + K) / beta) # log-prefactor: log(4 x^2 (1+K) e^{-K} / (Γ(α) β^α)), absorbs exp(-z) # so each series term uses kve-scaled Bessel values log_pre = ( np.log(4.0) + 2.0 * y + np.log(1.0 + K) - K - sc.gammaln(alpha) - alpha * np.log(beta) - z ) log_b1Kx2 = np.log(beta) + np.log(1.0 + K) + 2.0 * y # log[beta*(1+K)*x^2] log1pK = np.log(1.0 + K) logK_safe = np.log(np.where(K > 0, K, 1.0)) log_terms = [] for n in range(self.N_SERIES + 1): # log(kve(alpha-1-n, z)) = log(K_{alpha-1-n}(z)) + z log_kve_n = logricegamma_gen._log_kve(alpha - 1.0 - n, z) # K^n: for K=0 only the n=0 term survives (0^0 = 1) n_logK = 0.0 if n == 0 else np.where(K > 0, n * logK_safe, -np.inf) log_Tn = ( n_logK + n * log1pK - 2.0 * float(sc.gammaln(n + 1)) + 2.0 * n * y # x^{2n} factor + (alpha - 1.0 - n) / 2.0 * log_b1Kx2 + log_kve_n ) log_terms.append(log_Tn) log_arr = np.stack(log_terms, axis=0) # shape (N_MAX+1, *y.shape) max_lt = np.max(log_arr, axis=0) # shape (*y.shape) with np.errstate(invalid='ignore'): shifted = log_arr - max_lt[np.newaxis, ...] log_sum = max_lt + np.log(np.sum(np.exp(shifted), axis=0)) return log_pre + log_sum def _fitstart(self, data, args=None): # loc near the data mean; alpha/beta in (0.5, 1) for typical heavy-tail # radar clutter; K=1 moderate Rice factor; scale near data std. 
mean_d = float(np.mean(data)) std_d = float(np.std(data)) return (0.75, 0.75, 3.0, mean_d, max(std_d * 0.5, 0.1)) def _rvs(self, alpha, beta, K, size=None, random_state=None): # Compound sampler: Omega ~ Gamma(alpha, beta), X|Omega ~ Rice(nu, sigma) Omega = random_state.gamma(alpha, beta, size=size) sigma = np.sqrt(Omega / (2.0 * (1.0 + K))) nu = np.sqrt(K * Omega / (1.0 + K)) z1 = random_state.normal(nu, sigma, size=size) z2 = random_state.normal(0.0, sigma, size=size) return np.log(np.hypot(z1, z2)) def _cdf(self, y, alpha, beta, K): # Compound representation: # P(Y <= y) = E_Omega[ P(X <= e^y | Omega) ] # Given Omega, sigma^2 = Omega/(2(1+K)), nu^2 = K*Omega/(1+K): # P(X <= x | Omega) = ncx2.cdf(x^2/sigma^2, 2, nu^2/sigma^2) # = ncx2.cdf(2(1+K)e^{2y}/Omega, 2, 2K) # Non-centrality 2K is Omega-independent. Integrate over Omega ~ Gamma(alpha, beta) # by substituting s = Omega/beta (so s ~ Gamma(alpha, 1)) and applying # generalized Gauss-Laguerre quadrature of order alpha-1. from scipy.special import roots_genlaguerre from scipy.special import gamma as sp_gamma y = np.asarray(y, dtype=float) # Parameters are broadcast to y's shape by scipy; take the unique scalar. alpha_s = float(np.ravel(alpha)[0]) beta_s = float(np.ravel(beta)[0]) K_s = float(np.ravel(K)[0]) _N_PTS = 50 nodes, weights = roots_genlaguerre(_N_PTS, alpha_s - 1.0) # ncx2_arg shape: (*y.shape, N_PTS) ncx2_arg = ( 2.0 * (1.0 + K_s) * np.exp(2.0 * y)[..., np.newaxis] / (beta_s * nodes) ) cdf_vals = ncx2.cdf(ncx2_arg, 2.0, 2.0 * K_s) return np.dot(cdf_vals, weights) / sp_gamma(alpha_s) logricegamma = logricegamma_gen(name='logricegamma', shapes='alpha, beta, K') class ricegamma_gen(rv_continuous): """RiceGamma continuous random variable (linear-domain envelope). 
The probability density function is: f(x; alpha, beta, K) = 4*x*(1+K)*exp(-K) / (Gamma(alpha)*beta^alpha) * sum_{n=0}^inf [K*(1+K)]^n / (n!)^2 * x^{2n} * [beta*(1+K)*x^2]^{(alpha-1-n)/2} * K_{alpha-1-n}(2*x*sqrt((1+K)/beta)) for x > 0, alpha > 0, beta > 0, K >= 0. This is the linear-domain counterpart of logricegamma: if X ~ ricegamma then ln(X) ~ logricegamma (same alpha, beta, K, loc=0, scale=1). The compound representation is: Omega ~ Gamma(alpha, beta), then X|Omega ~ Rice(nu, sigma) with sigma^2 = Omega/(2*(1+K)) and nu = sqrt(K*Omega/(1+K)). E[X^2] = alpha*beta. """ #: Number of series terms. Increase for large K (rule: N_SERIES > 3*K + 30). N_SERIES: int = 90 def _shape_info(self): return [ _ShapeInfo("alpha", False, (0, np.inf), (False, False)), _ShapeInfo("beta", False, (0, np.inf), (False, False)), _ShapeInfo("K", False, (0, np.inf), (True, False)), ] def _argcheck(self, alpha, beta, K): return (alpha > 0) & (beta > 0) & (K >= 0) @staticmethod def _log_kve(v, z): """log(kve(v, z)) = log(K_v(z)) + z, numerically stable.""" v = np.asarray(v, dtype=float) z = np.asarray(z, dtype=float) abs_v = np.abs(v) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): kve_val = sc.kve(v, z) z_safe = np.where(z > 0, z, 1.0) mu_v = 4.0 * v ** 2 log1p_arg = np.maximum( (mu_v - 1.0) / (8.0 * z_safe) + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * z_safe ** 2), -1.0 + 1e-15, ) log_asymp_largez = ( 0.5 * (np.log(np.pi) - np.log(2.0) - np.log(z_safe)) + np.log1p(log1p_arg) ) log_asymp_largev = ( sc.gammaln(np.maximum(abs_v, 1e-300)) + abs_v * np.log(2.0 / z_safe) - np.log(2.0) + z ) with np.errstate(divide='ignore', invalid='ignore'): kve_bad = ~np.isfinite(kve_val) | (kve_val <= 0) use_largev = (abs_v > z_safe + 2.0) & kve_bad log_asymp = np.where(use_largev, log_asymp_largev, log_asymp_largez) return np.where(kve_bad, log_asymp, np.log(kve_val)) def _pdf(self, x, alpha, beta, K): return np.exp(self._logpdf(x, alpha, beta, K)) def _logpdf(self, x, alpha, beta, K): x = 
np.asarray(x, dtype=float) z = 2.0 * x * np.sqrt((1.0 + K) / beta) # log-prefactor: log(4 x (1+K) e^{-K} / (Γ(α) β^α)), absorbs exp(-z) log_pre = ( np.log(4.0) + np.log(x) + np.log(1.0 + K) - K - sc.gammaln(alpha) - alpha * np.log(beta) - z ) log_b1Kx2 = np.log(beta) + np.log(1.0 + K) + 2.0 * np.log(x) log1pK = np.log(1.0 + K) logK_safe = np.log(np.where(K > 0, K, 1.0)) log_terms = [] for n in range(self.N_SERIES + 1): log_kve_n = ricegamma_gen._log_kve(alpha - 1.0 - n, z) n_logK = 0.0 if n == 0 else np.where(K > 0, n * logK_safe, -np.inf) log_Tn = ( n_logK + n * log1pK - 2.0 * float(sc.gammaln(n + 1)) + 2.0 * n * np.log(x) # x^{2n} factor + (alpha - 1.0 - n) / 2.0 * log_b1Kx2 + log_kve_n ) log_terms.append(log_Tn) log_arr = np.stack(log_terms, axis=0) max_lt = np.max(log_arr, axis=0) with np.errstate(invalid='ignore'): shifted = log_arr - max_lt[np.newaxis, ...] log_sum = max_lt + np.log(np.sum(np.exp(shifted), axis=0)) return log_pre + log_sum def _cdf(self, x, alpha, beta, K): # P(X <= x | Omega) = ncx2.cdf(2*(1+K)*x^2/Omega, 2, 2K) # Integrate out Omega ~ Gamma(alpha, beta) via generalised Gauss-Laguerre. 
from scipy.special import roots_genlaguerre from scipy.special import gamma as sp_gamma x = np.asarray(x, dtype=float) alpha_s = float(np.ravel(alpha)[0]) beta_s = float(np.ravel(beta)[0]) K_s = float(np.ravel(K)[0]) nodes, weights = roots_genlaguerre(50, alpha_s - 1.0) ncx2_arg = ( 2.0 * (1.0 + K_s) * (x[..., np.newaxis] ** 2) / (beta_s * nodes) ) cdf_vals = ncx2.cdf(ncx2_arg, 2.0, 2.0 * K_s) return np.dot(cdf_vals, weights) / sp_gamma(alpha_s) def _rvs(self, alpha, beta, K, size=None, random_state=None): # Compound sampler: Omega ~ Gamma(alpha, beta), X|Omega ~ Rice(nu, sigma) Omega = random_state.gamma(alpha, beta, size=size) sigma = np.sqrt(Omega / (2.0 * (1.0 + K))) nu = np.sqrt(K * Omega / (1.0 + K)) z1 = random_state.normal(nu, sigma, size=size) z2 = random_state.normal(0.0, sigma, size=size) return np.hypot(z1, z2) def _fitstart(self, data, args=None): # Shapes at scipy's default (1.0); loc = data mean; scale = data std. if args is None: args = (1.0,) * self.numargs return args + (float(np.mean(data)), float(np.std(data))) ricegamma = ricegamma_gen(a=0.0, name='ricegamma', shapes='alpha, beta, K')