diff --git a/etc/tests/test_distributions.py b/etc/tests/test_distributions.py index 884e8f2..e91005b 100644 --- a/etc/tests/test_distributions.py +++ b/etc/tests/test_distributions.py @@ -7,7 +7,7 @@ import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -from tools.distributions import k_dist, lognakagami, loggamma_dist, lograyleigh, logrice +from tools.distributions import k_dist, logk, lognakagami, loggamma_dist, lograyleigh, logrice X = np.linspace(0.01, 10.0, 500) @@ -73,6 +73,48 @@ class TestKDistPdf: pdf_ba = k_dist.pdf(x_test, mu=1.0, alpha=3.0, beta=2.0) np.testing.assert_allclose(pdf_ab, pdf_ba, rtol=1e-6) + def test_stats_mean_equals_mu(self): + """Analytical mean must equal mu.""" + for mu in [0.5, 1.0, 3.0]: + dist_mean = float(k_dist.stats(mu=mu, alpha=2.0, beta=3.0, moments="m")) + assert pytest.approx(dist_mean, rel=1e-10) == mu + + def test_stats_variance_formula_eq4(self): + """Variance must equal mu^2*(alpha+beta+1)/(alpha*beta) (equation 4).""" + mu, alpha, beta = 2.0, 3.0, 2.0 + expected_var = mu**2 * (alpha + beta + 1) / (alpha * beta) + _, dist_var, *_ = k_dist.stats(mu=mu, alpha=alpha, beta=beta, moments="mv") + assert pytest.approx(float(dist_var), rel=1e-10) == expected_var + + def test_stats_variance_symmetric_in_alpha_beta(self): + """Variance is symmetric in alpha and beta.""" + mu = 2.0 + _, var_ab, *_ = k_dist.stats(mu=mu, alpha=2.0, beta=3.0, moments="mv") + _, var_ba, *_ = k_dist.stats(mu=mu, alpha=3.0, beta=2.0, moments="mv") + assert pytest.approx(float(var_ab), rel=1e-10) == float(var_ba) + + def test_stats_variance_numerical(self): + """Analytical variance should match sample variance from rvs.""" + mu, alpha, beta = 2.0, 2.0, 3.0 + rng = np.random.default_rng(42) + samples = k_dist.rvs(mu=mu, alpha=alpha, beta=beta, size=100_000, random_state=rng) + _, dist_var, *_ = k_dist.stats(mu=mu, alpha=alpha, beta=beta, moments="mv") + assert pytest.approx(float(np.var(samples)), rel=5e-2) == float(dist_var) + + def 
test_rvs_samples_are_positive_and_finite(self): + """K distribution samples must be positive and finite.""" + rng = np.random.default_rng(7) + samples = k_dist.rvs(mu=1.0, alpha=2.0, beta=2.0, size=500, random_state=rng) + assert np.all(samples > 0) + assert np.all(np.isfinite(samples)) + + def test_rvs_sample_mean_near_mu(self): + """Sample mean of k_dist rvs should be close to mu.""" + mu, alpha, beta = 2.0, 3.0, 2.0 + rng = np.random.default_rng(0) + samples = k_dist.rvs(mu=mu, alpha=alpha, beta=beta, size=100_000, random_state=rng) + assert pytest.approx(float(np.mean(samples)), rel=5e-2) == mu + # ── Parametric curve plots ─────────────────────────────────────────────────── @@ -514,6 +556,127 @@ class TestLogRice: assert pytest.approx(samples.mean(), rel=5e-2) == numerical_mean +# ── logk unit tests ─────────────────────────────────────────────────────────── + +Y_LOGK = np.linspace(-10.0, 10.0, 500) + + +class TestLogK: + def test_logpdf_is_finite_on_real_line(self): + """logpdf must be finite for all real y.""" + log_vals = logk.logpdf(Y_LOGK, mu=2.0, alpha=3.0, beta=2.0) + assert np.all(np.isfinite(log_vals)) + + def test_pdf_integrates_to_one(self): + """Numerical integral of PDF over the real line should be ≈ 1.""" + y_fine = np.linspace(-30, 20, 200_000) + integral = np.trapezoid(logk.pdf(y_fine, mu=2.0, alpha=3.0, beta=2.0), y_fine) + assert pytest.approx(integral, abs=1e-3) == 1.0 + + def test_logpdf_equals_log_pdf(self): + """logpdf must equal log(pdf) at points where pdf does not underflow.""" + y_bulk = np.linspace(-5.0, 5.0, 30) + log_via_pdf = np.log(logk.pdf(y_bulk, mu=2.0, alpha=3.0, beta=2.0)) + log_direct = logk.logpdf(y_bulk, mu=2.0, alpha=3.0, beta=2.0) + np.testing.assert_allclose(log_direct, log_via_pdf, rtol=1e-6) + + def test_log_transform_relation_to_k_dist(self): + """logk.pdf(y) must equal k_dist.pdf(exp(y)) * exp(y) (change-of-variable).""" + y_test = np.linspace(-3.0, 5.0, 20) + mu, alpha, beta = 2.0, 3.0, 2.0 + direct = 
logk.pdf(y_test, mu=mu, alpha=alpha, beta=beta) + via_k = k_dist.pdf(np.exp(y_test), mu=mu, alpha=alpha, beta=beta) * np.exp(y_test) + np.testing.assert_allclose(direct, via_k, rtol=1e-6) + + def test_cdf_consistent_with_k_dist(self): + """logk.cdf(y) must equal k_dist.cdf(exp(y)).""" + y_test = np.array([-2.0, 0.0, 1.0, 3.0]) + mu, alpha, beta = 2.0, 2.0, 3.0 + cdf_logk = logk.cdf(y_test, mu=mu, alpha=alpha, beta=beta) + cdf_k = k_dist.cdf(np.exp(y_test), mu=mu, alpha=alpha, beta=beta) + np.testing.assert_allclose(cdf_logk, cdf_k, rtol=1e-6) + + def test_cdf_is_monotone_increasing(self): + """CDF must be strictly non-decreasing (up to floating-point noise in the saturated tail).""" + y_grid = np.linspace(-5.0, 8.0, 30) + cdf_vals = logk.cdf(y_grid, mu=2.0, alpha=3.0, beta=2.0) + assert np.all(np.diff(cdf_vals) >= -1e-10) + + def test_stats_mean_analytical(self): + """Mean must equal ln(mu) - ln(alpha) - ln(beta) + psi(alpha) + psi(beta).""" + mu, alpha, beta = 2.0, 3.0, 2.0 + expected = np.log(mu) - np.log(alpha) - np.log(beta) + sc.digamma(alpha) + sc.digamma(beta) + dist_mean = float(logk.stats(mu=mu, alpha=alpha, beta=beta, moments="m")) + assert pytest.approx(dist_mean, rel=1e-10) == expected + + def test_stats_variance_analytical(self): + """Variance must equal psi_1(alpha) + psi_1(beta).""" + mu, alpha, beta = 2.0, 3.0, 2.0 + expected_var = sc.polygamma(1, alpha) + sc.polygamma(1, beta) + _, dist_var, *_ = logk.stats(mu=mu, alpha=alpha, beta=beta, moments="mv") + assert pytest.approx(float(dist_var), rel=1e-10) == expected_var + + def test_stats_variance_mu_independent(self): + """Variance must not depend on mu (mu is a pure shift in log-space).""" + alpha, beta = 3.0, 2.0 + _, var1, *_ = logk.stats(mu=1.0, alpha=alpha, beta=beta, moments="mv") + _, var4, *_ = logk.stats(mu=4.0, alpha=alpha, beta=beta, moments="mv") + assert pytest.approx(float(var1), rel=1e-10) == float(var4) + + def test_stats_mean_shifts_by_log_mu(self): + """Doubling mu shifts the mean 
by ln(2) and leaves variance unchanged.""" + alpha, beta = 3.0, 2.0 + mean1 = float(logk.stats(mu=1.0, alpha=alpha, beta=beta, moments="m")) + mean2 = float(logk.stats(mu=2.0, alpha=alpha, beta=beta, moments="m")) + assert pytest.approx(mean2 - mean1, rel=1e-10) == np.log(2.0) + + def test_argcheck_rejects_non_positive_mu(self): + """mu <= 0 must not produce a valid PDF value.""" + val = logk.pdf(0.0, mu=-1.0, alpha=2.0, beta=2.0) + assert not (np.isfinite(val) and val > 0) + + def test_argcheck_rejects_non_positive_alpha(self): + """alpha <= 0 must not produce a valid PDF value.""" + val = logk.pdf(0.0, mu=1.0, alpha=-1.0, beta=2.0) + assert not (np.isfinite(val) and val > 0) + + def test_argcheck_rejects_non_positive_beta(self): + """beta <= 0 must not produce a valid PDF value.""" + val = logk.pdf(0.0, mu=1.0, alpha=2.0, beta=-1.0) + assert not (np.isfinite(val) and val > 0) + + def test_rvs_samples_are_finite(self): + """Random samples must be finite real numbers.""" + rng = np.random.default_rng(42) + samples = logk.rvs(mu=2.0, alpha=3.0, beta=2.0, size=200, random_state=rng) + assert samples.shape == (200,) + assert np.all(np.isfinite(samples)) + + def test_rvs_sample_mean_near_analytical(self): + """Sample mean of many RVS should be close to the analytical mean.""" + mu, alpha, beta = 2.0, 3.0, 2.0 + rng = np.random.default_rng(0) + samples = logk.rvs(mu=mu, alpha=alpha, beta=beta, size=100_000, random_state=rng) + expected_mean = float(logk.stats(mu=mu, alpha=alpha, beta=beta, moments="m")) + assert pytest.approx(float(samples.mean()), rel=5e-2) == expected_mean + + def test_rvs_sample_variance_near_analytical(self): + """Sample variance of many RVS should be close to the analytical variance.""" + mu, alpha, beta = 2.0, 3.0, 2.0 + rng = np.random.default_rng(1) + samples = logk.rvs(mu=mu, alpha=alpha, beta=beta, size=100_000, random_state=rng) + _, expected_var, *_ = logk.stats(mu=mu, alpha=alpha, beta=beta, moments="mv") + assert 
pytest.approx(float(np.var(samples)), rel=5e-2) == float(expected_var) + + def test_symmetry_in_alpha_beta(self): + """logk PDF is symmetric in alpha and beta.""" + y_test = np.linspace(-3.0, 5.0, 20) + mu = 2.0 + pdf_ab = logk.pdf(y_test, mu=mu, alpha=2.0, beta=3.0) + pdf_ba = logk.pdf(y_test, mu=mu, alpha=3.0, beta=2.0) + np.testing.assert_allclose(pdf_ab, pdf_ba, rtol=1e-6) + + if __name__ == "__main__": plot_k_dist_varying_alpha() plot_k_dist_varying_mu() diff --git a/etc/tools/distributions.py b/etc/tools/distributions.py index 8c6c488..4881c6b 100644 --- a/etc/tools/distributions.py +++ b/etc/tools/distributions.py @@ -1,3 +1,164 @@ +""" +Custom continuous probability distributions for radar clutter modelling. + +All classes inherit from ``scipy.stats.rv_continuous``. After instantiation +each distribution object exposes the full scipy public API summarised below. +Parameters ``loc`` and ``scale`` shift and rescale the support; shape +parameters (e.g. *mu*, *alpha*, *beta*) are passed as keyword arguments. + +Public methods (from rv_continuous) +------------------------------------ +rvs(*args, loc=0, scale=1, size=1, random_state=None) + Draw random variates. + +pdf(x, *args, loc=0, scale=1) + Probability density function evaluated at *x*. + +logpdf(x, *args, loc=0, scale=1) + Natural logarithm of the PDF; numerically preferred when values are small. + +cdf(x, *args, loc=0, scale=1) + Cumulative distribution function: P(X <= x). + +logcdf(x, *args, loc=0, scale=1) + Log of the CDF; avoids underflow in the far left tail. + +sf(x, *args, loc=0, scale=1) + Survival function: 1 - CDF, i.e. P(X > x). + +logsf(x, *args, loc=0, scale=1) + Log of the survival function; avoids underflow in the far right tail. + +ppf(q, *args, loc=0, scale=1) + Percent-point function (quantile): inverse of the CDF. + +isf(q, *args, loc=0, scale=1) + Inverse survival function: inverse of sf. + +moment(order, *args, loc=0, scale=1) + Non-central raw moment of the specified integer order. 
+ +stats(*args, loc=0, scale=1, moments='mv') + Summary statistics selected by *moments* string: 'm' mean, 'v' variance, + 's' skewness, 'k' excess kurtosis. + +entropy(*args, loc=0, scale=1) + Differential (Shannon) entropy of the distribution. + +expect(func, args, loc=0, scale=1, lb=None, ub=None, conditional=False) + Expected value of *func(x)* computed by numerical integration. + +median(*args, loc=0, scale=1) + Median of the distribution. + +mean(*args, loc=0, scale=1) + Mean of the distribution. + +std(*args, loc=0, scale=1) + Standard deviation of the distribution. + +var(*args, loc=0, scale=1) + Variance of the distribution. + +interval(confidence, *args, loc=0, scale=1) + Confidence interval with equal probability mass on each side of the median. + +__call__(*args, loc=0, scale=1) + Freeze the distribution — returns a frozen instance with fixed parameters, + so methods can be called without repeating shape arguments. + +fit(data, *args, **kwds) + Maximum-likelihood estimates of shape, loc, and scale from *data*. + +fit_loc_scale(data, *args) + Quick loc and scale estimates via method of moments (mean and variance). + +nnlf(theta, x) + Negative log-likelihood for parameter vector *theta* and observations *x*. + +support(*args, loc=0, scale=1) + Lower and upper endpoints of the distribution's support. + +Overrideable private methods +----------------------------- +Subclasses customise behaviour by implementing the private counterparts +listed below. When a private method is *not* overridden scipy falls back to +a default implementation — often a slower or less precise one. Override to +gain speed or numerical accuracy. + +_argcheck(*args) + Validate shape parameters; return a boolean array. + Default: always ``True``. Should be overridden to reject invalid values. + +_get_support(*args) + Return ``(lower, upper)`` endpoints of the support as a function of the + shape parameters. Default: returns the ``(a, b)`` constants passed at + construction time. 
+ +_pdf(x, *args) + Core of the density. No default — must be implemented (or ``_logpdf``). + +_logpdf(x, *args) + Log-density. Default: ``log(_pdf(x))``, which loses precision when + ``_pdf`` underflows to zero. Override whenever a stable closed form + exists. + +_cdf(x, *args) + Core of the cumulative distribution function. Default: numerical + integration of ``_pdf`` from the lower support boundary — slow. + +_sf(x, *args) + Survival function P(X > x). Default: ``1 - _cdf(x)``, which loses + significant digits when ``_cdf(x)`` is close to 1 (far right tail). + Override with a direct formula whenever one exists. + +_logcdf(x, *args) + Log of the CDF. Default: ``log(_cdf(x))``. + +_logsf(x, *args) + Log of the survival function. Default: ``log(_sf(x))`` = ``log(1 - + _cdf(x))``, which is catastrophically inaccurate in the far right tail. + Override to avoid cancellation, e.g. via ``log1p(-_cdf(x))`` or a + complementary special function. + +_ppf(q, *args) + Percent-point (quantile) function. Default: numerical inversion of + ``_cdf`` — slow. Override with a closed-form inverse when available. + +_isf(q, *args) + Inverse survival function. Default: ``_ppf(1 - q)``, which loses + precision for small *q* (extreme quantiles). Override when the + complementary special function provides a direct inverse. + +_stats(*args, moments='mv') + Return ``(mean, variance, skewness, excess_kurtosis)``; use ``None`` for + moments you do not compute. Default: numerical integration — override + with analytical expressions for speed and accuracy. + +_munp(n, *args) + Non-central raw moment of integer order *n*. Default: numerical + integration. Implement when closed-form moments are available and + ``_stats`` is not sufficient. + +_entropy(*args) + Differential entropy. Default: numerical integration of ``-f log f``. + Override with a closed-form expression when one exists. + +_rvs(*args, size=None, random_state=None) + Random variate sampler. 
Default: CDF-inversion via ``_ppf`` — slow for + distributions without a closed-form quantile function. Override with + a direct simulation algorithm (e.g. a compound-distribution sampler). + +Numerical accuracy summary +~~~~~~~~~~~~~~~~~~~~~~~~~~ +Priority order for overriding to improve tail accuracy: + +1. ``_logsf`` / ``_logcdf`` — first line of defence against underflow. +2. ``_sf`` — avoids ``1 - cdf`` cancellation. +3. ``_isf`` — avoids ``ppf(1 - q)`` cancellation. + +""" + from scipy.stats import rv_continuous, ncx2 from scipy.special import kve, gammaln from scipy.integrate import quad @@ -88,6 +249,11 @@ class k_gen(rv_continuous): return val return np.vectorize(_scalar)(x, mu, alpha, beta) + def _rvs(self, mu, alpha, beta, size=None, random_state=None): + # Compound gamma: tau ~ Gamma(beta, mu/beta), X|tau ~ Gamma(alpha, tau/alpha) + tau = random_state.gamma(beta, mu / beta, size=size) + return random_state.gamma(alpha, tau / alpha) + def fit(self, data, *args, **kwds): if ("loc" in kwds and kwds["loc"] != 0.0) or ("floc" in kwds and kwds["floc"] != 0.0): raise ValueError("k_distribution uses a fixed loc=0; use mu to control the mean/scale.") @@ -102,6 +268,117 @@ class k_gen(rv_continuous): k_dist = k_gen(a=0.0, name="k_distribution", shapes="mu, alpha, beta") +class logk_gen(rv_continuous): + """Log-K continuous random variable. + + Y = ln(X) where X ~ K(mu, alpha, beta). The PDF is: + + f(y; mu, alpha, beta) = 2 / (Gamma(alpha) * Gamma(beta)) + * (alpha*beta/mu)^((alpha+beta)/2) + * exp((alpha+beta)/2 * y) + * K_{alpha-beta}(2*sqrt(alpha*beta*exp(y)/mu)) + + for y in (-inf, +inf), mu > 0, alpha > 0, beta > 0. 
+ + The cumulants of Y follow from the CGF + K(t) = t*ln(mu/(alpha*beta)) + lnGamma(alpha+t) - lnGamma(alpha) + + lnGamma(beta+t) - lnGamma(beta), + giving: + E[Y] = ln(mu) - ln(alpha) - ln(beta) + psi(alpha) + psi(beta) + Var[Y] = psi_1(alpha) + psi_1(beta) + """ + + def _shape_info(self): + return [ + _ShapeInfo("mu", domain=(0, np.inf), inclusive=(False, True)), + _ShapeInfo("alpha", domain=(0, np.inf), inclusive=(True, True)), + _ShapeInfo("beta", domain=(0, np.inf), inclusive=(True, True)), + ] + + def _argcheck(self, mu, alpha, beta): + return (mu > 0) & (alpha > 0) & (beta > 0) + + @staticmethod + def _log_kve(v, z): + """log(kve(v, z)) = log(Kv(z)) + z, stable for all z > 0. + + For large z, kve(v, z) ≈ sqrt(π/(2z)) underflows to 0 in float64, + making log(kve) return -inf/-nan. The asymptotic expansion: + + log(kve(v,z)) ≈ 0.5*log(π/(2z)) + log1p((4v²-1)/(8z) + + (4v²-1)(4v²-9)/(128z²)) + + is used whenever the direct evaluation would underflow. + """ + z = np.asarray(z, dtype=float) + v = np.asarray(v, dtype=float) + kve_val = kve(v, z) + # Asymptotic (2-term Hankel expansion) — accurate to O(z^{-3}) + mu_v = 4.0 * v ** 2 + log_asymp = ( + 0.5 * (np.log(np.pi) - np.log(2.0) - np.log(np.where(z > 0, z, 1.0))) + + np.log1p((mu_v - 1.0) / (8.0 * np.where(z > 0, z, 1.0)) + + (mu_v - 1.0) * (mu_v - 9.0) / (128.0 * np.where(z > 0, z**2, 1.0))) + ) + return np.where(kve_val > 0, np.log(np.where(kve_val > 0, kve_val, 1.0)), log_asymp) + + def _pdf(self, y, mu, alpha, beta): + return np.exp(self._logpdf(y, mu, alpha, beta)) + + def _logpdf(self, y, mu, alpha, beta): + half_sum = (alpha + beta) / 2.0 + log_ab_over_mu = np.log(alpha) + np.log(beta) - np.log(mu) + z = 2.0 * np.sqrt(alpha * beta * np.exp(y) / mu) + return ( + np.log(2.0) + - gammaln(alpha) + - gammaln(beta) + + half_sum * log_ab_over_mu + + half_sum * y + + self._log_kve(alpha - beta, z) + - z + ) + + def _stats(self, mu, alpha, beta): + mean = np.log(mu) - np.log(alpha) - np.log(beta) + 
sc.digamma(alpha) + sc.digamma(beta)
+        var = sc.polygamma(1, alpha) + sc.polygamma(1, beta)
+        g1 = (sc.polygamma(2, alpha) + sc.polygamma(2, beta)) / var**1.5
+        g2 = (sc.polygamma(3, alpha) + sc.polygamma(3, beta)) / var**2
+        return mean, var, g1, g2
+
+    def _cdf(self, y, mu, alpha, beta):
+        return k_dist.cdf(np.exp(y), mu, alpha, beta)
+
+    def _rvs(self, mu, alpha, beta, size=None, random_state=None):
+        tau = random_state.gamma(beta, mu / beta, size=size)
+        sample = random_state.gamma(alpha, tau / alpha)
+        return np.log(np.clip(sample, np.finfo(float).tiny, None))
+
+    def fit(self, data, *args, **kwds):
+        if ("loc" in kwds and kwds["loc"] != 0.0) or ("floc" in kwds and kwds["floc"] != 0.0):
+            raise ValueError("logk uses a fixed loc=0.")
+        if ("scale" in kwds and kwds["scale"] != 1.0) or ("fscale" in kwds and kwds["fscale"] != 1.0):
+            raise ValueError("logk uses a fixed scale=1.")
+        kwds.pop("loc", None)
+        kwds.pop("scale", None)
+        # Supply data-driven initial guesses when none are provided so the
+        # optimizer starts close to the data instead of the default (1,1,1).
+        # ln(mu) = E[Y] + ln(alpha) + ln(beta) - digamma(alpha) - digamma(beta)
+        # => mu0 = exp(mean(data) + ln(a0) + ln(b0) - psi(a0) - psi(b0))
+        if not args:
+            alpha0, beta0 = 1.0, 1.0
+            mu0 = float(np.exp(
+                np.mean(data)
+                + np.log(alpha0) + np.log(beta0)
+                - sc.digamma(alpha0) - sc.digamma(beta0)
+            ))
+            args = (mu0, alpha0, beta0)
+        return super().fit(data, *args, floc=0.0, fscale=1.0, **kwds)
+
+
+logk = logk_gen(name="logk", shapes="mu, alpha, beta")
+
+
 class lognakagami_gen(rv_continuous):
     """Log-Nakagami continuous random variable.