"""Ergodic analysis framework for comparing time-average vs ensemble-average growth.
Implements Ole Peters' ergodic economics framework for insurance decision
making. For multiplicative processes like business growth with volatile
losses, ensemble averages and time averages diverge — insurance transforms
growth dynamics in ways that traditional expected value analysis cannot
capture.
Core class:
:class:`ErgodicAnalyzer` — time-average growth, ensemble statistics,
convergence analysis, and significance testing.
Data containers (re-exported from :mod:`~ergodic_insurance.ergodic_types`):
:class:`ErgodicData`, :class:`ErgodicAnalysisResults`,
:class:`ValidationResults`
Scenario and pipeline helpers (delegated to submodules):
:func:`~ergodic_insurance.scenario_analysis.compare_scenarios`,
:func:`~ergodic_insurance.scenario_analysis.analyze_simulation_batch`,
:func:`~ergodic_insurance.integrated_analysis.integrate_loss_ergodic_analysis`,
:func:`~ergodic_insurance.integrated_analysis.validate_insurance_ergodic_impact`
For usage examples see the
`Analyzing Results tutorial <https://docs.mostlyoptimal.com/tutorials/05_analyzing_results.html>`_.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
import numpy as np
from scipy import stats
# Re-export data containers for backward compatibility — downstream code
# imports ErgodicData, ErgodicAnalysisResults, and ValidationResults from
# this module.
from .ergodic_types import (
BatchAnalysisResults,
ErgodicAnalysisResults,
ErgodicData,
ScenarioComparison,
ValidationResults,
)
from .simulation import SimulationResults
if TYPE_CHECKING:
from .insurance_program import InsuranceProgram
from .loss_distributions import LossData
from .monte_carlo import MonteCarloResults
logger = logging.getLogger(__name__)
__all__ = [
"ErgodicAnalyzer",
"BatchAnalysisResults",
"ErgodicAnalysisResults",
"ErgodicData",
"ScenarioComparison",
"ValidationResults",
]
[docs]
class ErgodicAnalyzer:
"""Analyzer for ergodic properties of insurance strategies.
Computes time-average vs ensemble-average growth rates, demonstrating
that insurance can improve time-average growth even when premiums
exceed expected losses.
Attributes:
convergence_threshold: Standard error threshold for Monte Carlo
convergence.
For detailed examples see the
`Analyzing Results tutorial <https://docs.mostlyoptimal.com/tutorials/05_analyzing_results.html>`_.
"""
def __init__(self, convergence_threshold: float = 0.01):
"""Initialize with convergence criteria.
Args:
convergence_threshold: SE threshold for Monte Carlo convergence
(default 0.01). Lower values require more simulations.
"""
self.convergence_threshold = convergence_threshold
# ------------------------------------------------------------------
# Core ergodic calculations
# ------------------------------------------------------------------
[docs]
def calculate_time_average_growth(
self, values: np.ndarray, time_horizon: Optional[int] = None
) -> float:
"""Calculate time-average growth rate for a single trajectory.
Computes ``g = (1/T) * ln(X(T)/X(0))``, the actual compound
growth experienced by a single entity over time.
Args:
values: Array of values over time (equity, assets, wealth).
Length must be >= 2.
time_horizon: Override for time period *T*. If *None*, uses
``len(values) - 1``.
Returns:
Time-average growth rate per period. Returns ``-inf`` for
bankrupt trajectories (final value <= 0) and ``0.0`` when
``time_horizon <= 0``.
Note:
Assumes uniform unit time steps. For non-uniform steps,
pass an explicit *time_horizon*.
"""
if values is None or len(values) == 0:
return -np.inf
# Check for zero/negative values (bankruptcy)
if values[-1] <= 0:
return -np.inf
# Skip any leading zero/negative values
valid_mask = values > 0
if not np.any(valid_mask):
return -np.inf
# Get first valid positive value
first_idx = np.argmax(valid_mask)
initial_value = values[first_idx]
final_value = values[-1]
# Calculate time period
if time_horizon is None:
time_horizon = int(len(values) - 1 - first_idx)
# Calculate growth rate
if final_value > 0 and initial_value > 0 and time_horizon > 0:
growth_rate = float((1.0 / time_horizon) * np.log(final_value / initial_value))
return growth_rate
return 0.0 if time_horizon <= 0 else -np.inf
def _extract_trajectory_values(
self, trajectories: List[np.ndarray]
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Extract initial, final values and lengths from trajectories."""
values_data = [(traj[-1], traj[0], len(traj)) for traj in trajectories if len(traj) > 0]
if not values_data:
return np.array([]), np.array([]), np.array([])
finals, initials, lengths = zip(*values_data)
return np.array(finals), np.array(initials), np.array(lengths)
def _calculate_growth_rates(
self, finals: np.ndarray, initials: np.ndarray, lengths: np.ndarray
) -> np.ndarray:
"""Calculate growth rates from trajectory values.
Note:
Assumes uniform unit time steps. The formula
``log(f/i) / (t-1)`` divides by the number of
inter-observation intervals, so the returned rate is
per-period. For non-uniform time steps, callers should
use :meth:`calculate_time_average_growth` with an explicit
*time_horizon*.
"""
rates = [np.log(f / i) / (t - 1) for f, i, t in zip(finals, initials, lengths) if t > 1]
return np.array(rates) if rates else np.array([])
def _process_variable_length_trajectories(
self, trajectories: List[np.ndarray], metric: str
) -> Dict[str, float]:
"""Process trajectories with variable lengths."""
n_paths = len(trajectories)
results: Dict[str, Any] = {}
if metric in ["final_value", "growth_rate"]:
# Get final and initial values from each trajectory
final_values, initial_values, time_lengths = self._extract_trajectory_values(
trajectories
)
# Filter valid paths (positive initial and final values)
if len(final_values) > 0:
valid_mask = (initial_values > 0) & (final_values > 0)
valid_finals = final_values[valid_mask]
valid_initials = initial_values[valid_mask]
valid_lengths = time_lengths[valid_mask]
else:
valid_finals = valid_initials = valid_lengths = np.array([])
if metric == "final_value":
results["mean"] = np.mean(valid_finals) if len(valid_finals) > 0 else 0.0
results["std"] = np.std(valid_finals, ddof=1) if len(valid_finals) > 1 else 0.0
results["median"] = np.median(valid_finals) if len(valid_finals) > 0 else 0.0
else: # growth_rate
growth_rates = self._calculate_growth_rates(
valid_finals, valid_initials, valid_lengths
)
results["mean"] = np.mean(growth_rates) if len(growth_rates) > 0 else 0.0
results["std"] = np.std(growth_rates, ddof=1) if len(growth_rates) > 1 else 0.0
results["median"] = np.median(growth_rates) if len(growth_rates) > 0 else 0.0
else: # full trajectory - not well-defined for different lengths
results["mean_trajectory"] = None
results["std_trajectory"] = None
# Add survival statistics
survived = sum(1 for traj in trajectories if len(traj) > 0 and traj[-1] > 0)
results["survival_rate"] = survived / n_paths if n_paths > 0 else 0.0
results["n_survived"] = survived
results["n_total"] = n_paths
return results
def _process_fixed_length_trajectories(
self, trajectories: np.ndarray, metric: str
) -> Dict[str, float]:
"""Process trajectories with fixed lengths."""
n_paths, n_time = trajectories.shape
results: Dict[str, Any] = {}
if metric in ["final_value", "growth_rate"]:
# Get final values (trajectories is now a numpy array)
final_values = trajectories[:, -1]
initial_values = trajectories[:, 0]
# Filter valid paths (positive initial and final values)
valid_mask = (initial_values > 0) & (final_values > 0)
valid_finals = final_values[valid_mask]
valid_initials = initial_values[valid_mask]
if metric == "final_value":
results["mean"] = np.mean(valid_finals) if len(valid_finals) > 0 else 0.0
results["std"] = np.std(valid_finals, ddof=1) if len(valid_finals) > 1 else 0.0
results["median"] = np.median(valid_finals) if len(valid_finals) > 0 else 0.0
else: # growth_rate
growth_rates = np.log(valid_finals / valid_initials) / (n_time - 1)
results["mean"] = np.mean(growth_rates) if len(growth_rates) > 0 else 0.0
results["std"] = np.std(growth_rates, ddof=1) if len(growth_rates) > 1 else 0.0
results["median"] = np.median(growth_rates) if len(growth_rates) > 0 else 0.0
else: # full trajectory
results["mean_trajectory"] = np.mean(trajectories, axis=0)
results["std_trajectory"] = np.std(trajectories, axis=0, ddof=1)
# Add survival statistics
survived = np.sum(trajectories[:, -1] > 0)
results["survival_rate"] = survived / n_paths
results["n_survived"] = survived
results["n_total"] = n_paths
return results
[docs]
def calculate_ensemble_average(
self,
trajectories: Union[List[np.ndarray], np.ndarray],
metric: str = "final_value",
) -> Dict[str, float]:
"""Calculate ensemble average and statistics across multiple paths.
Args:
trajectories: List of 1-D arrays (variable lengths) or
2-D array ``(n_paths, n_timesteps)``.
metric: ``"final_value"``, ``"growth_rate"``, or ``"full"``.
Returns:
Dict with ``mean``, ``std``, ``median``, ``survival_rate``,
``n_survived``, ``n_total`` (and ``mean_trajectory`` /
``std_trajectory`` for metric ``"full"``).
"""
# Handle list of arrays with potentially different lengths
if isinstance(trajectories, list):
# Check if all arrays have the same length
lengths = [len(traj) for traj in trajectories if len(traj) > 0]
if len(set(lengths)) == 1 and lengths:
# All same length, can convert to 2D array
trajectories_array = np.array(trajectories)
return self._process_fixed_length_trajectories(trajectories_array, metric)
# Different lengths, work with list
return self._process_variable_length_trajectories(trajectories, metric)
return self._process_fixed_length_trajectories(trajectories, metric)
[docs]
def check_convergence(self, values: np.ndarray, window_size: int = 100) -> Tuple[bool, float]:
"""Check Monte Carlo convergence using rolling standard error.
Args:
values: Array of metric values (e.g. time-average growth rates).
window_size: Rolling window size (default 100).
Returns:
``(converged, standard_error)`` — *converged* is ``True``
when SE < :attr:`convergence_threshold`.
"""
if len(values) < window_size:
return False, np.inf
# Standard error of the mean
se = np.std(values[-window_size:], ddof=1) / np.sqrt(window_size)
# Check if SE is below threshold
converged = bool(se < self.convergence_threshold)
return converged, se
[docs]
def significance_test(
self,
sample1: Union[List[float], np.ndarray],
sample2: Union[List[float], np.ndarray],
test_type: str = "two-sided",
) -> Tuple[float, float]:
"""Welch's t-test between two growth rate samples.
Args:
sample1: First sample (e.g. insured growth rates).
sample2: Second sample (e.g. uninsured growth rates).
test_type: ``"two-sided"``, ``"greater"``, or ``"less"``.
Returns:
``(t_statistic, p_value)``.
"""
# Perform Welch's t-test (does not assume equal variances)
t_stat, p_value = stats.ttest_ind(
sample1, sample2, equal_var=False, alternative=test_type, nan_policy="omit"
)
return t_stat, p_value
# ------------------------------------------------------------------
# Scenario comparison (delegated to scenario_analysis module)
# ------------------------------------------------------------------
[docs]
def compare_scenarios(
self,
insured_results: Union[List[SimulationResults], np.ndarray, "MonteCarloResults"],
uninsured_results: Union[List[SimulationResults], np.ndarray, "MonteCarloResults"],
metric: str = "equity",
) -> ScenarioComparison:
"""Compare insured vs uninsured scenarios using ergodic analysis.
For detailed examples see the
`Advanced Scenarios tutorial <https://docs.mostlyoptimal.com/tutorials/06_advanced_scenarios.html>`_.
Args:
insured_results: Simulation results from insured scenarios —
list of :class:`SimulationResults`, 2-D array, or
:class:`~ergodic_insurance.monte_carlo.MonteCarloResults`.
uninsured_results: Simulation results from uninsured scenarios
(same format as *insured_results*; types may differ).
metric: Financial metric to analyze (default ``"equity"``).
Ignored when a :class:`MonteCarloResults` is passed.
Returns:
:class:`ScenarioComparison` with ``insured``, ``uninsured``,
and ``ergodic_advantage`` fields. Supports dict-style access
for backward compatibility (with deprecation warnings).
Example:
Using :class:`MonteCarloResults` directly::
from ergodic_insurance import ErgodicAnalyzer
from ergodic_insurance.monte_carlo import MonteCarloEngine
engine = MonteCarloEngine(manufacturer, config)
insured_mc = engine.run(insurance_program=program)
uninsured_mc = engine.run(insurance_program=None)
analyzer = ErgodicAnalyzer()
comparison = analyzer.compare_scenarios(insured_mc, uninsured_mc)
"""
from . import scenario_analysis
return scenario_analysis.compare_scenarios(self, insured_results, uninsured_results, metric)
[docs]
def analyze_simulation_batch(
self, simulation_results: List[SimulationResults], label: str = "Scenario"
) -> BatchAnalysisResults:
"""Perform comprehensive ergodic analysis on a batch of simulations.
For detailed examples see the
`Analyzing Results tutorial <https://docs.mostlyoptimal.com/tutorials/05_analyzing_results.html>`_.
Args:
simulation_results: List of :class:`SimulationResults` from
Monte Carlo runs.
label: Descriptive label for this batch.
Returns:
:class:`BatchAnalysisResults` with ``time_average``,
``ensemble_average``, ``convergence``, ``survival_analysis``,
and ``ergodic_divergence`` fields. Supports dict-style access
for backward compatibility (with deprecation warnings).
"""
from . import scenario_analysis
return scenario_analysis.analyze_simulation_batch(self, simulation_results, label)
# ------------------------------------------------------------------
# Integrated analysis pipeline (delegated to integrated_analysis)
# ------------------------------------------------------------------
[docs]
def integrate_loss_ergodic_analysis(
self,
loss_data: "LossData",
insurance_program: Optional["InsuranceProgram"],
manufacturer: Any,
time_horizon: int,
n_simulations: int = 100,
) -> ErgodicAnalysisResults:
"""End-to-end integrated loss modelling and ergodic analysis.
Pipeline: validate -> apply insurance -> aggregate losses ->
Monte Carlo -> ergodic metrics -> validate -> package results.
For detailed examples see the
`Optimization Workflow tutorial <https://docs.mostlyoptimal.com/tutorials/04_optimization_workflow.html>`_.
Args:
loss_data: Standardized loss data.
insurance_program: Insurance program or *None* for uninsured.
manufacturer: Manufacturer model instance.
time_horizon: Analysis time horizon in years.
n_simulations: Number of Monte Carlo runs (default 100).
Returns:
:class:`ErgodicAnalysisResults` with growth, survival,
insurance impact, and validation status.
"""
from . import integrated_analysis
return integrated_analysis.integrate_loss_ergodic_analysis(
self,
loss_data,
insurance_program,
manufacturer,
time_horizon,
n_simulations,
)
[docs]
def validate_insurance_ergodic_impact(
self,
base_scenario: SimulationResults,
insurance_scenario: SimulationResults,
insurance_program: Optional["InsuranceProgram"] = None,
) -> ValidationResults:
"""Validate insurance effects in ergodic calculations.
Checks premium deductions, recovery credits, collateral impacts,
and growth rate consistency.
For detailed examples see the
`Advanced Scenarios tutorial <https://docs.mostlyoptimal.com/tutorials/06_advanced_scenarios.html>`_.
Args:
base_scenario: Simulation results without insurance.
insurance_scenario: Simulation results with insurance.
insurance_program: Insurance program (optional, for detailed
premium checks).
Returns:
:class:`ValidationResults` with individual check flags and
diagnostics.
"""
from . import integrated_analysis
return integrated_analysis.validate_insurance_ergodic_impact(
self, base_scenario, insurance_scenario, insurance_program
)