"""Comprehensive summary statistics and report generation for simulation results.
This module provides statistical analysis tools, distribution fitting utilities,
and formatted report generation for Monte Carlo simulation results.
"""
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
import io
import math
from typing import Any, Dict, List, Optional, Tuple
import warnings
import numpy as np
import pandas as pd
from scipy import stats
@dataclass
class StatisticalSummary:
"""Complete statistical summary of simulation results."""
basic_stats: Dict[str, float]
distribution_params: Dict[str, Dict[str, float]]
confidence_intervals: Dict[str, Tuple[float, float]]
hypothesis_tests: Dict[str, Dict[str, float]]
extreme_values: Dict[str, float]
def to_dataframe(self) -> pd.DataFrame:
"""Convert summary to pandas DataFrame.
Returns:
DataFrame with all summary statistics
"""
rows = []
# Basic statistics
for stat, value in self.basic_stats.items():
rows.append({"category": "basic", "metric": stat, "value": value})
# Distribution parameters
for dist, params in self.distribution_params.items():
for param, value in params.items():
rows.append({"category": f"distribution_{dist}", "metric": param, "value": value})
# Confidence intervals
for metric, (lower, upper) in self.confidence_intervals.items():
rows.append(
{"category": "confidence_interval", "metric": f"{metric}_lower", "value": lower}
)
rows.append(
{"category": "confidence_interval", "metric": f"{metric}_upper", "value": upper}
)
# Hypothesis tests
for test, results in self.hypothesis_tests.items():
for metric, value in results.items():
rows.append({"category": f"test_{test}", "metric": metric, "value": value})
# Extreme values
for metric, value in self.extreme_values.items():
rows.append({"category": "extreme", "metric": metric, "value": value})
return pd.DataFrame(rows)
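# Illustrative sketch (hypothetical helper, not part of the public API): building a
# minimal StatisticalSummary by hand and flattening it with ``to_dataframe``.
# All numeric values below are synthetic placeholders.
def _example_summary_dataframe() -> pd.DataFrame:
    summary = StatisticalSummary(
        basic_stats={"mean": 1.2, "std": 0.4},
        distribution_params={"normal": {"mu": 1.2, "sigma": 0.4}},
        confidence_intervals={"mean": (1.1, 1.3)},
        hypothesis_tests={"t_test": {"statistic": 2.1, "pvalue": 0.04}},
        extreme_values={"percentile_99": 2.3},
    )
    # Each entry becomes one row with ``category``, ``metric`` and ``value`` columns.
    return summary.to_dataframe()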
class SummaryStatistics:
"""Calculate comprehensive summary statistics for simulation results."""
def __init__(
self,
confidence_level: float = 0.95,
bootstrap_iterations: int = 1000,
seed: Optional[int] = None,
):
"""Initialize summary statistics calculator.
Args:
confidence_level: Confidence level for bootstrap intervals (e.g. 0.95)
bootstrap_iterations: Number of bootstrap resamples used per interval
seed: Optional random seed for reproducibility
"""
self.confidence_level = confidence_level
self.bootstrap_iterations = bootstrap_iterations
self._rng = np.random.default_rng(seed)
def calculate_summary(
self, data: np.ndarray, weights: Optional[np.ndarray] = None
) -> StatisticalSummary:
"""Calculate complete statistical summary.
Args:
data: Input data array
weights: Optional weights for weighted statistics
Returns:
Complete statistical summary
"""
# Handle empty data
if len(data) == 0:
return StatisticalSummary(
basic_stats=self._calculate_basic_stats(data, weights),
distribution_params={},
confidence_intervals={},
hypothesis_tests={},
extreme_values={},
)
# Basic statistics
basic_stats = self._calculate_basic_stats(data, weights)
# Fit distributions
distribution_params = self._fit_distributions(data)
# Bootstrap confidence intervals
confidence_intervals = self._calculate_confidence_intervals(data)
# Hypothesis tests
hypothesis_tests = self._perform_hypothesis_tests(data)
# Extreme value statistics
extreme_values = self._calculate_extreme_values(data)
return StatisticalSummary(
basic_stats=basic_stats,
distribution_params=distribution_params,
confidence_intervals=confidence_intervals,
hypothesis_tests=hypothesis_tests,
extreme_values=extreme_values,
)
def _safe_skew_kurtosis(self, data: np.ndarray, stat_type: str) -> float:
"""Calculate skewness or kurtosis with warning suppression."""
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="Precision loss occurred")
if stat_type == "skew":
return float(stats.skew(data, nan_policy="omit"))
return float(stats.kurtosis(data, nan_policy="omit"))
def _calculate_basic_stats(
self, data: np.ndarray, weights: Optional[np.ndarray] = None
) -> Dict[str, float]:
"""Calculate basic descriptive statistics.
Args:
data: Input data
weights: Optional weights
Returns:
Dictionary of basic statistics
"""
# Handle empty data
if len(data) == 0:
return {
"count": 0,
"mean": 0.0,
"median": 0.0,
"std": 0.0,
"variance": 0.0,
"min": 0.0,
"max": 0.0,
"range": 0.0,
"iqr": 0.0,
"cv": 0.0,
"skewness": 0.0,
"kurtosis": 0.0,
"stderr": 0.0,
}
if weights is None:
return {
"count": len(data),
"mean": float(np.mean(data)),
"median": float(np.median(data)),
"std": float(np.std(data)),
"variance": float(np.var(data)),
"min": float(np.min(data)),
"max": float(np.max(data)),
"range": float(np.max(data) - np.min(data)),
"iqr": float(np.percentile(data, 75) - np.percentile(data, 25)),
"cv": float(np.std(data) / np.mean(data)) if np.mean(data) != 0 else np.inf,
"skewness": float(self._safe_skew_kurtosis(data, "skew")),
"kurtosis": float(self._safe_skew_kurtosis(data, "kurtosis")),
"stderr": float(np.std(data) / np.sqrt(len(data))),
}
mean = np.average(data, weights=weights)
variance = np.average((data - mean) ** 2, weights=weights)
std = np.sqrt(variance)
return {
"count": len(data),
"mean": float(mean),
"median": float(self._weighted_percentile(data, weights, 50)),
"std": float(std),
"variance": float(variance),
"min": float(np.min(data)),
"max": float(np.max(data)),
"range": float(np.max(data) - np.min(data)),
"iqr": float(
self._weighted_percentile(data, weights, 75)
- self._weighted_percentile(data, weights, 25)
),
"cv": float(std / mean) if mean != 0 else np.inf,
"effective_sample_size": float(np.sum(weights) ** 2 / np.sum(weights**2)),
}
def _weighted_percentile(
self, data: np.ndarray, weights: np.ndarray, percentile: float
) -> float:
"""Calculate weighted percentile.
Args:
data: Data values
weights: Weights
percentile: Percentile to calculate
Returns:
Weighted percentile value
"""
sorted_indices = np.argsort(data)
sorted_data = data[sorted_indices]
sorted_weights = weights[sorted_indices]
cumsum = np.cumsum(sorted_weights)
cutoff = percentile / 100.0 * cumsum[-1]
return float(sorted_data[np.searchsorted(cumsum, cutoff)])
def _fit_distributions(self, data: np.ndarray) -> Dict[str, Dict[str, float]]:
"""Fit various distributions to data.
Args:
data: Input data
Returns:
Dictionary of fitted distribution parameters
"""
results = {}
# Normal distribution
try:
with np.errstate(divide="ignore", invalid="ignore"):
mu, sigma = stats.norm.fit(data)
ks_stat, ks_pvalue = stats.kstest(data, lambda x: stats.norm.cdf(x, mu, sigma))
results["normal"] = {
"mu": float(mu),
"sigma": float(sigma),
"ks_statistic": float(ks_stat),
"ks_pvalue": float(ks_pvalue),
"aic": float(self._calculate_aic(data, stats.norm, mu, sigma)),
}
except (ValueError, TypeError, RuntimeError):
pass
# Log-normal distribution
try:
with np.errstate(divide="ignore", invalid="ignore"):
shape, loc, scale = stats.lognorm.fit(data, floc=0)
ks_stat, ks_pvalue = stats.kstest(
data, lambda x: stats.lognorm.cdf(x, shape, loc, scale)
)
results["lognormal"] = {
"shape": float(shape),
"location": float(loc),
"scale": float(scale),
"ks_statistic": float(ks_stat),
"ks_pvalue": float(ks_pvalue),
"aic": float(self._calculate_aic(data, stats.lognorm, shape, loc, scale)),
}
except (ValueError, TypeError, RuntimeError):
pass
# Gamma distribution
try:
with np.errstate(divide="ignore", invalid="ignore"):
alpha, loc, scale = stats.gamma.fit(data, floc=0)
ks_stat, ks_pvalue = stats.kstest(
data, lambda x: stats.gamma.cdf(x, alpha, loc, scale)
)
results["gamma"] = {
"alpha": float(alpha),
"location": float(loc),
"scale": float(scale),
"ks_statistic": float(ks_stat),
"ks_pvalue": float(ks_pvalue),
"aic": float(self._calculate_aic(data, stats.gamma, alpha, loc, scale)),
}
except (ValueError, TypeError, RuntimeError):
pass
# Exponential distribution
try:
loc, scale = stats.expon.fit(data, floc=0)
ks_stat, ks_pvalue = stats.kstest(data, lambda x: stats.expon.cdf(x, loc, scale))
results["exponential"] = {
"location": float(loc),
"scale": float(scale),
"ks_statistic": float(ks_stat),
"ks_pvalue": float(ks_pvalue),
"aic": float(self._calculate_aic(data, stats.expon, loc, scale)),
}
except (ValueError, TypeError, RuntimeError):
pass
return results
def _calculate_aic(self, data: np.ndarray, distribution: stats.rv_continuous, *params) -> float:
"""Calculate Akaike Information Criterion for distribution fit.
Args:
data: Data points
distribution: Scipy distribution object
params: Distribution parameters
Returns:
AIC value
"""
log_likelihood = np.sum(distribution.logpdf(data, *params))
n_params = len(params)
return float(2 * n_params - 2 * log_likelihood)
def _calculate_confidence_intervals(self, data: np.ndarray) -> Dict[str, Tuple[float, float]]:
"""Calculate bootstrap confidence intervals.
Args:
data: Input data
Returns:
Dictionary of confidence intervals
"""
n_samples = len(data)
alpha = 1 - self.confidence_level
# Bootstrap samples
means = []
medians = []
stds = []
for _ in range(self.bootstrap_iterations):
sample = self._rng.choice(data, size=n_samples, replace=True)
means.append(np.mean(sample))
medians.append(np.median(sample))
stds.append(np.std(sample))
# Calculate percentile confidence intervals
lower_percentile = (alpha / 2) * 100
upper_percentile = (1 - alpha / 2) * 100
return {
"mean": (
float(np.percentile(means, lower_percentile)),
float(np.percentile(means, upper_percentile)),
),
"median": (
float(np.percentile(medians, lower_percentile)),
float(np.percentile(medians, upper_percentile)),
),
"std": (
float(np.percentile(stds, lower_percentile)),
float(np.percentile(stds, upper_percentile)),
),
}
def _perform_hypothesis_tests(self, data: np.ndarray) -> Dict[str, Dict[str, float]]:
"""Perform various hypothesis tests on data.
Args:
data: Input data
Returns:
Dictionary of test results
"""
results = {}
# Normality tests
# Shapiro test requires at least 3 samples
if len(data) >= 3:
shapiro_stat, shapiro_p = stats.shapiro(
data[: min(5000, len(data))]
) # Limit sample size
else:
shapiro_stat, shapiro_p = np.nan, np.nan
# Jarque-Bera test needs sufficient samples
if len(data) >= 2:
with np.errstate(divide="ignore", invalid="ignore"):
jarque_bera_stat, jarque_bera_p = stats.jarque_bera(data)
else:
jarque_bera_stat, jarque_bera_p = np.nan, np.nan
results["normality"] = {
"shapiro_statistic": float(shapiro_stat),
"shapiro_pvalue": float(shapiro_p),
"jarque_bera_statistic": float(jarque_bera_stat),
"jarque_bera_pvalue": float(jarque_bera_p),
}
# One-sample t-test (test if mean is different from 0)
if len(data) >= 2:
with np.errstate(divide="ignore", invalid="ignore"):
t_stat, t_p = stats.ttest_1samp(data, 0)
else:
t_stat, t_p = np.nan, np.nan
results["t_test"] = {"statistic": float(t_stat), "pvalue": float(t_p)}
# Lag-1 autocorrelation check (a rough stand-in for a formal Ljung-Box test)
if len(data) > 1:
with np.errstate(divide="ignore", invalid="ignore"):
corr_matrix = np.corrcoef(data[:-1], data[1:])
lag1_corr = corr_matrix[0, 1] if not np.isnan(corr_matrix[0, 1]) else 0.0
results["autocorrelation"] = {
"lag1_correlation": float(lag1_corr),
"significant": float(abs(lag1_corr) > 2 / np.sqrt(len(data))),
}
return results
def _calculate_extreme_values(self, data: np.ndarray) -> Dict[str, float]:
"""Calculate extreme value statistics.
Args:
data: Input data
Returns:
Dictionary of extreme value statistics
"""
percentiles = [0.1, 1, 5, 95, 99, 99.9]
extreme_stats = {}
for p in percentiles:
extreme_stats[f"percentile_{p}"] = float(np.percentile(data, p))
# Tail indices
threshold_lower = np.percentile(data, 5)
threshold_upper = np.percentile(data, 95)
lower_tail = data[data <= threshold_lower]
upper_tail = data[data >= threshold_upper]
if len(lower_tail) > 1 and np.mean(lower_tail) != 0:
extreme_stats["lower_tail_index"] = float(np.std(lower_tail) / np.mean(lower_tail))
if len(upper_tail) > 1 and np.mean(upper_tail) != 0:
extreme_stats["upper_tail_index"] = float(np.std(upper_tail) / np.mean(upper_tail))
# Expected shortfall (CVaR)
var_95 = np.percentile(data, 5)
expected_shortfall = np.mean(data[data <= var_95])
extreme_stats["expected_shortfall_5%"] = float(expected_shortfall)
return extreme_stats
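# Illustrative sketch (hypothetical helper, not part of the public API): computing
# a full summary for a synthetic sample of simulated losses.
def _example_full_summary() -> StatisticalSummary:
    rng = np.random.default_rng(42)
    losses = rng.lognormal(mean=10.0, sigma=1.5, size=10_000)
    calc = SummaryStatistics(confidence_level=0.95, bootstrap_iterations=500, seed=0)
    # Returns basic stats, fitted distributions, bootstrap confidence intervals,
    # hypothesis tests and extreme-value statistics in one object.
    return calc.calculate_summary(losses)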
class TDigest:
"""T-digest data structure for streaming quantile estimation.
Implements the merging digest variant from Dunning & Ertl (2019).
Provides accurate quantile estimates, especially at the tails,
with bounded memory usage proportional to the compression parameter.
The t-digest maintains a sorted set of centroids (mean, weight) that
adaptively cluster data points. Clusters near the tails (q->0 or q->1)
are kept small for precision, while clusters near the median can be larger.
Args:
compression: Controls accuracy vs memory tradeoff. Higher values
give more accuracy but use more memory. Typical range: 100-300.
Default 200 gives ~0.2-1% error at median, ~0.005-0.05% at q01/q99.
References:
Dunning, T. & Ertl, O. (2019). "Computing Extremely Accurate Quantiles
Using t-Digests." arXiv:1902.04023.
"""
def __init__(self, compression: float = 200):
self.compression = compression
self._means: np.ndarray = np.array([], dtype=np.float64)
self._weights: np.ndarray = np.array([], dtype=np.float64)
self._buffer: List[float] = []
self._buffer_capacity = max(int(compression * 5), 500)
self._total_weight = 0.0
self._min_val = float("inf")
self._max_val = float("-inf")
self._count = 0
self._merge_direction = True # Alternate merge direction for better accuracy
def update(self, value: float) -> None:
"""Add a single observation to the digest.
Args:
value: The value to add.
"""
self._buffer.append(value)
self._min_val = min(self._min_val, value)
self._max_val = max(self._max_val, value)
self._count += 1
if len(self._buffer) >= self._buffer_capacity:
self._flush()
def update_batch(self, values: np.ndarray) -> None:
"""Add an array of observations to the digest.
Args:
values: Array of values to add.
"""
if len(values) == 0:
return
flat = values.ravel()
self._buffer.extend(flat.tolist())
min_v = float(np.min(flat))
max_v = float(np.max(flat))
self._min_val = min(self._min_val, min_v)
self._max_val = max(self._max_val, max_v)
self._count += len(flat)
if len(self._buffer) >= self._buffer_capacity:
self._flush()
def merge(self, other: "TDigest") -> None:
"""Merge another t-digest into this one.
After merging, this digest contains the combined information from both
digests. The other digest is not modified.
Args:
other: Another TDigest to merge into this one.
"""
other._flush()
if len(other._means) == 0 and other._count == 0:
return
self._flush()
if len(self._means) == 0 and self._count == 0:
self._means = other._means.copy()
self._weights = other._weights.copy()
self._total_weight = other._total_weight
self._min_val = other._min_val
self._max_val = other._max_val
self._count = other._count
return
all_means = np.concatenate([self._means, other._means])
all_weights = np.concatenate([self._weights, other._weights])
self._count += other._count
self._min_val = min(self._min_val, other._min_val)
self._max_val = max(self._max_val, other._max_val)
self._merge_centroids(all_means, all_weights)
def quantile(self, q: float) -> float:
"""Estimate a single quantile.
Args:
q: Quantile to estimate, in range [0, 1].
Returns:
Estimated value at the given quantile.
Raises:
ValueError: If the digest is empty.
"""
self._flush()
if len(self._means) == 0:
raise ValueError("Cannot compute quantile of empty digest")
if q <= 0:
return self._min_val
if q >= 1:
return self._max_val
if len(self._means) == 1:
return float(self._means[0])
total = self._total_weight
target = q * total
# Compute centroid weight centers (cumulative weight at center of each centroid)
cum = np.cumsum(self._weights)
centers = cum - self._weights / 2.0
# Left tail: target before center of first centroid
if target <= centers[0]:
if centers[0] > 0:
return float(self._min_val + (self._means[0] - self._min_val) * target / centers[0])
return float(self._min_val)
# Right tail: target after center of last centroid
if target >= centers[-1]:
remaining = total - centers[-1]
if remaining > 0:
return float(
self._means[-1]
+ (self._max_val - self._means[-1]) * (target - centers[-1]) / remaining
)
return float(self._max_val)
# Interior: interpolate between adjacent centroid centers
idx = int(np.searchsorted(centers, target, side="right")) - 1
idx = max(0, min(idx, len(centers) - 2))
left_center = centers[idx]
right_center = centers[idx + 1]
if right_center > left_center:
t = (target - left_center) / (right_center - left_center)
return float(self._means[idx] + t * (self._means[idx + 1] - self._means[idx]))
return float(self._means[idx])
def quantiles(self, qs: List[float]) -> Dict[str, float]:
"""Estimate multiple quantiles.
Args:
qs: List of quantiles to estimate, each in range [0, 1].
Returns:
Dictionary mapping per-mille quantile keys (e.g. ``q0250``
for the 25th percentile) to estimated values.
"""
results = {}
for q in sorted(qs):
key = format_quantile_key(q)
results[key] = self.quantile(q)
return results
def cdf(self, value: float) -> float:
"""Estimate the cumulative distribution function at a value.
Args:
value: The value at which to estimate the CDF.
Returns:
Estimated probability P(X <= value).
Raises:
ValueError: If the digest is empty.
"""
self._flush()
if len(self._means) == 0:
raise ValueError("Cannot compute CDF of empty digest")
if value <= self._min_val:
return 0.0
if value >= self._max_val:
return 1.0
if len(self._means) == 1:
# Single centroid: linear interpolation between min and max
if self._max_val > self._min_val:
return (value - self._min_val) / (self._max_val - self._min_val)
return 0.5
total = self._total_weight
cum = np.cumsum(self._weights)
centers = cum - self._weights / 2.0
# Before first centroid mean
if value <= self._means[0]:
if self._means[0] > self._min_val:
t = (value - self._min_val) / (self._means[0] - self._min_val)
return float(t * centers[0] / total)
return float(centers[0] / total)
# After last centroid mean
if value >= self._means[-1]:
if self._max_val > self._means[-1]:
t = (value - self._means[-1]) / (self._max_val - self._means[-1])
return float((centers[-1] + t * (total - centers[-1])) / total)
return float(centers[-1] / total)
# Interior: find bracketing centroids
idx = int(np.searchsorted(self._means, value, side="right")) - 1
idx = max(0, min(idx, len(self._means) - 2))
left_mean = self._means[idx]
right_mean = self._means[idx + 1]
if right_mean > left_mean:
t = (value - left_mean) / (right_mean - left_mean)
weight_pos = centers[idx] + t * (centers[idx + 1] - centers[idx])
return float(weight_pos / total)
return float(centers[idx] / total)
@property
def centroid_count(self) -> int:
"""Return the number of centroids currently stored."""
self._flush()
return len(self._means)
def __len__(self) -> int:
"""Return the total count of observations added."""
return self._count
def _flush(self) -> None:
"""Merge buffered values into centroids."""
if not self._buffer:
return
buf = np.array(self._buffer, dtype=np.float64)
self._buffer = []
if len(self._means) > 0:
all_means = np.concatenate([self._means, buf])
all_weights = np.concatenate([self._weights, np.ones(len(buf), dtype=np.float64)])
else:
all_means = buf
all_weights = np.ones(len(buf), dtype=np.float64)
self._merge_centroids(all_means, all_weights)
def _merge_centroids(self, means: np.ndarray, weights: np.ndarray) -> None:
"""Core merge step: sort centroids and merge under scale function constraints.
Alternates merge direction (left-to-right / right-to-left) for balanced accuracy.
"""
total = float(np.sum(weights))
# Sort by mean
order = np.argsort(means, kind="mergesort")
means = means[order]
weights = weights[order]
# Alternate merge direction for better accuracy at both tails
if not self._merge_direction:
means = means[::-1]
weights = weights[::-1]
self._merge_direction = not self._merge_direction
result_m: List[float] = []
result_w: List[float] = []
cum_weight = 0.0
cur_m = float(means[0])
cur_w = float(weights[0])
for i in range(1, len(means)):
proposed_w = cur_w + float(weights[i])
q_left = cum_weight / total
q_right = (cum_weight + proposed_w) / total
# Scale function constraint: k(q_right) - k(q_left) <= 1
if self._k(q_right) - self._k(q_left) <= 1.0:
# Merge into current centroid
cur_m = (cur_m * cur_w + float(means[i]) * float(weights[i])) / proposed_w
cur_w = proposed_w
else:
# Emit current centroid, start new one
result_m.append(cur_m)
result_w.append(cur_w)
cum_weight += cur_w
cur_m = float(means[i])
cur_w = float(weights[i])
# Emit last centroid
result_m.append(cur_m)
result_w.append(cur_w)
new_means = np.array(result_m, dtype=np.float64)
new_weights = np.array(result_w, dtype=np.float64)
# If we merged right-to-left, reverse back to sorted order
if self._merge_direction: # We already flipped the flag, so check current state
new_order = np.argsort(new_means, kind="mergesort")
new_means = new_means[new_order]
new_weights = new_weights[new_order]
self._means = new_means
self._weights = new_weights
self._total_weight = total
def _k(self, q: float) -> float:
"""Scale function k1 from Dunning & Ertl (2019).
k1(q) = (delta / (2*pi)) * arcsin(2*q - 1)
This scale function provides highest precision at the tails
(q near 0 or 1) where insurance risk metrics (VaR, TVaR) are computed.
"""
q = max(1e-15, min(q, 1 - 1e-15))
return (self.compression / (2.0 * math.pi)) * math.asin(2.0 * q - 1.0)
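# Illustrative sketch (hypothetical helper, not part of the public API): streaming
# tail quantiles through two t-digests built on separate chunks and merged
# afterwards, as one would when processing simulation results in parallel.
def _example_tdigest_merge() -> Dict[str, float]:
    rng = np.random.default_rng(7)
    chunk_a = rng.gamma(shape=2.0, scale=1_000.0, size=50_000)
    chunk_b = rng.gamma(shape=2.0, scale=1_000.0, size=50_000)
    digest_a = TDigest(compression=200)
    digest_b = TDigest(compression=200)
    digest_a.update_batch(chunk_a)
    digest_b.update_batch(chunk_b)
    digest_a.merge(digest_b)  # digest_a now summarises both chunks
    # Tail estimates (e.g. the 99.5th percentile) stay accurate with memory
    # bounded by the compression parameter rather than the sample size.
    return {"q50": digest_a.quantile(0.5), "q995": digest_a.quantile(0.995)}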
class QuantileCalculator:
"""Efficient quantile calculation for large datasets."""
def __init__(self, quantiles: Optional[List[float]] = None, seed: Optional[int] = None):
"""Initialize quantile calculator.
Args:
quantiles: List of quantiles to calculate (0-1 range)
seed: Optional random seed for reproducibility
"""
if quantiles is None:
quantiles = [0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]
self.quantiles = sorted(quantiles)
self._rng = np.random.default_rng(seed)
@lru_cache(maxsize=128)
def calculate_quantiles(self, data_hash: int, method: str = "linear") -> Dict[str, float]:
"""Calculate quantiles with caching.
Args:
data_hash: Hash of data array for caching
method: Interpolation method
Returns:
Dictionary of quantile values
"""
# This is a placeholder - actual data needs to be passed separately
# due to hashing limitations
return {}
def calculate(self, data: np.ndarray, method: str = "linear") -> Dict[str, float]:
"""Calculate quantiles for data.
Args:
data: Input data array
method: Interpolation method ('linear', 'nearest', 'lower', 'higher', 'midpoint')
Returns:
Dictionary of quantile values
"""
results = {}
# Pass the requested interpolation method through to numpy's percentile
# function (the ``method`` keyword requires numpy >= 1.22)
quantile_values = np.percentile(data, [q * 100 for q in self.quantiles], method=method)
for q, value in zip(self.quantiles, quantile_values):
results[format_quantile_key(q)] = float(value)
return results
def streaming_quantiles(
self, data_stream: np.ndarray, compression: float = 200
) -> Dict[str, float]:
"""Calculate quantiles for streaming data using the t-digest algorithm.
Uses the t-digest merging digest algorithm (Dunning & Ertl, 2019) for
streaming quantile estimation with bounded memory and high accuracy,
especially at tail quantiles relevant to insurance risk metrics.
Args:
data_stream: Streaming data array
compression: Controls accuracy vs memory tradeoff. Higher values
give more accuracy but use more memory. Typical range: 100-300.
Default 200 gives ~0.2-1% error at median, ~0.005-0.05% at
q01/q99. Passed directly to TDigest.
Returns:
Dictionary of approximate quantile values
"""
if len(data_stream) <= int(compression * 5):
return self.calculate(data_stream)
digest = TDigest(compression=compression)
digest.update_batch(data_stream)
return digest.quantiles(self.quantiles)
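# Illustrative sketch (hypothetical helper, not part of the public API): exact
# quantiles for a small array versus the t-digest approximation for a large stream.
def _example_quantiles() -> Tuple[Dict[str, float], Dict[str, float]]:
    rng = np.random.default_rng(11)
    small = rng.normal(loc=0.0, scale=1.0, size=500)
    large = rng.normal(loc=0.0, scale=1.0, size=2_000_000)
    calc = QuantileCalculator(quantiles=[0.01, 0.5, 0.99])
    exact = calc.calculate(small)             # direct np.percentile
    approx = calc.streaming_quantiles(large)  # t-digest, bounded memory
    return exact, approx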
class DistributionFitter:
"""Fit and compare multiple probability distributions to data."""
DISTRIBUTIONS = {
"normal": stats.norm,
"lognormal": stats.lognorm,
"gamma": stats.gamma,
"exponential": stats.expon,
"weibull": stats.weibull_min,
"beta": stats.beta,
"pareto": stats.pareto,
"uniform": stats.uniform,
}
def __init__(self):
"""Initialize distribution fitter."""
self.fitted_params = {}
self.goodness_of_fit = {}
def fit_all(self, data: np.ndarray, distributions: Optional[List[str]] = None) -> pd.DataFrame:
"""Fit multiple distributions and compare goodness of fit.
Args:
data: Input data
distributions: List of distributions to fit (None for all)
Returns:
DataFrame comparing distribution fits
"""
if distributions is None:
distributions = list(self.DISTRIBUTIONS.keys())
results = []
for dist_name in distributions:
if dist_name not in self.DISTRIBUTIONS:
continue
dist = self.DISTRIBUTIONS[dist_name]
try:
# Fit distribution
params = dist.fit(data)
self.fitted_params[dist_name] = params
# Calculate goodness of fit metrics
ks_stat, ks_p = stats.kstest(
data, lambda x, dist=dist, params=params: dist.cdf(x, *params)
)
log_likelihood = np.sum(dist.logpdf(data, *params))
aic = 2 * len(params) - 2 * log_likelihood
bic = len(params) * np.log(len(data)) - 2 * log_likelihood
results.append(
{
"distribution": dist_name,
"n_params": len(params),
"ks_statistic": ks_stat,
"ks_pvalue": ks_p,
"log_likelihood": log_likelihood,
"aic": aic,
"bic": bic,
}
)
self.goodness_of_fit[dist_name] = {
"ks_statistic": ks_stat,
"ks_pvalue": ks_p,
"aic": aic,
"bic": bic,
}
except (ValueError, TypeError, RuntimeError) as e:
results.append({"distribution": dist_name, "error": str(e)})
# Create DataFrame and sort by AIC
df = pd.DataFrame(results)
if "aic" in df.columns:
df = df.sort_values("aic")
return df
def get_best_distribution(self, criterion: str = "aic") -> Tuple[str, Dict[str, float]]:
"""Get the best-fitting distribution based on criterion.
Args:
criterion: Selection criterion ('aic', 'bic', 'ks_pvalue')
Returns:
Tuple of (distribution name, parameters)
"""
if not self.goodness_of_fit:
raise ValueError("No distributions fitted yet")
if criterion == "ks_pvalue":
# Higher p-value is better
best_dist = max(
self.goodness_of_fit.items(), key=lambda x: x[1].get(criterion, -np.inf)
)[0]
else:
# Lower AIC/BIC is better
best_dist = min(
self.goodness_of_fit.items(), key=lambda x: x[1].get(criterion, np.inf)
)[0]
return best_dist, self.fitted_params[best_dist]
def generate_qq_plot_data(
self, data: np.ndarray, distribution: str
) -> Tuple[np.ndarray, np.ndarray]:
"""Generate data for Q-Q plot.
Args:
data: Original data
distribution: Distribution name
Returns:
Tuple of (theoretical quantiles, sample quantiles)
"""
if distribution not in self.fitted_params:
raise ValueError(f"Distribution {distribution} not fitted")
params = self.fitted_params[distribution]
dist = self.DISTRIBUTIONS[distribution]
# Calculate quantiles
n = len(data)
theoretical_quantiles = np.array(
[dist.ppf((i - 0.5) / n, *params) for i in range(1, n + 1)]
)
sample_quantiles = np.sort(data)
return theoretical_quantiles, sample_quantiles
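# Illustrative sketch (hypothetical helper, not part of the public API): ranking
# candidate distributions by AIC and extracting the best fit plus Q-Q plot data.
def _example_distribution_fit() -> Tuple[str, pd.DataFrame]:
    rng = np.random.default_rng(3)
    data = rng.lognormal(mean=1.0, sigma=0.6, size=5_000)
    fitter = DistributionFitter()
    comparison = fitter.fit_all(data, distributions=["normal", "lognormal", "gamma"])
    best_name, _best_params = fitter.get_best_distribution(criterion="aic")
    # Theoretical vs sample quantiles for a visual goodness-of-fit check.
    _theoretical, _sample = fitter.generate_qq_plot_data(data, best_name)
    return best_name, comparison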
class SummaryReportGenerator:
"""Generate formatted summary reports for simulation results."""
def __init__(self, style: str = "markdown"):
"""Initialize report generator.
Args:
style: Report style ('markdown', 'html', 'latex')
"""
self.style = style
def generate_report(
self,
summary: StatisticalSummary,
title: str = "Simulation Results Summary",
metadata: Optional[Dict[str, Any]] = None,
) -> str:
"""Generate formatted report.
Args:
summary: Statistical summary object
title: Report title
metadata: Additional metadata to include
Returns:
Formatted report string
"""
if self.style == "markdown":
return self._generate_markdown_report(summary, title, metadata)
if self.style == "html":
return self._generate_html_report(summary, title, metadata)
if self.style == "latex":
return self._generate_latex_report(summary, title, metadata)
raise ValueError(f"Unsupported style: {self.style}")
def _generate_markdown_report(
self, summary: StatisticalSummary, title: str, metadata: Optional[Dict[str, Any]]
) -> str:
"""Generate Markdown report.
Args:
summary: Statistical summary
title: Report title
metadata: Additional metadata
Returns:
Markdown formatted report
"""
report = io.StringIO()
# Title and metadata
report.write(f"# {title}\n\n")
report.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
if metadata:
report.write("## Metadata\n\n")
for key, value in metadata.items():
report.write(f"- **{key}**: {value}\n")
report.write("\n")
# Basic statistics
report.write("## Basic Statistics\n\n")
report.write("| Metric | Value |\n")
report.write("|--------|-------|\n")
for metric, value in summary.basic_stats.items():
report.write(f"| {metric} | {value:.6f} |\n")
report.write("\n")
# Distribution fits
if summary.distribution_params:
report.write("## Distribution Fitting\n\n")
for dist, params in summary.distribution_params.items():
report.write(f"### {dist.title()} Distribution\n\n")
for param, value in params.items():
report.write(f"- {param}: {value:.6f}\n")
report.write("\n")
# Confidence intervals
report.write("## Confidence Intervals\n\n")
report.write("| Metric | Lower | Upper |\n")
report.write("|--------|-------|-------|\n")
for metric, (lower, upper) in summary.confidence_intervals.items():
report.write(f"| {metric} | {lower:.6f} | {upper:.6f} |\n")
report.write("\n")
# Hypothesis tests
if summary.hypothesis_tests:
report.write("## Hypothesis Tests\n\n")
for test, results in summary.hypothesis_tests.items():
report.write(f"### {test.replace('_', ' ').title()}\n\n")
for metric, value in results.items():
report.write(f"- {metric}: {value:.6f}\n")
report.write("\n")
# Extreme values
report.write("## Extreme Value Statistics\n\n")
report.write("| Metric | Value |\n")
report.write("|--------|-------|\n")
for metric, value in summary.extreme_values.items():
report.write(f"| {metric} | {value:.6f} |\n")
return report.getvalue()
def _generate_html_report(
self, summary: StatisticalSummary, title: str, metadata: Optional[Dict[str, Any]]
) -> str:
"""Generate HTML report.
Args:
summary: Statistical summary
title: Report title
metadata: Additional metadata
Returns:
HTML formatted report
"""
df = summary.to_dataframe()
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>{title}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1 {{ color: #333; }}
table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
.metadata {{ background-color: #f9f9f9; padding: 10px; margin: 10px 0; }}
</style>
</head>
<body>
<h1>{title}</h1>
<p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
"""
if metadata:
html += '<div class="metadata"><h2>Metadata</h2><ul>'
for key, value in metadata.items():
html += f"<li><strong>{key}</strong>: {value}</li>"
html += "</ul></div>"
html += df.to_html(index=False, classes="results-table")
html += "</body></html>"
return str(html)
def _generate_latex_report(
self, summary: StatisticalSummary, title: str, metadata: Optional[Dict[str, Any]]
) -> str:
"""Generate LaTeX report.
Args:
summary: Statistical summary
title: Report title
metadata: Additional metadata
Returns:
LaTeX formatted report
"""
df = summary.to_dataframe()
latex = f"""
\\documentclass{{article}}
\\usepackage{{booktabs}}
\\usepackage{{longtable}}
\\title{{{title}}}
\\date{{\\today}}
\\begin{{document}}
\\maketitle
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
if metadata:
latex += "\\section{Metadata}\n\\begin{itemize}\n"
for key, value in metadata.items():
latex += f"\\item \\textbf{{{key}}}: {value}\n"
latex += "\\end{itemize}\n"
latex += "\\section{Results}\n"
latex += df.to_latex(index=False, longtable=True)
latex += "\\end{document}"
return str(latex)
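# Illustrative sketch (hypothetical helper, not part of the public API): end-to-end
# report generation for a synthetic result set. The metadata values are placeholders.
def _example_report() -> str:
    rng = np.random.default_rng(99)
    results = rng.lognormal(mean=9.0, sigma=1.2, size=20_000)
    summary = SummaryStatistics(seed=0).calculate_summary(results)
    generator = SummaryReportGenerator(style="markdown")
    return generator.generate_report(
        summary,
        title="Annual Loss Simulation Summary",
        metadata={"iterations": 20_000, "model": "example"},
    )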