Source code for ergodic_insurance.simulation

"""Simulation engine for time evolution of widget manufacturer model.

This module provides the main simulation engine that orchestrates the
time evolution of the widget manufacturer financial model, managing
loss events, financial calculations, and result collection.

The simulation framework supports both single-path and Monte Carlo simulations,
enabling comprehensive analysis of insurance strategies and business outcomes
under uncertainty. It tracks detailed financial metrics, processes insurance claims,
and handles bankruptcy conditions appropriately.

Key Features:
    - Single-path trajectory simulation with detailed metrics
    - Monte Carlo simulation support through integration
    - Insurance claim processing with policy application
    - Financial statement tracking and ROE calculation
    - Bankruptcy detection and proper termination
    - Comprehensive result analysis and export capabilities

Examples:
    Basic simulation::

        from ergodic_insurance import Simulation, Config, WidgetManufacturer, ManufacturingLossGenerator

        config = Config()
        manufacturer = WidgetManufacturer(config.manufacturer)
        loss_generator = ManufacturingLossGenerator.create_simple(
            frequency=0.1, severity_mean=5_000_000, seed=42
        )

        sim = Simulation(
            manufacturer=manufacturer,
            loss_generator=loss_generator,
            time_horizon=50
        )
        results = sim.run()

        print(f"Mean ROE: {results.summary_stats()['mean_roe']:.2%}")

Note:
    This module is thread-safe for parallel Monte Carlo simulations when
    each thread has its own Simulation instance.

Since:
    Version 0.1.0
"""

import copy
from dataclasses import dataclass
import logging
from pathlib import Path
import time
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

from .config import DEFAULT_RISK_FREE_RATE, Config
from .decimal_utils import ZERO, to_decimal
from .insurance import InsurancePolicy
from .insurance_program import InsuranceProgram
from .loss_distributions import LossData, LossEvent, ManufacturingLossGenerator
from .manufacturer import WidgetManufacturer
from .monte_carlo import MonteCarloEngine

logger = logging.getLogger(__name__)


[docs] @dataclass class SimulationResults: """Container for simulation trajectory data. Holds the complete time series of financial metrics and events from a single simulation run, with methods for analysis and export. This dataclass provides comprehensive storage for all simulation outputs and includes utility methods for calculating derived metrics, performing statistical analysis, and exporting data for further processing. Attributes: years: Array of simulation years (0 to time_horizon-1). assets: Total assets at each year. equity: Shareholder equity at each year. roe: Return on equity for each year. revenue: Annual revenue for each year. net_income: Annual net income for each year. claim_counts: Number of claims in each year. claim_amounts: Total claim amount in each year. insolvency_year: Year when bankruptcy occurred (None if survived). Examples: Analyzing simulation results:: results = simulation.run() # Get summary statistics stats = results.summary_stats() print(f"Survival: {stats['survived']}") print(f"Mean ROE: {stats['mean_roe']:.2%}") # Export to DataFrame df = results.to_dataframe() df.to_csv('simulation_results.csv') # Calculate volatility metrics volatility = results.calculate_roe_volatility() print(f"ROE Sharpe Ratio: {volatility['roe_sharpe']:.2f}") Note: All financial values are in nominal dollars without inflation adjustment. ROE calculations handle edge cases like zero equity appropriately. """ years: np.ndarray assets: np.ndarray equity: np.ndarray roe: np.ndarray revenue: np.ndarray net_income: np.ndarray claim_counts: np.ndarray claim_amounts: np.ndarray insolvency_year: Optional[int] = None
[docs] def to_dataframe(self) -> pd.DataFrame: """Convert simulation results to pandas DataFrame. Returns: pd.DataFrame: DataFrame with columns for year, assets, equity, roe, revenue, net_income, claim_count, and claim_amount. Examples: Export to Excel:: df = results.to_dataframe() df.to_excel('results.xlsx', index=False) """ return pd.DataFrame( { "year": self.years, "assets": self.assets, "equity": self.equity, "roe": self.roe, "revenue": self.revenue, "net_income": self.net_income, "claim_count": self.claim_counts, "claim_amount": self.claim_amounts, } )
[docs] def calculate_time_weighted_roe(self) -> float: """Calculate time-weighted average ROE. Time-weighted ROE gives equal weight to each period regardless of the equity level, providing a better measure of consistent performance over time. Uses geometric mean for proper compounding. Returns: float: Time-weighted average ROE as a decimal (e.g., 0.08 for 8%). Note: This method uses geometric mean of growth factors (1 + ROE) to properly account for compounding effects. NaN values are excluded from the calculation. Examples: Compare different ROE measures:: simple_avg = np.mean(results.roe) time_weighted = results.calculate_time_weighted_roe() print(f"Simple average: {simple_avg:.2%}") print(f"Time-weighted: {time_weighted:.2%}") """ valid_roe = self.roe[~np.isnan(self.roe)] if len(valid_roe) == 0: return 0.0 # For time-weighted average, we use geometric mean for compounding returns # Guard against total loss scenarios where ROE <= -1 would cause log domain errors # Clip to -0.99 (99% loss) to maintain valid growth factors >= 0.01 clipped_roe = np.clip(valid_roe, -0.99, None) # Convert ROE to growth factors (1 + roe) growth_factors = 1 + clipped_roe # Geometric mean minus 1 to get average return time_weighted_roe = np.exp(np.mean(np.log(growth_factors))) - 1 return float(time_weighted_roe)
[docs] def calculate_rolling_roe(self, window: int) -> np.ndarray: """Calculate rolling window ROE. Args: window: Window size in years (e.g., 1, 3, 5). Must be positive and not exceed the data length. Returns: np.ndarray: Array of rolling ROE values. Values are NaN for positions where the full window is not available. Raises: ValueError: If window size exceeds data length. Examples: Calculate and plot rolling ROE:: rolling_3yr = results.calculate_rolling_roe(3) plt.plot(results.years, rolling_3yr, label='3-Year Rolling ROE') plt.axhline(y=0.08, color='r', linestyle='--', label='Target') """ if window > len(self.roe): raise ValueError(f"Window {window} larger than data length {len(self.roe)}") rolling_roe = np.full(len(self.roe), np.nan) for i in range(window - 1, len(self.roe)): window_data = self.roe[i - window + 1 : i + 1] valid_data = window_data[~np.isnan(window_data)] if len(valid_data) > 0: rolling_roe[i] = np.mean(valid_data) return rolling_roe
[docs] def calculate_roe_components( self, base_operating_margin: float = 0.08, tax_rate: float = 0.25 ) -> Dict[str, np.ndarray]: """Calculate ROE component breakdown. Decomposes ROE into operating, insurance, and tax components using DuPont-style analysis. This helps identify the drivers of ROE performance and the impact of insurance decisions. Args: base_operating_margin: Baseline operating margin for the business. Defaults to 0.08 (8%). Can be sourced from manufacturer.config.base_operating_margin. tax_rate: Corporate tax rate. Defaults to 0.25 (25%). Can be sourced from manufacturer.config.tax_rate. Returns: Dict[str, np.ndarray]: Dictionary containing: - 'operating_roe': Base business ROE without claims - 'insurance_impact': ROE reduction from claims/premiums - 'tax_effect': Impact of taxes on ROE - 'total_roe': Actual ROE for reference Note: This is a simplified decomposition. Actual implementation would require more detailed financial data for precise attribution. Examples: Analyze ROE drivers:: components = results.calculate_roe_components() operating_avg = np.mean(components['operating_roe']) insurance_drag = np.mean(components['insurance_impact']) print(f"Operating ROE: {operating_avg:.2%}") print(f"Insurance drag: {insurance_drag:.2%}") Using manufacturer config values:: components = results.calculate_roe_components( base_operating_margin=manufacturer.config.base_operating_margin, tax_rate=manufacturer.config.tax_rate, ) """ components = { "operating_roe": np.zeros(len(self.years)), "insurance_impact": np.zeros(len(self.years)), "tax_effect": np.zeros(len(self.years)), "total_roe": self.roe.copy(), } # Calculate components based on available data for i in range(len(self.years)): if self.equity[i] > 0 and not np.isnan(self.roe[i]): # Operating ROE = (Revenue - Operating Costs) / Equity # This is a simplified calculation; actual implementation # would need more detailed financial data base_margin = base_operating_margin components["operating_roe"][i] = (self.revenue[i] * base_margin) / self.equity[i] # Insurance impact = reduction in ROE due to premiums and retained losses if self.claim_amounts[i] > 0: components["insurance_impact"][i] = -self.claim_amounts[i] / self.equity[i] # Tax effect (simplified) components["tax_effect"][i] = self.roe[i] * (1 - tax_rate) - self.roe[i] return components
[docs] def calculate_roe_volatility(self) -> Dict[str, float]: """Calculate ROE volatility metrics. Computes various risk-adjusted performance metrics for ROE, including standard deviation, downside deviation, Sharpe ratio, and coefficient of variation. Returns: Dict[str, float]: Dictionary containing: - 'roe_std': Standard deviation of ROE - 'roe_downside_deviation': Downside deviation from mean - 'roe_sharpe': Sharpe ratio using 2% risk-free rate - 'roe_coefficient_variation': Coefficient of variation (std/mean) Note: Returns zeros for all metrics if insufficient data (< 2 observations). Sharpe ratio uses a 2% risk-free rate assumption. Examples: Risk-adjusted performance analysis:: volatility = results.calculate_roe_volatility() if volatility['roe_sharpe'] > 1.0: print("Strong risk-adjusted performance") print(f"Downside risk: {volatility['roe_downside_deviation']:.2%}") """ valid_roe = self.roe[~np.isnan(self.roe)] if len(valid_roe) < 2: return { "roe_std": 0.0, "roe_downside_deviation": 0.0, "roe_sharpe": 0.0, "roe_coefficient_variation": 0.0, } mean_roe = np.mean(valid_roe) std_roe = np.std(valid_roe, ddof=1) # Downside deviation (only negative deviations from mean) negative_deviations = valid_roe[valid_roe < mean_roe] - mean_roe downside_dev = ( np.sqrt(np.mean(negative_deviations**2)) if len(negative_deviations) > 0 else 0.0 ) # Sharpe ratio equivalent for ROE risk_free_rate = DEFAULT_RISK_FREE_RATE sharpe = (mean_roe - risk_free_rate) / std_roe if std_roe > 0 else 0.0 # Coefficient of variation cv = std_roe / abs(mean_roe) if mean_roe != 0 else float("inf") return { "roe_std": std_roe, "roe_downside_deviation": downside_dev, "roe_sharpe": sharpe, "roe_coefficient_variation": cv, }
[docs] def summary_stats(self) -> Dict[str, float]: """Calculate summary statistics for the simulation. Computes comprehensive summary statistics including ROE metrics, rolling averages, volatility measures, and survival indicators. Returns: Dict[str, float]: Dictionary containing: - Basic ROE metrics (mean, std, median, time-weighted) - Rolling averages (1, 3, 5 year) - Final state (assets, equity) - Claims statistics (total, frequency) - Survival indicators (survived, insolvency_year) - Volatility metrics (from calculate_roe_volatility) Examples: Generate summary report:: stats = results.summary_stats() print("Performance Summary:") print(f" Mean ROE: {stats['mean_roe']:.2%}") print(f" Volatility: {stats['std_roe']:.2%}") print(f" Sharpe Ratio: {stats['roe_sharpe']:.2f}") print("\nRisk Summary:") print(f" Survived: {stats['survived']}") print(f" Total Claims: ${stats['total_claims']:,.0f}") """ # Filter out NaN values for ROE calculation valid_roe = self.roe[~np.isnan(self.roe)] # Calculate rolling ROE for different windows rolling_3yr = self.calculate_rolling_roe(3) if len(self.years) >= 3 else np.array([]) rolling_5yr = self.calculate_rolling_roe(5) if len(self.years) >= 5 else np.array([]) # Get volatility metrics volatility_metrics = self.calculate_roe_volatility() base_stats = { "mean_roe": np.mean(valid_roe) if len(valid_roe) > 0 else 0.0, "std_roe": np.std(valid_roe, ddof=1) if len(valid_roe) > 1 else 0.0, "median_roe": np.median(valid_roe) if len(valid_roe) > 0 else 0.0, "time_weighted_roe": self.calculate_time_weighted_roe(), "roe_3yr_avg": np.nanmean(rolling_3yr) if len(rolling_3yr) > 0 else 0.0, "roe_5yr_avg": np.nanmean(rolling_5yr) if len(rolling_5yr) > 0 else 0.0, "final_assets": self.assets[-1], "final_equity": self.equity[-1], "total_claims": np.sum(self.claim_amounts), "claim_frequency": np.mean(self.claim_counts), "survived": self.insolvency_year is None, "insolvency_year": ( float(self.insolvency_year) if self.insolvency_year is not None else 0.0 ), } # Add volatility metrics base_stats.update(volatility_metrics) return base_stats
[docs] class Simulation: """Simulation engine for widget manufacturer time evolution. The main simulation class that coordinates the time evolution of the widget manufacturer model, processing losses and tracking financial performance over the specified time horizon. Supports both single-path and Monte Carlo simulations, with comprehensive tracking of financial metrics, loss events, and bankruptcy conditions. Examples: Basic simulation setup and execution:: from ergodic_insurance.config import ManufacturerConfig from ergodic_insurance.manufacturer import WidgetManufacturer from ergodic_insurance.loss_distributions import ManufacturingLossGenerator from ergodic_insurance.insurance import InsurancePolicy from ergodic_insurance.simulation import Simulation # Create manufacturer config = ManufacturerConfig(initial_assets=10_000_000) manufacturer = WidgetManufacturer(config) # Create insurance policy policy = InsurancePolicy( deductible=500_000, limit=5_000_000, premium_rate=0.02 ) # Run simulation sim = Simulation( manufacturer=manufacturer, loss_generator=ManufacturingLossGenerator.create_simple(seed=42), insurance_policy=policy, time_horizon=10 ) results = sim.run() # Analyze results print(f"Mean ROE: {results.summary_stats()['mean_roe']:.2%}") print(f"Survived: {results.insolvency_year is None}") Running Monte Carlo simulation:: # Use MonteCarloEngine for multiple paths monte_carlo = MonteCarloEngine( base_simulation=sim, n_simulations=1000, parallel=True ) mc_results = monte_carlo.run() print(f"Survival rate: {mc_results.survival_rate:.1%}") print(f"95% VaR: ${mc_results.var_95:,.0f}") Attributes: manufacturer: The widget manufacturer being simulated loss_generator: Generator for loss events insurance_policy: Optional insurance coverage time_horizon: Simulation duration in years seed: Random seed for reproducibility See Also: :class:`SimulationResults`: Container for simulation output :class:`MonteCarloEngine`: For running multiple simulation paths :class:`WidgetManufacturer`: The core financial model :class:`ManufacturingLossGenerator`: For generating loss events """ def __init__( self, manufacturer: WidgetManufacturer, loss_generator: Optional[ Union[ManufacturingLossGenerator, List[ManufacturingLossGenerator]] ] = None, insurance_policy: Optional[InsurancePolicy] = None, time_horizon: int = 100, seed: Optional[int] = None, growth_rate: float = 0.0, letter_of_credit_rate: float = 0.015, ): """Initialize simulation. Args: manufacturer: WidgetManufacturer instance to simulate. This object maintains the financial state and is modified during simulation. loss_generator: ManufacturingLossGenerator or list of generators for creating loss events. If a list is provided, losses from all generators are combined. If None, a default generator with standard parameters is created. insurance_policy: Insurance policy for claim processing. If None, legacy claim processing is used with fixed parameters. time_horizon: Number of years to simulate. Must be positive. seed: Random seed for reproducibility. Passed to loss generator(s). growth_rate: Revenue growth rate per period (default 0.0). letter_of_credit_rate: Annual LoC rate for collateral costs (default 0.015). Note: The manufacturer object is modified in-place during simulation. Create a copy if you need to preserve the initial state. Examples: Setup with custom insurance:: from ergodic_insurance.insurance import InsurancePolicy policy = InsurancePolicy( deductible=1_000_000, limit=10_000_000, premium_rate=0.03 ) sim = Simulation( manufacturer=manufacturer, insurance_policy=policy, time_horizon=50 ) Setup with multiple loss generators:: sim = Simulation( manufacturer=manufacturer, loss_generator=[standard_losses, catastrophic_risk], insurance_policy=policy, time_horizon=20 ) """ self.manufacturer = manufacturer # Deep-copy the initial manufacturer state for re-entrancy (Issue #349) self._initial_manufacturer = copy.deepcopy(manufacturer) self.growth_rate = growth_rate self.letter_of_credit_rate = letter_of_credit_rate self._seed = seed # Handle single generator or list of generators if loss_generator is None: # Create default loss generator with reasonable parameters self.loss_generator = [ ManufacturingLossGenerator.create_simple( frequency=0.1, # 10% chance per year severity_mean=5_000_000, # $5M mean claim severity_std=2_000_000, # $2M standard deviation seed=seed, ) ] elif isinstance(loss_generator, list): self.loss_generator = loss_generator else: self.loss_generator = [loss_generator] self.insurance_policy = insurance_policy self.time_horizon = time_horizon self.seed = seed # Pre-allocate arrays for efficiency self.years = np.arange(time_horizon) self.assets = np.zeros(time_horizon) self.equity = np.zeros(time_horizon) self.roe = np.zeros(time_horizon) self.revenue = np.zeros(time_horizon) self.net_income = np.zeros(time_horizon) self.claim_counts = np.zeros(time_horizon) self.claim_amounts = np.zeros(time_horizon) self.insolvency_year: Optional[int] = None
[docs] def step_annual(self, year: int, losses: List[LossEvent]) -> Dict[str, Any]: """Execute single annual time step. Processes losses for the year, applies insurance coverage, updates manufacturer financial state, and returns metrics. Args: year: Current simulation year (0-indexed). losses: List of LossEvent objects for this year. Returns: Dict[str, float]: Dictionary containing metrics: - All metrics from manufacturer.step() - 'claim_count': Number of losses this year - 'claim_amount': Total loss amount before insurance - 'company_payment': Amount paid by company after deductible - 'insurance_recovery': Amount recovered from insurance Note: This method modifies the manufacturer state in-place. Insurance premiums are deducted from both assets and equity to maintain balance sheet integrity. Side Effects: - Modifies manufacturer.assets and manufacturer.equity - Updates manufacturer internal state via step() method """ # Unified ordering: claims → premium → step (Issue #349) # Process claims and premium BEFORE stepping the manufacturer so that # the metrics returned by step() reflect post-claim equity, and # insolvency is detected in the year it occurs. total_loss_amount = sum(loss.amount for loss in losses) total_company_payment = ZERO total_insurance_recovery = ZERO # Apply insurance if self.insurance_policy: for loss in losses: # Use manufacturer's built-in claim processing which handles collateral and payment schedules company_payment, insurance_recovery = self.manufacturer.process_insurance_claim( claim_amount=loss.amount, deductible_amount=self.insurance_policy.deductible, insurance_limit=self.insurance_policy.get_total_coverage(), ) total_company_payment += company_payment total_insurance_recovery += insurance_recovery # Record annual premium annual_premium = self.insurance_policy.calculate_premium() self.manufacturer.record_insurance_premium(annual_premium) else: # No insurance - process as uninsured claims for loss in losses: # Company pays full amount with no insurance coverage # Use deferred payment to avoid immediate double-hit on equity self.manufacturer.process_uninsured_claim( claim_amount=loss.amount, immediate_payment=False, # Create liability with payment schedule starting next year ) total_company_payment += to_decimal(loss.amount) # Step manufacturer AFTER claims and premium (Issue #349) # Uses configurable parameters instead of hardcoded values metrics = self.manufacturer.step( letter_of_credit_rate=self.letter_of_credit_rate, growth_rate=self.growth_rate, ) # Add loss information to metrics metrics["claim_count"] = len(losses) metrics["claim_amount"] = total_loss_amount metrics["company_payment"] = total_company_payment metrics["insurance_recovery"] = total_insurance_recovery return metrics
[docs] def run(self, progress_interval: int = 100) -> SimulationResults: """Run the full simulation over the specified time horizon. Executes a complete simulation trajectory, processing claims each year, updating the manufacturer's financial state, and tracking all metrics. The simulation terminates early if the manufacturer becomes insolvent. Args: progress_interval: How often to log progress (in years). Set to 0 to disable progress logging. Useful for long simulations. Returns: SimulationResults object containing: - Complete time series of financial metrics - Claim history and amounts - ROE trajectory - Insolvency year (if bankruptcy occurred) Examples: Run simulation with progress updates:: sim = Simulation(manufacturer, time_horizon=1000) results = sim.run(progress_interval=100) # Log every 100 years # Check if company survived if results.insolvency_year: print(f"Bankruptcy in year {results.insolvency_year}") else: print(f"Survived {len(results.years)} years") Analyze simulation results:: results = sim.run() df = results.to_dataframe() # Plot equity evolution import matplotlib.pyplot as plt plt.plot(df['year'], df['equity']) plt.xlabel('Year') plt.ylabel('Equity ($)') plt.title('Company Equity Over Time') plt.show() Note: The simulation uses pre-generated claims for efficiency. All claims are generated at the start based on the configured loss distributions and random seed. See Also: :meth:`step_annual`: Single year simulation step :class:`SimulationResults`: Output data structure """ start_time = time.time() # Reset mutable state so the simulation is re-entrant # Reset manufacturer from initial state (Issue #349) self.manufacturer = copy.deepcopy(self._initial_manufacturer) # Reseed loss generators so repeated runs produce identical sequences if self._seed is not None: for gen in self.loss_generator: if hasattr(gen, "reseed"): gen.reseed(self._seed) self.insolvency_year = None self.assets = np.zeros(self.time_horizon) self.equity = np.zeros(self.time_horizon) self.roe = np.zeros(self.time_horizon) self.revenue = np.zeros(self.time_horizon) self.net_income = np.zeros(self.time_horizon) self.claim_counts = np.zeros(self.time_horizon) self.claim_amounts = np.zeros(self.time_horizon) logger.info(f"Starting {self.time_horizon}-year simulation with dynamic loss generation") # Run simulation for year in range(self.time_horizon): # Log progress if year > 0 and year % progress_interval == 0: elapsed = time.time() - start_time rate = year / elapsed if elapsed > 0 else float("inf") remaining = (self.time_horizon - year) / rate logger.info( f"Year {year}/{self.time_horizon} - {elapsed:.1f}s elapsed, {remaining:.1f}s remaining" ) # Generate losses for this year based on current financial state year_losses: List[LossEvent] = [] revenue = ( self.manufacturer.revenue if hasattr(self.manufacturer, "revenue") else 10_000_000 ) for generator in self.loss_generator: # Generate losses for one year duration losses, _ = generator.generate_losses(duration=1, revenue=revenue, time=float(year)) year_losses.extend(losses) # Execute time step metrics = self.step_annual(year, year_losses) # Store results self.assets[year] = metrics.get("assets", 0) self.equity[year] = metrics.get("equity", 0) self.roe[year] = metrics.get("roe", 0) self.revenue[year] = metrics.get("revenue", 0) self.net_income[year] = metrics.get("net_income", 0) self.claim_counts[year] = metrics.get("claim_count", 0) self.claim_amounts[year] = metrics.get("claim_amount", 0) # Check for insolvency using manufacturer's insolvency tolerance tolerance = self.manufacturer.config.insolvency_tolerance if metrics.get("equity", 0) <= tolerance and self.insolvency_year is None: self.insolvency_year = year logger.warning( f"Manufacturer became insolvent in year {year} (equity <= ${tolerance:,.2f})" ) # Fill remaining years with zeros self.assets[year + 1 :] = 0 self.equity[year + 1 :] = 0 self.roe[year + 1 :] = np.nan self.revenue[year + 1 :] = 0 self.net_income[year + 1 :] = 0 break # Stop simulation — manufacturer is insolvent # Calculate total time total_time = time.time() - start_time logger.info(f"Simulation completed in {total_time:.2f} seconds") # Create and return results results = SimulationResults( years=self.years, assets=self.assets, equity=self.equity, roe=self.roe, revenue=self.revenue, net_income=self.net_income, claim_counts=self.claim_counts, claim_amounts=self.claim_amounts, insolvency_year=self.insolvency_year, ) return results
[docs] def run_with_loss_data( self, loss_data: LossData, validate: bool = True, progress_interval: int = 100 ) -> SimulationResults: """Run simulation using standardized LossData. Args: loss_data: Standardized loss data. validate: Whether to validate loss data before running. progress_interval: How often to log progress. Returns: SimulationResults object with full trajectory. """ # Validate if requested if validate and not loss_data.validate(): logger.warning("Loss data validation failed") raise ValueError("Invalid loss data provided") # Convert to LossEvents losses = loss_data.to_loss_events() # Group losses by year losses_by_year: Dict[int, List[LossEvent]] = {year: [] for year in range(self.time_horizon)} for loss in losses: year = int(loss.time) if 0 <= year < self.time_horizon: losses_by_year[year].append(loss) # Reset mutable state so the simulation is re-entrant # Reset manufacturer from initial state (Issue #349) self.manufacturer = copy.deepcopy(self._initial_manufacturer) if self._seed is not None: for gen in self.loss_generator: if hasattr(gen, "reseed"): gen.reseed(self._seed) self.insolvency_year = None self.assets = np.zeros(self.time_horizon) self.equity = np.zeros(self.time_horizon) self.roe = np.zeros(self.time_horizon) self.revenue = np.zeros(self.time_horizon) self.net_income = np.zeros(self.time_horizon) self.claim_counts = np.zeros(self.time_horizon) self.claim_amounts = np.zeros(self.time_horizon) logger.info( f"Starting {self.time_horizon}-year simulation with {len(losses)} losses from LossData" ) start_time = time.time() # Run simulation year by year for year in range(self.time_horizon): # Log progress if year > 0 and year % progress_interval == 0: elapsed = time.time() - start_time rate = year / elapsed if elapsed > 0 else float("inf") remaining = (self.time_horizon - year) / rate logger.info( f"Year {year}/{self.time_horizon} - {elapsed:.1f}s elapsed, {remaining:.1f}s remaining" ) # Get losses for this year year_losses = losses_by_year.get(year, []) # Execute time step metrics = self.step_annual(year, year_losses) # Store results self.assets[year] = metrics.get("assets", 0) self.equity[year] = metrics.get("equity", 0) self.roe[year] = metrics.get("roe", 0) self.revenue[year] = metrics.get("revenue", 0) self.net_income[year] = metrics.get("net_income", 0) self.claim_counts[year] = metrics.get("claim_count", 0) self.claim_amounts[year] = metrics.get("claim_amount", 0) # Check for insolvency using manufacturer's insolvency tolerance tolerance = self.manufacturer.config.insolvency_tolerance if metrics.get("equity", 0) <= tolerance and self.insolvency_year is None: self.insolvency_year = year logger.warning( f"Manufacturer became insolvent in year {year} (equity <= ${tolerance:,.2f})" ) # Fill remaining years with zeros self.assets[year + 1 :] = 0 self.equity[year + 1 :] = 0 self.roe[year + 1 :] = np.nan self.revenue[year + 1 :] = 0 self.net_income[year + 1 :] = 0 break # Stop simulation — manufacturer is insolvent # Log completion total_time = time.time() - start_time logger.info(f"Simulation with LossData completed in {total_time:.2f} seconds") # Create and return results results = SimulationResults( years=self.years, assets=self.assets, equity=self.equity, roe=self.roe, revenue=self.revenue, net_income=self.net_income, claim_counts=self.claim_counts, claim_amounts=self.claim_amounts, insolvency_year=self.insolvency_year, ) return results
[docs] def get_trajectory(self) -> pd.DataFrame: """Get simulation trajectory as pandas DataFrame. This is a convenience method that runs the simulation if needed and returns the results as a DataFrame. Returns: DataFrame with simulation trajectory. """ results = self.run() return results.to_dataframe()
[docs] @classmethod def run_monte_carlo( # pylint: disable=too-many-locals cls, config: Config, insurance_policy: InsurancePolicy, n_scenarios: int = 10000, batch_size: int = 1000, n_jobs: int = 7, checkpoint_dir: Optional[Path] = None, checkpoint_frequency: int = 5000, seed: Optional[int] = None, resume: bool = True, ) -> Dict[str, Any]: """Run Monte Carlo simulation using the MonteCarloEngine. This is a convenience class method for running large-scale Monte Carlo simulations with the optimized engine. Args: config: Configuration object. insurance_policy: Insurance policy to simulate. n_scenarios: Number of scenarios to run. batch_size: Scenarios per batch. n_jobs: Number of parallel jobs. checkpoint_dir: Directory for checkpoints. checkpoint_frequency: Save checkpoint every N scenarios. seed: Random seed. resume: Whether to resume from checkpoint. Returns: Dictionary of Monte Carlo results and statistics. """ # Create loss generator from .loss_distributions import ManufacturingLossGenerator loss_generator = ManufacturingLossGenerator(seed=seed) # Create insurance program with layers passed to constructor so that # layer_states are properly initialized (Issue #348) if insurance_policy: from .insurance_program import EnhancedInsuranceLayer layer = EnhancedInsuranceLayer( attachment_point=0, limit=( insurance_policy.limit if hasattr(insurance_policy, "limit") else float("inf") ), base_premium_rate=0.01, ) insurance_program = InsuranceProgram(layers=[layer]) else: insurance_program = InsuranceProgram(layers=[]) # Create manufacturer manufacturer = WidgetManufacturer(config=config.manufacturer) # Create simulation config from .monte_carlo import SimulationConfig sim_config = SimulationConfig( n_simulations=n_scenarios, n_years=getattr(config.simulation, "years", 10), parallel=n_jobs > 1 if n_jobs else True, n_workers=n_jobs, chunk_size=batch_size, checkpoint_interval=checkpoint_frequency, seed=seed, ) # Run simulation WITH insurance engine_with_insurance = MonteCarloEngine( loss_generator=loss_generator, insurance_program=insurance_program, manufacturer=manufacturer, config=sim_config, ) logger.info("Running Monte Carlo simulation WITH insurance...") results_with_insurance = engine_with_insurance.run() # Run simulation WITHOUT insurance using the same seed for fair comparison logger.info("Running Monte Carlo simulation WITHOUT insurance (same seed)...") # Create new loss generator with same seed for reproducibility loss_generator_no_insurance = ManufacturingLossGenerator(seed=seed) # Create new manufacturer instance (must be fresh for second run) manufacturer_no_insurance = WidgetManufacturer(config=config.manufacturer) # Create empty insurance program (no coverage) insurance_program_no_insurance = InsuranceProgram(layers=[]) engine_without_insurance = MonteCarloEngine( loss_generator=loss_generator_no_insurance, insurance_program=insurance_program_no_insurance, manufacturer=manufacturer_no_insurance, config=sim_config, ) results_without_insurance = engine_without_insurance.run() # Calculate empirical ergodic analysis by comparing the two runs ergodic_analysis = {} if hasattr(results_with_insurance, "statistics") and hasattr( results_without_insurance, "statistics" ): stats_with = results_with_insurance.statistics stats_without = results_without_insurance.statistics # Extract metrics from both runs if "geometric_return" in stats_with and "geometric_return" in stats_without: geo_with = stats_with["geometric_return"] geo_without = stats_without["geometric_return"] premium_cost = insurance_policy.calculate_premium() initial_assets = config.manufacturer.initial_assets premium_rate = premium_cost / initial_assets # Calculate empirical insurance impact ergodic_analysis = { "premium_rate": premium_rate, # With insurance metrics "geometric_mean_return_with_insurance": geo_with.get("geometric_mean", 0.0), "survival_rate_with_insurance": geo_with.get("survival_rate", 0.0), "volatility_with_insurance": geo_with.get("std", 0.0), # Without insurance metrics "geometric_mean_return_without_insurance": geo_without.get( "geometric_mean", 0.0 ), "survival_rate_without_insurance": geo_without.get("survival_rate", 0.0), "volatility_without_insurance": geo_without.get("std", 0.0), # Empirical deltas (insurance impact) "growth_impact": geo_with.get("geometric_mean", 0.0) - geo_without.get("geometric_mean", 0.0), "survival_benefit": geo_with.get("survival_rate", 0.0) - geo_without.get("survival_rate", 0.0), "volatility_reduction": geo_without.get("std", 0.0) - geo_with.get("std", 0.0), } logger.info( f"Empirical insurance impact: growth={ergodic_analysis['growth_impact']:.2%}, " f"survival={ergodic_analysis['survival_benefit']:.2%}, " f"volatility_reduction={ergodic_analysis['volatility_reduction']:.2%}" ) return { "results_with_insurance": results_with_insurance, "results_without_insurance": results_without_insurance, "ergodic_analysis": ergodic_analysis, } # Fallback if statistics not available return { "results_with_insurance": results_with_insurance, "results_without_insurance": results_without_insurance, }
[docs] @classmethod def compare_insurance_strategies( cls, config: Config, insurance_policies: Dict[str, InsurancePolicy], n_scenarios: int = 1000, n_jobs: int = 7, seed: Optional[int] = None, ) -> pd.DataFrame: """Compare multiple insurance strategies via Monte Carlo. Args: config: Configuration object. insurance_policies: Dictionary of policy name to InsurancePolicy. n_scenarios: Scenarios per policy. n_jobs: Number of parallel jobs. seed: Random seed. Returns: DataFrame comparing results across strategies. """ results = [] for policy_name, policy in insurance_policies.items(): logger.info(f"Running Monte Carlo for policy: {policy_name}") # Run Monte Carlo mc_results = cls.run_monte_carlo( config=config, insurance_policy=policy, n_scenarios=n_scenarios, n_jobs=n_jobs, seed=seed, checkpoint_frequency=n_scenarios + 1, # Don't checkpoint for comparisons resume=False, ) # Extract key metrics from the SimulationResults object sim_results = mc_results["results_with_insurance"] final_assets = sim_results.final_assets growth_rates = sim_results.growth_rates survival_rate = float(np.mean(final_assets > 0)) if len(final_assets) > 0 else 0.0 mean_final = float(np.mean(final_assets)) if len(final_assets) > 0 else 0.0 std_final = float(np.std(final_assets, ddof=1)) if len(final_assets) > 1 else 0.0 positive_rates = growth_rates[growth_rates > 0] geo_mean = ( float(np.exp(np.mean(np.log(positive_rates))) - 1) if len(positive_rates) > 0 else 0.0 ) arith_mean = float(np.mean(growth_rates)) if len(growth_rates) > 0 else 0.0 p95 = float(np.percentile(final_assets, 95)) if len(final_assets) > 0 else 0.0 p99 = float(np.percentile(final_assets, 99)) if len(final_assets) > 0 else 0.0 result_row = { "policy": policy_name, "annual_premium": policy.calculate_premium(), "total_coverage": policy.get_total_coverage(), "survival_rate": survival_rate, "mean_final_equity": mean_final, "std_final_equity": std_final, "geometric_return": geo_mean, "arithmetic_return": arith_mean, "p95_final_equity": p95, "p99_final_equity": p99, } results.append(result_row) comparison_df = pd.DataFrame(results) # Add relative metrics if len(comparison_df) > 0: comparison_df["premium_to_coverage"] = ( comparison_df["annual_premium"] / comparison_df["total_coverage"] ) comparison_df["sharpe_ratio"] = ( comparison_df["arithmetic_return"] / comparison_df["std_final_equity"] ) return comparison_df