Source code for ergodic_insurance.monte_carlo

"""High-performance Monte Carlo simulation engine for insurance optimization."""

from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass, replace
import hashlib
import logging
import os
from pathlib import Path
import pickle
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Tuple
import warnings

logger = logging.getLogger(__name__)

import numpy as np
from tqdm import tqdm

from ._warnings import ErgodicInsuranceDeprecationWarning
from .convergence import ConvergenceDiagnostics, ConvergenceStats
from .insurance_program import InsuranceProgram
from .loss_distributions import ManufacturingLossGenerator
from .manufacturer import WidgetManufacturer
from .monte_carlo_worker import run_chunk_standalone
from .parallel_executor import (
    ChunkingStrategy,
    ParallelExecutor,
    PerformanceMetrics,
    SharedMemoryConfig,
)
from .progress_monitor import ProgressMonitor
from .result_aggregator import (
    AggregationConfig,
    ResultAggregator,
    ResultExporter,
    TimeSeriesAggregator,
)
from .risk_metrics import RiskMetrics
from .ruin_probability import RuinProbabilityAnalyzer, RuinProbabilityConfig, RuinProbabilityResults
from .safe_pickle import safe_dump, safe_load
from .summary_statistics import SummaryReportGenerator, SummaryStatistics
from .trajectory_storage import StorageConfig, TrajectoryStorage


def _create_manufacturer(config_dict: Dict[str, Any]) -> "WidgetManufacturer":
    """Create a fresh manufacturer instance from a config dictionary.

    Uses :meth:`WidgetManufacturer.create_fresh` so that construction always
    goes through ``__init__`` validation.  The previous implementation used
    ``__new__`` with an unchecked ``setattr`` loop, which allowed arbitrary
    attribute injection (see issue #886).

    Args:
        config_dict: Dictionary that **must** contain a ``"config"`` key
            holding a :class:`ManufacturerConfig` instance.  An optional
            ``"stochastic_process"`` key is forwarded to the factory method.

    Returns:
        A validated :class:`WidgetManufacturer` in its initial state.

    Raises:
        TypeError: If ``config_dict["config"]`` is missing or is not a
            proper configuration object.
    """
    config = config_dict.get("config")
    if config is None or not hasattr(config, "__dict__"):
        raise TypeError(
            "_create_manufacturer requires config_dict['config'] to be a "
            "ManufacturerConfig instance, got "
            f"{type(config).__name__ if config is not None else 'None'}"
        )
    stochastic_process = config_dict.get("stochastic_process")
    return WidgetManufacturer.create_fresh(config, stochastic_process)


def _simulate_year_losses(sim_id: int, year: int) -> Tuple[float, float, float]:
    """Simulate losses for a single year.

    .. deprecated::
        This stub is unused. Enhanced parallel paths now use the configured
        loss generator passed via shared data. Retained only for backward
        compatibility with any external callers.
    """
    rng = np.random.default_rng(sim_id * 1000 + year)
    n_events = rng.poisson(3)

    if n_events == 0:
        total_loss = 0.0
    else:
        event_amounts = rng.lognormal(10, 2, n_events)
        total_loss = float(np.sum(event_amounts))

    recovery = min(total_loss, 1_000_000) * 0.9
    retained = total_loss - recovery

    return total_loss, recovery, retained


def _test_worker_function() -> bool:
    """Test function to check if multiprocessing works with scipy imports.

    Returns:
        True if the worker can execute successfully
    """
    try:
        # Try importing scipy to check if it causes issues
        from scipy import stats  # noqa: F401  # pylint: disable=unused-import

        # Test numpy is available (already imported at module level)
        _ = np.array([1, 2, 3])

        return True
    except ImportError:
        return False


# Module-level cache for the multiprocessing probe test (Issue #1385).
# Avoids spawning a throwaway ProcessPoolExecutor on every call to
# _run_enhanced_parallel().  The result is tri-state: None = not yet tested,
# True = works, False = failed.
_mp_probe_result: Optional[bool] = None


def _simulate_path_enhanced(sim_id: int, **shared) -> Dict[str, Any]:
    """Enhanced simulation function for parallel execution.

    Module-level function for pickle compatibility in multiprocessing.
    Uses the configured loss generator and insurance program passed via
    shared data instead of a hardcoded stub (see issue #299).

    Args:
        sim_id: Simulation ID for seeding
        **shared: Shared data from parallel executor

    Returns:
        Dict with simulation results
    """
    import sys

    module_path = Path(__file__).parent.parent.parent
    if str(module_path) not in sys.path:
        sys.path.insert(0, str(module_path))

    # Create manufacturer instance
    manufacturer = _create_manufacturer(shared["manufacturer_config"])

    # Create a fresh insurance program so each simulation gets clean state
    # (Issue #348).  Uses create_fresh() instead of copy.deepcopy() to avoid
    # expensive object-graph traversal — see Issue #1385.
    loss_generator = shared["loss_generator"]
    insurance_program = InsuranceProgram.create_fresh(shared["insurance_program"])

    # Re-seed the loss generator for this simulation to ensure independence
    base_seed = shared.get("base_seed")
    if base_seed is not None:
        loss_generator.reseed(base_seed + sim_id)

    # Initialize simulation arrays
    n_years = shared["n_years"]
    dtype = np.float32 if shared["use_float32"] else np.float64

    result_arrays = {
        "annual_losses": np.zeros(n_years, dtype=dtype),
        "insurance_recoveries": np.zeros(n_years, dtype=dtype),
        "retained_losses": np.zeros(n_years, dtype=dtype),
    }

    # Track ruin at evaluation points if requested
    ruin_evaluation = shared.get("ruin_evaluation", None)
    ruin_at_year = {}
    if ruin_evaluation:
        for eval_year in ruin_evaluation:
            if eval_year <= n_years:
                ruin_at_year[eval_year] = False

    # CRN: look up once before the loop
    crn_base_seed = shared.get("crn_base_seed")

    # Simulate years using the configured loss generator
    for year in range(n_years):
        # Reset insurance program aggregate limits at start of each policy year (Issue #348)
        if year > 0:
            insurance_program.reset_annual()

        # CRN: reseed per (sim_id, year) for reproducible cross-scenario comparison
        if crn_base_seed is not None:
            year_ss = np.random.SeedSequence([crn_base_seed, sim_id, year])
            children = year_ss.spawn(2)
            loss_generator.reseed(int(children[0].generate_state(1)[0]))
            if manufacturer.stochastic_process is not None:
                manufacturer.stochastic_process.reset(int(children[1].generate_state(1)[0]))

        revenue = manufacturer.calculate_revenue()

        # Generate losses using configured loss generator
        if hasattr(loss_generator, "generate_losses"):
            year_losses, _ = loss_generator.generate_losses(duration=1.0, revenue=float(revenue))
        else:
            raise AttributeError(
                f"Loss generator {type(loss_generator).__name__} has no generate_losses method"
            )

        total_loss = sum(loss.amount for loss in year_losses)
        result_arrays["annual_losses"][year] = total_loss

        # Apply insurance PER OCCURRENCE (not aggregate) and create
        # ClaimLiability objects with LoC collateral (Issue #342).
        # Previously this path applied insurance to the aggregate total_loss,
        # which mis-applied per-occurrence deductibles.
        total_recovery = 0.0
        total_retained = 0.0

        for loss_event in year_losses:
            if loss_event.amount > 0:
                claim_result = insurance_program.process_claim(loss_event.amount)
                event_recovery = claim_result.insurance_recovery
                event_retained = loss_event.amount - event_recovery

                total_recovery += event_recovery
                total_retained += event_retained

                # Create ClaimLiability and post collateral for the retained
                # portion. See Issue #342 for double-counting rationale.
                if event_retained > 0:
                    manufacturer.process_insurance_claim(
                        claim_amount=loss_event.amount,
                        insurance_recovery=event_recovery,
                    )

        result_arrays["insurance_recoveries"][year] = total_recovery
        result_arrays["retained_losses"][year] = total_retained

        # Record insurance premium scaled by revenue (Issue #349)
        current_revenue = manufacturer.calculate_revenue()
        base_revenue = float(
            manufacturer.config.initial_assets * manufacturer.config.asset_turnover_ratio
        )
        revenue_scaling_factor = float(current_revenue) / base_revenue if base_revenue > 0 else 1.0
        base_annual_premium = insurance_program.calculate_premium()
        annual_premium = base_annual_premium * revenue_scaling_factor
        if annual_premium > 0:
            manufacturer.record_insurance_premium(annual_premium)

        # Step with config parameters (Issue #349)
        manufacturer.step(
            letter_of_credit_rate=shared.get("letter_of_credit_rate", 0.015),
            growth_rate=shared.get("growth_rate", 0.0),
            time_resolution=shared.get("time_resolution", "annual"),
            apply_stochastic=shared.get("apply_stochastic", False),
        )

        # Prune old ledger entries to bound memory (Issue #315)
        if shared.get("enable_ledger_pruning", False) and year > 0:
            manufacturer.ledger.prune_entries(before_date=year)

        # Check for ruin - use insolvency tolerance from shared config
        tolerance = shared.get("insolvency_tolerance", 10_000)
        if float(manufacturer.equity) <= tolerance or manufacturer.is_ruined:
            # Mark ruin for all future evaluation points
            if ruin_evaluation:
                for eval_year in ruin_at_year:
                    if year < eval_year:
                        ruin_at_year[eval_year] = True
            break

    result = {
        "final_assets": float(manufacturer.total_assets),
        "final_equity": float(manufacturer.equity),
        **result_arrays,
    }
    if ruin_evaluation:
        result["ruin_at_year"] = ruin_at_year
    return result


[docs] @dataclass class MonteCarloConfig: """Configuration for Monte Carlo simulation. Attributes: n_simulations: Number of simulation paths n_years: Number of years per simulation n_chains: Number of chains (retained for backward compatibility; not used for IID Monte Carlo convergence diagnostics — see issue #1353) convergence_mcse_threshold: Maximum relative MCSE (MCSE / |mean|) for declaring convergence. Default 0.01 (1% of mean). parallel: Whether to use multiprocessing n_workers: Number of parallel workers (None for auto) chunk_size: Size of chunks for parallel processing use_float32: Use float32 for memory efficiency cache_results: Cache intermediate results checkpoint_interval: Save checkpoint every N simulations progress_bar: Show progress bar seed: Random seed for reproducibility use_enhanced_parallel: Use enhanced parallel executor for better performance monitor_performance: Track detailed performance metrics adaptive_chunking: Enable adaptive chunk sizing shared_memory: Enable shared memory for read-only data letter_of_credit_rate: Annual LoC rate for collateral costs (default 1.5%) growth_rate: Revenue growth rate per period (default 0.0) time_resolution: Time step resolution, "annual" or "monthly" (default "annual") apply_stochastic: Whether to apply stochastic shocks (default False) enable_ledger_pruning: Prune old ledger entries each year to bound memory (default False) crn_base_seed: Common Random Numbers base seed for cross-scenario comparison. When set, the loss generator and stochastic process are reseeded at each (sim_id, year) boundary using SeedSequence([crn_base_seed, sim_id, year]). This ensures that compared scenarios (e.g. different deductibles) experience the same underlying random draws each year, dramatically reducing estimator variance for growth-lift metrics. (default None, disabled) """ n_simulations: int = 100_000 n_years: int = 10 n_chains: int = 4 convergence_mcse_threshold: float = 0.01 parallel: bool = True n_workers: Optional[int] = None chunk_size: int = 10_000 use_float32: bool = False cache_results: bool = True checkpoint_interval: Optional[int] = None progress_bar: bool = True seed: Optional[int] = None use_enhanced_parallel: bool = True monitor_performance: bool = True adaptive_chunking: bool = True shared_memory: bool = True # Trajectory storage options enable_trajectory_storage: bool = False trajectory_storage_config: Optional[StorageConfig] = None # Aggregation options enable_advanced_aggregation: bool = True aggregation_config: Optional[AggregationConfig] = None generate_summary_report: bool = False summary_report_format: str = "markdown" # Bootstrap confidence interval options compute_bootstrap_ci: bool = False bootstrap_confidence_level: float = 0.95 bootstrap_n_iterations: int = 10000 bootstrap_method: str = "percentile" # Periodic ruin evaluation options ruin_evaluation: Optional[List[int]] = None # Insolvency tolerance # NOTE: insolvency_tolerance is also referenced in _simulate_path_enhanced() # via shared["insolvency_tolerance"]. Keep defaults in sync. insolvency_tolerance: float = 10_000 # Company is insolvent when equity <= this threshold # Simulation step parameters (passed to manufacturer.step()) # NOTE: letter_of_credit_rate and growth_rate defaults are coupled with # _simulate_path_enhanced() shared-data defaults and run_chunk_standalone(). # If you change these defaults, update those functions accordingly. letter_of_credit_rate: float = 0.015 # Annual LoC rate for collateral costs growth_rate: float = 0.0 # Revenue growth rate per period time_resolution: str = "annual" # "annual" or "monthly" apply_stochastic: bool = False # Whether to apply stochastic shocks enable_ledger_pruning: bool = True # Prune old ledger entries to bound memory (Issue #315) crn_base_seed: Optional[int] = None # Common Random Numbers seed for cross-scenario comparison use_gpu: bool = False # Use GPU-accelerated vectorized simulation (Issue #961)
[docs] def __post_init__(self): """Validate configuration parameters. Raises: ValueError: If any parameter is out of its valid range. """ if self.n_simulations <= 0: raise ValueError( f"n_simulations must be positive, got {self.n_simulations}. " "Use at least 1000 for meaningful results." ) if self.n_years <= 0: raise ValueError( f"n_years must be positive, got {self.n_years}. " "Set to the number of years to simulate (e.g. 10)." ) if self.n_chains < 1: raise ValueError( f"n_chains must be >= 1, got {self.n_chains}. " "Use at least 2 chains for convergence diagnostics." ) if self.chunk_size <= 0: raise ValueError( f"chunk_size must be positive, got {self.chunk_size}. " "Typical values are 1000-10000." ) if not (0 < self.convergence_mcse_threshold < 1): raise ValueError( f"convergence_mcse_threshold must be between 0 and 1 (exclusive), " f"got {self.convergence_mcse_threshold}. " "Use 0.01 for 1% relative MCSE threshold." ) if not (0 < self.bootstrap_confidence_level < 1): raise ValueError( f"bootstrap_confidence_level must be between 0 and 1 (exclusive), " f"got {self.bootstrap_confidence_level}. Use e.g. 0.95 for a 95% CI." )
[docs] @dataclass class MonteCarloResults: """Results from Monte Carlo simulation. Attributes: final_assets: Final asset values for each simulation annual_losses: Annual loss amounts insurance_recoveries: Insurance recovery amounts retained_losses: Retained loss amounts growth_rates: Realized growth rates ruin_probability: Probability of ruin metrics: Risk metrics calculated from results convergence: Convergence statistics execution_time: Total execution time in seconds config: Simulation configuration used performance_metrics: Detailed performance metrics (if monitoring enabled) aggregated_results: Advanced aggregation results (if enabled) time_series_aggregation: Time series aggregation results (if enabled) statistical_summary: Complete statistical summary (if enabled) summary_report: Formatted summary report (if generated) bootstrap_confidence_intervals: Bootstrap confidence intervals for key metrics """ final_assets: np.ndarray annual_losses: np.ndarray insurance_recoveries: np.ndarray retained_losses: np.ndarray growth_rates: np.ndarray ruin_probability: Dict[str, float] metrics: Dict[str, float] convergence: Dict[str, ConvergenceStats] execution_time: float config: MonteCarloConfig performance_metrics: Optional[PerformanceMetrics] = None aggregated_results: Optional[Dict[str, Any]] = None time_series_aggregation: Optional[Dict[str, Any]] = None statistical_summary: Optional[Any] = None summary_report: Optional[str] = None bootstrap_confidence_intervals: Optional[Dict[str, Tuple[float, float]]] = None
[docs] def summary(self) -> str: """Generate summary of simulation results.""" # Format ruin probability section # ruin_probability is always a dict now # Sort by year for display ruin_prob_lines = [] for year_str in sorted(self.ruin_probability.keys(), key=int): prob = self.ruin_probability[year_str] ruin_prob_lines.append(f" Year {year_str}: {prob:.2%}") ruin_prob_section = "Ruin Probability:\n" + "\n".join(ruin_prob_lines) base_summary = ( f"Simulation Results Summary\n" f"{'='*50}\n" f"Simulations: {self.config.n_simulations:,}\n" f"Years: {self.config.n_years}\n" f"Execution Time: {self.execution_time:.2f}s\n" f"{ruin_prob_section}\n" f"Mean Final Assets: ${np.mean(self.final_assets):,.0f}\n" f"Mean Growth Rate: {np.mean(self.growth_rates):.4f}\n" f"VaR(99%): ${self.metrics.get('var_99', 0):,.0f}\n" f"TVaR(99%): ${self.metrics.get('tvar_99', 0):,.0f}\n" f"Convergence MCSE: " f"{self.convergence.get('growth_rate', ConvergenceStats(float('nan'),0,0,False,0,0)).mcse:.6f}\n" ) # Add performance metrics if available if self.performance_metrics: base_summary += f"\n{'='*50}\n" base_summary += self.performance_metrics.summary() # Add advanced aggregation results if available if self.aggregated_results: base_summary += f"\n{'='*50}\nAdvanced Aggregation Results:\n" if "percentiles" in self.aggregated_results: for p, val in self.aggregated_results["percentiles"].items(): base_summary += f" {p}: ${val:,.0f}\n" # Add bootstrap confidence intervals if available if self.bootstrap_confidence_intervals: base_summary += f"\n{'='*50}\nBootstrap Confidence Intervals (95%):\n" for metric_name, (lower, upper) in self.bootstrap_confidence_intervals.items(): if ( "assets" in metric_name.lower() or "var" in metric_name.lower() or "tvar" in metric_name.lower() ): # Format as currency for asset-related metrics base_summary += f" {metric_name}: [${lower:,.0f}, ${upper:,.0f}]\n" elif "probability" in metric_name.lower() or "rate" in metric_name.lower(): # Format as percentage for rates and probabilities base_summary += f" {metric_name}: [{lower:.2%}, {upper:.2%}]\n" else: # Default formatting base_summary += f" {metric_name}: [{lower:.4f}, {upper:.4f}]\n" # Add summary report if available if self.summary_report: base_summary += f"\n{'='*50}\n{self.summary_report}" return base_summary
[docs] class MonteCarloEngine: """High-performance Monte Carlo simulation engine for insurance analysis. Provides efficient Monte Carlo simulation with support for parallel processing, convergence monitoring, checkpointing, and comprehensive result aggregation. Optimized for both high-end and budget hardware configurations. Examples: Basic Monte Carlo simulation:: from .monte_carlo import MonteCarloEngine, MonteCarloConfig from .loss_distributions import ManufacturingLossGenerator from .insurance_program import InsuranceProgram from .manufacturer import WidgetManufacturer # Configure simulation config = MonteCarloConfig( n_simulations=10000, n_years=20, parallel=True, n_workers=4 ) # Create components loss_gen = ManufacturingLossGenerator() insurance = InsuranceProgram.create_standard_program() manufacturer = WidgetManufacturer.from_config() # Run Monte Carlo engine = MonteCarloEngine( loss_generator=loss_gen, insurance_program=insurance, manufacturer=manufacturer, config=config ) results = engine.run() print(f"Survival rate: {results.survival_rate:.1%}") print(f"Mean ROE: {results.mean_roe:.2%}") Advanced simulation with convergence monitoring:: # Enable convergence checking config = MonteCarloConfig( n_simulations=100000, check_convergence=True, convergence_tolerance=0.001, min_iterations=1000 ) engine = MonteCarloEngine( loss_generator=loss_gen, insurance_program=insurance, manufacturer=manufacturer, config=config ) # Run with progress tracking results = engine.run(show_progress=True) # Check convergence if results.converged: print(f"Converged after {results.iterations} iterations") print(f"Standard error: {results.standard_error:.4f}") Attributes: loss_generator: Generator for manufacturing loss events insurance_program: Insurance coverage structure manufacturer: Manufacturing company financial model config: Simulation configuration parameters convergence_diagnostics: Convergence monitoring tools See Also: :class:`MonteCarloConfig`: Configuration parameters :class:`MonteCarloResults`: Simulation results container :class:`ParallelExecutor`: Enhanced parallel processing :class:`ConvergenceDiagnostics`: Convergence analysis tools """ def __init__( self, loss_generator: ManufacturingLossGenerator, insurance_program: InsuranceProgram, manufacturer: WidgetManufacturer, config: Optional[MonteCarloConfig] = None, ): """Initialize Monte Carlo engine. Args: loss_generator: Generator for loss events insurance_program: Insurance program structure manufacturer: Manufacturing company model config: Simulation configuration """ self.loss_generator = loss_generator self.insurance_program = insurance_program self.manufacturer = manufacturer self.config = config or MonteCarloConfig() # Set up convergence diagnostics self.convergence_diagnostics = ConvergenceDiagnostics() # Cache directory for checkpoints self.cache_dir = Path("cache/monte_carlo") if self.config.cache_results: self.cache_dir.mkdir(parents=True, exist_ok=True) # NOTE: Global np.random.seed() was removed here (issue #299). # Setting the global seed leaks side effects to other code and does not # affect the loss generator's per-instance RandomState objects. # Per-chunk / per-simulation seeding is handled in the worker functions. # Determine number of workers if self.config.n_workers is None and self.config.parallel: self.config.n_workers = min(os.cpu_count() or 4, 8) # Initialize enhanced parallel executor if enabled self.parallel_executor = None if self.config.use_enhanced_parallel and self.config.parallel: chunking_strategy = ChunkingStrategy( initial_chunk_size=self.config.chunk_size, adaptive=self.config.adaptive_chunking, ) shared_memory_config = SharedMemoryConfig( enable_shared_arrays=self.config.shared_memory, enable_shared_objects=self.config.shared_memory, skip_hmac=True, # ephemeral in-process data — HMAC unnecessary (#1385) ) self.parallel_executor = ParallelExecutor( n_workers=self.config.n_workers, chunking_strategy=chunking_strategy, shared_memory_config=shared_memory_config, monitor_performance=self.config.monitor_performance, ) # Initialize trajectory storage if enabled self.trajectory_storage: Optional[TrajectoryStorage] = None if self.config.enable_trajectory_storage: storage_config = self.config.trajectory_storage_config or StorageConfig() self.trajectory_storage = TrajectoryStorage(storage_config) # Initialize aggregators if enabled self.result_aggregator: Optional[ResultAggregator] = None self.time_series_aggregator: Optional[TimeSeriesAggregator] = None self.summary_statistics: Optional[SummaryStatistics] = None if self.config.enable_advanced_aggregation: agg_config = self.config.aggregation_config or AggregationConfig() self.result_aggregator = ResultAggregator(agg_config) self.time_series_aggregator = TimeSeriesAggregator(agg_config) self.summary_statistics = SummaryStatistics()
[docs] def run( self, progress_callback: Optional[Callable[[int, int, float], None]] = None, cancel_event: Optional[threading.Event] = None, ) -> MonteCarloResults: """Execute Monte Carlo simulation. Args: progress_callback: Optional callback invoked with ``(completed, total, elapsed_seconds)`` after each batch of simulations completes. Useful for GUI progress bars, web dashboards, or any non-terminal environment. cancel_event: Optional :class:`threading.Event`. When set, the engine will stop after the current batch and return partial results. Returns: MonteCarloResults object with all outputs """ start_time = time.time() # Check for cached results cache_key = self._get_cache_key() if self.config.cache_results: cached = self._load_cache(cache_key) if cached is not None: logger.info("Loaded cached results") return cached # Reinitialize parallel executor if config has changed if self.config.use_enhanced_parallel and self.config.parallel: if self.parallel_executor is None or ( self.parallel_executor and self.parallel_executor.n_workers != self.config.n_workers ): # Need to reinitialize with new worker count chunking_strategy = ChunkingStrategy( initial_chunk_size=self.config.chunk_size, adaptive=self.config.adaptive_chunking, ) shared_memory_config = SharedMemoryConfig( enable_shared_arrays=self.config.shared_memory, enable_shared_objects=self.config.shared_memory, skip_hmac=True, # ephemeral in-process data — HMAC unnecessary (#1385) ) self.parallel_executor = ParallelExecutor( n_workers=self.config.n_workers, chunking_strategy=chunking_strategy, shared_memory_config=shared_memory_config, monitor_performance=self.config.monitor_performance, ) # Run simulation with appropriate executor if self.config.use_gpu: results = self._run_gpu( progress_callback=progress_callback, cancel_event=cancel_event, ) elif self.config.parallel: if self.config.use_enhanced_parallel and self.parallel_executor: results = self._run_enhanced_parallel( progress_callback=progress_callback, cancel_event=cancel_event, ) else: results = self._run_parallel( progress_callback=progress_callback, cancel_event=cancel_event, ) else: results = self._run_sequential( progress_callback=progress_callback, cancel_event=cancel_event, ) # Calculate metrics results.metrics = self._calculate_metrics(results) # Check convergence results.convergence = self._check_convergence(results) # Perform advanced aggregation if enabled if self.config.enable_advanced_aggregation: results = self._perform_advanced_aggregation(results) # Set execution time results.execution_time = time.time() - start_time # Add performance metrics if monitoring is enabled if self.config.monitor_performance: if self.config.use_enhanced_parallel and self.parallel_executor: results.performance_metrics = self.parallel_executor.performance_metrics else: # Create basic performance metrics for non-enhanced execution execution_time = time.time() - start_time # Estimate memory usage based on simulation size (basic approximation) memory_estimate = ( self.config.n_simulations * self.config.n_years * 8 * 4 ) # 4 arrays of 8-byte floats results.performance_metrics = PerformanceMetrics( total_time=execution_time, setup_time=0.0, computation_time=execution_time, serialization_time=0.0, reduction_time=0.0, memory_peak=memory_estimate, cpu_utilization=0.0, items_per_second=( self.config.n_simulations / execution_time if execution_time > 0 else 0.0 ), speedup=1.0, ) # Compute bootstrap confidence intervals if requested if self.config.compute_bootstrap_ci: results.bootstrap_confidence_intervals = self.compute_bootstrap_confidence_intervals( results, confidence_level=self.config.bootstrap_confidence_level, n_bootstrap=self.config.bootstrap_n_iterations, method=self.config.bootstrap_method, show_progress=self.config.progress_bar, ) # Cache results if self.config.cache_results: self._save_cache(cache_key, results) return results
def _run_sequential( self, progress_callback: Optional[Callable[[int, int, float], None]] = None, cancel_event: Optional[threading.Event] = None, ) -> MonteCarloResults: """Run simulation sequentially.""" n_sims = self.config.n_simulations n_years = self.config.n_years dtype = np.float32 if self.config.use_float32 else np.float64 # Pre-allocate arrays final_assets = np.zeros(n_sims, dtype=dtype) final_equity = np.zeros(n_sims, dtype=dtype) annual_losses = np.zeros((n_sims, n_years), dtype=dtype) insurance_recoveries = np.zeros((n_sims, n_years), dtype=dtype) retained_losses = np.zeros((n_sims, n_years), dtype=dtype) # Track periodic ruin if requested ruin_at_year_all = [] # Progress bar iterator = range(n_sims) if self.config.progress_bar: iterator = tqdm(iterator, desc="Running simulations") # How often to fire the progress callback / check cancellation callback_interval = max(1, n_sims // 100) seq_start = time.time() # Run simulations completed = 0 for i in iterator: # Check for cancellation if cancel_event is not None and i % callback_interval == 0 and cancel_event.is_set(): logger.info("Cancellation requested at simulation %d/%d", i, n_sims) break sim_results = self._run_single_simulation(i) final_assets[i] = sim_results["final_assets"] final_equity[i] = sim_results["final_equity"] annual_losses[i] = sim_results["annual_losses"] insurance_recoveries[i] = sim_results["insurance_recoveries"] retained_losses[i] = sim_results["retained_losses"] # Collect periodic ruin data if self.config.ruin_evaluation: ruin_at_year_all.append(sim_results["ruin_at_year"]) completed = i + 1 # Fire progress callback if progress_callback is not None and completed % callback_interval == 0: progress_callback(completed, n_sims, time.time() - seq_start) # Checkpoint if needed if ( self.config.checkpoint_interval and i > 0 and i % self.config.checkpoint_interval == 0 ): self._save_checkpoint( i, final_assets[: i + 1], annual_losses[: i + 1], insurance_recoveries[: i + 1], retained_losses[: i + 1], ) # Fire final callback so callers always see (total, total, ...) if progress_callback is not None and completed > 0 and completed != n_sims: progress_callback(completed, n_sims, time.time() - seq_start) # Truncate arrays if cancelled early if completed < n_sims: final_assets = final_assets[:completed] final_equity = final_equity[:completed] annual_losses = annual_losses[:completed] insurance_recoveries = insurance_recoveries[:completed] retained_losses = retained_losses[:completed] n_sims = completed # Calculate growth rates using equity (#355: total assets includes # liabilities, giving misleading growth when leverage changes) growth_rates = self._calculate_growth_rates(final_equity) # Calculate ruin probability ruin_probability = {} if self.config.ruin_evaluation: # Aggregate periodic ruin probabilities for eval_year in self.config.ruin_evaluation: if eval_year <= n_years: ruin_count = sum(r.get(eval_year, False) for r in ruin_at_year_all) ruin_probability[str(eval_year)] = ruin_count / n_sims # Always add final ruin probability (at max runtime) # Count both: companies with low final assets AND companies marked as ruined earlier if self.config.ruin_evaluation or ruin_at_year_all: # Count from ruin_at_year tracking (includes early bankruptcies) final_ruin_count = sum(r.get(n_years, False) for r in ruin_at_year_all) else: # Fallback to equity check if no tracking final_ruin_count = np.sum(np.less_equal(final_assets, self.config.insolvency_tolerance)) ruin_probability[str(n_years)] = float(final_ruin_count / n_sims) return MonteCarloResults( final_assets=final_assets, annual_losses=annual_losses, insurance_recoveries=insurance_recoveries, retained_losses=retained_losses, growth_rates=growth_rates, ruin_probability=ruin_probability, metrics={}, convergence={}, execution_time=0, config=self.config, ) def _run_parallel( self, progress_callback: Optional[Callable[[int, int, float], None]] = None, cancel_event: Optional[threading.Event] = None, ) -> MonteCarloResults: """Run simulation in parallel using multiprocessing.""" # Check if we're on Windows and have scipy import issues # Fall back to sequential execution in these cases try: # Test if we can import scipy successfully (needed for loss distributions) from scipy import stats # noqa: F401 # pylint: disable=unused-import except (ImportError, TypeError) as e: logger.warning( "Scipy import failed in parallel mode: %s. " "Falling back to sequential execution for reliability.", e, ) return self._run_sequential( progress_callback=progress_callback, cancel_event=cancel_event ) n_sims = self.config.n_simulations n_workers = self.config.n_workers chunk_size = self.config.chunk_size # Create chunks chunks = [] for i in range(0, n_sims, chunk_size): chunk_end = min(i + chunk_size, n_sims) chunk_seed = None if self.config.seed is None else self.config.seed + i chunks.append((i, chunk_end, chunk_seed)) # Prepare config dictionary for the standalone function config_dict = { "n_years": self.config.n_years, "use_float32": self.config.use_float32, "ruin_evaluation": self.config.ruin_evaluation, "insolvency_tolerance": self.config.insolvency_tolerance, "letter_of_credit_rate": self.config.letter_of_credit_rate, "growth_rate": self.config.growth_rate, "time_resolution": self.config.time_resolution, "apply_stochastic": self.config.apply_stochastic, } # Run chunks in parallel using standalone function all_results = [] par_start = time.time() completed_sims = 0 try: with ProcessPoolExecutor(max_workers=n_workers) as executor: # Submit all tasks using the standalone function futures = { executor.submit( run_chunk_standalone, chunk, self.loss_generator, self.insurance_program, self.manufacturer, config_dict, ): chunk for chunk in chunks } # Process completed tasks if self.config.progress_bar: pbar = tqdm(total=len(chunks), desc="Processing chunks") for future in as_completed(futures): # Check for cancellation before processing next result if cancel_event is not None and cancel_event.is_set(): logger.info("Cancellation requested during parallel execution") # Cancel remaining futures for f in futures: f.cancel() break chunk_results = future.result() all_results.append(chunk_results) # Track completed simulations for callback chunk_info = futures[future] completed_sims += chunk_info[1] - chunk_info[0] if self.config.progress_bar: pbar.update(1) if progress_callback is not None: progress_callback(completed_sims, n_sims, time.time() - par_start) if self.config.progress_bar: pbar.close() except (OSError, RuntimeError, ValueError, ImportError) as e: logger.warning( "Parallel execution failed: %s. Falling back to sequential execution.", e, ) return self._run_sequential( progress_callback=progress_callback, cancel_event=cancel_event ) # Combine results return self._combine_chunk_results(all_results) def _run_enhanced_parallel( self, progress_callback: Optional[Callable[[int, int, float], None]] = None, cancel_event: Optional[threading.Event] = None, ) -> MonteCarloResults: """Run simulation using enhanced parallel executor. Uses CPU-optimized parallel execution with shared memory and intelligent chunking for better performance on budget hardware. """ n_sims = self.config.n_simulations # Ensure parallel executor is available assert self.parallel_executor is not None, "Enhanced parallel executor not initialized" # Check if we can safely use enhanced parallel execution. # Cache the result so we only pay process-spawn cost once (Issue #1385). global _mp_probe_result # noqa: PLW0603 if _mp_probe_result is None: try: import multiprocessing as mp with ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context()) as executor: future = executor.submit(_test_worker_function) result = future.result(timeout=5) if not result: raise RuntimeError("Worker test failed") _mp_probe_result = True except (ImportError, RuntimeError, TimeoutError) as e: _mp_probe_result = False logger.warning( "Enhanced parallel execution failed: %s. " "Falling back to standard parallel execution.", e, ) if not _mp_probe_result: return self._run_parallel( progress_callback=progress_callback, cancel_event=cancel_event ) # Prepare shared data including the actual loss generator and insurance # program so _simulate_path_enhanced uses the configured model (issue #299). shared_data = { "n_years": self.config.n_years, "use_float32": self.config.use_float32, "ruin_evaluation": self.config.ruin_evaluation, "insolvency_tolerance": self.config.insolvency_tolerance, "enable_ledger_pruning": self.config.enable_ledger_pruning, "manufacturer_config": { "config": self.manufacturer.config, "stochastic_process": self.manufacturer.stochastic_process, }, "loss_generator": self.loss_generator, "insurance_program": self.insurance_program, "base_seed": self.config.seed, "crn_base_seed": self.config.crn_base_seed, # Step parameters for manufacturer.step() (Issue #349) "letter_of_credit_rate": self.config.letter_of_credit_rate, "growth_rate": self.config.growth_rate, "time_resolution": self.config.time_resolution, "apply_stochastic": self.config.apply_stochastic, } # Define reduce function def combine_results_enhanced(chunk_results): """Combine results from enhanced parallel execution.""" # Flatten list of lists all_results = [] for chunk in chunk_results: all_results.extend(chunk) # Extract arrays n_results = len(all_results) n_years = self.config.n_years dtype = np.float32 if self.config.use_float32 else np.float64 final_assets = np.zeros(n_results, dtype=dtype) final_equity = np.zeros(n_results, dtype=dtype) annual_losses = np.zeros((n_results, n_years), dtype=dtype) insurance_recoveries = np.zeros((n_results, n_years), dtype=dtype) retained_losses = np.zeros((n_results, n_years), dtype=dtype) # Track periodic ruin if requested ruin_at_year_all = [] valid_idx = 0 for result in all_results: # Skip None results from failed simulations if result is None: continue # Ensure result is a dictionary with expected keys if isinstance(result, dict) and "final_assets" in result: final_assets[valid_idx] = result["final_assets"] final_equity[valid_idx] = result.get( "final_equity", result["final_assets"] ) # type: ignore[assignment] annual_losses[valid_idx] = result["annual_losses"] insurance_recoveries[valid_idx] = result["insurance_recoveries"] retained_losses[valid_idx] = result["retained_losses"] # Collect periodic ruin data if present if "ruin_at_year" in result: ruin_at_year_all.append(result["ruin_at_year"]) valid_idx += 1 else: logger.warning("Unexpected result format: %s", type(result)) # Trim arrays to only valid results if valid_idx < n_results: final_assets = final_assets[:valid_idx] final_equity = final_equity[:valid_idx] annual_losses = annual_losses[:valid_idx] insurance_recoveries = insurance_recoveries[:valid_idx] retained_losses = retained_losses[:valid_idx] # Calculate derived metrics using equity (#355) growth_rates = self._calculate_growth_rates(final_equity) # Calculate ruin probability ruin_probability = {} total_simulations = len(final_assets) # Guard against division by zero when no valid results if total_simulations == 0: logger.warning( "No valid simulation results from parallel execution. " "Falling back to sequential execution.", ) return self._run_sequential() if self.config.ruin_evaluation and ruin_at_year_all: # Aggregate periodic ruin probabilities for eval_year in self.config.ruin_evaluation: if eval_year <= n_years: ruin_count = sum(r.get(eval_year, False) for r in ruin_at_year_all) ruin_probability[str(eval_year)] = ruin_count / total_simulations # Always add final ruin probability (at max runtime) # Count both: companies with low final assets AND companies marked as ruined earlier if self.config.ruin_evaluation and ruin_at_year_all: # Count from ruin_at_year tracking (includes early bankruptcies) final_ruin_count = sum(r.get(n_years, False) for r in ruin_at_year_all) else: # Fallback to equity check if no tracking final_ruin_count = np.sum( np.less_equal(final_assets, self.config.insolvency_tolerance) ) ruin_probability[str(n_years)] = float(final_ruin_count / total_simulations) return MonteCarloResults( final_assets=final_assets, annual_losses=annual_losses, insurance_recoveries=insurance_recoveries, retained_losses=retained_losses, growth_rates=growth_rates, ruin_probability=ruin_probability, metrics={}, convergence={}, execution_time=0, config=self.config, performance_metrics=None, ) # Execute using enhanced parallel executor results = self.parallel_executor.map_reduce( work_function=_simulate_path_enhanced, work_items=range(n_sims), reduce_function=combine_results_enhanced, shared_data=shared_data, progress_bar=self.config.progress_bar, progress_callback=progress_callback, cancel_event=cancel_event, ) # Add performance metrics from executor if self.config.monitor_performance and self.parallel_executor.performance_metrics: results.performance_metrics = self.parallel_executor.performance_metrics return results # type: ignore[no-any-return] def _run_gpu( self, progress_callback: Optional[Callable[[int, int, float], None]] = None, cancel_event: Optional[threading.Event] = None, ) -> MonteCarloResults: """Run simulation using GPU-accelerated vectorized engine. Falls back to parallel CPU execution if CuPy is not available. """ from .gpu_backend import is_gpu_available from .gpu_mc_engine import extract_params, run_gpu_simulation if not is_gpu_available(): logger.warning( "use_gpu=True but CuPy is not available. Falling back to CPU parallel.", ) return self._run_parallel( progress_callback=progress_callback, cancel_event=cancel_event ) # Extract flat parameters gpu_params = extract_params( self.manufacturer, self.insurance_program, self.loss_generator, self.config, ) # Run vectorized simulation raw_results = run_gpu_simulation( gpu_params, progress_callback=progress_callback, cancel_event=cancel_event, ) # Calculate derived metrics final_assets = raw_results["final_assets"] final_equity = raw_results["final_equity"] growth_rates = self._calculate_growth_rates(final_equity) n_sims = len(final_assets) ruin_probability = { str(self.config.n_years): ( float(np.mean(final_assets <= self.config.insolvency_tolerance)) if n_sims > 0 else 0.0 ) } return MonteCarloResults( final_assets=final_assets, annual_losses=raw_results["annual_losses"], insurance_recoveries=raw_results["insurance_recoveries"], retained_losses=raw_results["retained_losses"], growth_rates=growth_rates, ruin_probability=ruin_probability, metrics={}, convergence={}, execution_time=0, config=self.config, ) def _run_single_simulation(self, sim_id: int) -> Dict[str, Any]: """Run a single simulation path. Args: sim_id: Simulation identifier Returns: Dictionary with simulation results """ n_years = self.config.n_years dtype = np.float32 if self.config.use_float32 else np.float64 # Create a copy of manufacturer for this simulation manufacturer = self.manufacturer.copy() # Arrays to store results annual_losses = np.zeros(n_years, dtype=dtype) insurance_recoveries = np.zeros(n_years, dtype=dtype) retained_losses = np.zeros(n_years, dtype=dtype) # Track ruin at evaluation points ruin_at_year = {} if self.config.ruin_evaluation: for eval_year in self.config.ruin_evaluation: if eval_year <= n_years: # Only track if within simulation period ruin_at_year[eval_year] = False # Always track final year to ensure early bankruptcies propagate ruin_at_year[n_years] = False # Reset insurance program state for this simulation path (Issue #348) self.insurance_program.reset_annual() # Run simulation for each year for year in range(n_years): # Reset insurance program aggregate limits at start of each policy year (Issue #348) if year > 0: self.insurance_program.reset_annual() # CRN: reseed per (sim_id, year) for reproducible cross-scenario comparison if self.config.crn_base_seed is not None: year_ss = np.random.SeedSequence([self.config.crn_base_seed, sim_id, year]) children = year_ss.spawn(2) self.loss_generator.reseed(int(children[0].generate_state(1)[0])) if manufacturer.stochastic_process is not None: manufacturer.stochastic_process.reset(int(children[1].generate_state(1)[0])) # Generate losses using ManufacturingLossGenerator revenue = manufacturer.calculate_revenue() if hasattr(self.loss_generator, "generate_losses"): events, _ = self.loss_generator.generate_losses( duration=1.0, revenue=float(revenue) ) else: raise AttributeError( f"Loss generator {type(self.loss_generator).__name__} has no generate_losses method" ) # Calculate total loss total_loss = sum(event.amount for event in events) annual_losses[year] = total_loss # Apply insurance PER OCCURRENCE (not aggregate) and create # ClaimLiability objects with LoC collateral (Issue #342). total_recovery = 0.0 total_retained = 0.0 for event in events: # Process each event separately through insurance claim_result = self.insurance_program.process_claim(event.amount) event_recovery = claim_result.insurance_recovery event_retained = event.amount - event_recovery total_recovery += event_recovery total_retained += event_retained # Create ClaimLiability and post collateral for the retained # portion. The liability reduces equity via the balance sheet # (equity = assets - liabilities). We do NOT also call # record_insurance_loss() to avoid double-counting: the # liability already reduces equity, so reducing operating # income would penalise equity a second time. if event_retained > 0: manufacturer.process_insurance_claim( claim_amount=event.amount, insurance_recovery=event_recovery, ) insurance_recoveries[year] = total_recovery retained_losses[year] = total_retained # Calculate insurance premium scaled by revenue # Premium should scale with exposure (revenue) current_revenue = manufacturer.calculate_revenue() base_revenue = ( self.manufacturer.config.initial_assets * self.manufacturer.config.asset_turnover_ratio ) revenue_scaling_factor = ( float(current_revenue) / float(base_revenue) if base_revenue > 0 else 1.0 ) # Calculate scaled annual premium base_annual_premium = self.insurance_program.calculate_premium() annual_premium = base_annual_premium * revenue_scaling_factor if annual_premium > 0: manufacturer.record_insurance_premium(annual_premium) # Update manufacturer state with annual step # Apply stochastic if the manufacturer has a stochastic process apply_stochastic = ( manufacturer.stochastic_process is not None or self.config.apply_stochastic ) # Use config parameters instead of hardcoded values (Issue #349) manufacturer.step( letter_of_credit_rate=self.config.letter_of_credit_rate, growth_rate=self.config.growth_rate, time_resolution=self.config.time_resolution, apply_stochastic=apply_stochastic, ) # Prune old ledger entries to bound memory (Issue #315) if self.config.enable_ledger_pruning and year > 0: manufacturer.ledger.prune_entries(before_date=year) # Check for ruin using insolvency tolerance or is_ruined flag if ( float(manufacturer.equity) <= self.config.insolvency_tolerance or manufacturer.is_ruined ): # Mark ruin for all future evaluation points if self.config.ruin_evaluation: for eval_year in ruin_at_year: if year < eval_year: ruin_at_year[eval_year] = True ruin_occurred = True ruin_year = year break else: ruin_occurred = False ruin_year = None # Store trajectory if storage is enabled if self.trajectory_storage: self.trajectory_storage.store_simulation( sim_id=sim_id, annual_losses=annual_losses[: year + 1] if ruin_occurred else annual_losses, insurance_recoveries=( insurance_recoveries[: year + 1] if ruin_occurred else insurance_recoveries ), retained_losses=retained_losses[: year + 1] if ruin_occurred else retained_losses, final_assets=float(manufacturer.total_assets), initial_assets=float(self.manufacturer.total_assets), ruin_occurred=ruin_occurred, ruin_year=ruin_year, ) return { "final_assets": float(manufacturer.total_assets), "final_equity": float(manufacturer.equity), "annual_losses": annual_losses, "insurance_recoveries": insurance_recoveries, "retained_losses": retained_losses, "ruin_at_year": ruin_at_year, # New field for periodic ruin tracking } def _combine_chunk_results(self, chunk_results: List[Dict[str, Any]]) -> MonteCarloResults: """Combine results from parallel chunks. Args: chunk_results: List of chunk result dictionaries Returns: Combined MonteCarloResults """ # Handle empty chunk results if not chunk_results: ruin_probability = {} if self.config.ruin_evaluation: for eval_year in self.config.ruin_evaluation: if eval_year <= self.config.n_years: ruin_probability[str(eval_year)] = 0.0 ruin_probability[str(self.config.n_years)] = 0.0 return MonteCarloResults( final_assets=np.array([]), annual_losses=np.array([]).reshape(0, self.config.n_years), insurance_recoveries=np.array([]).reshape(0, self.config.n_years), retained_losses=np.array([]).reshape(0, self.config.n_years), growth_rates=np.array([]), ruin_probability=ruin_probability, metrics={}, # Empty metrics for empty simulation convergence={}, # Empty convergence for empty simulation execution_time=0.0, config=self.config, time_series_aggregation=None, ) # Concatenate arrays final_assets = np.concatenate([r["final_assets"] for r in chunk_results]) final_equity = np.concatenate( [r["final_equity"] for r in chunk_results] if all("final_equity" in r for r in chunk_results) else [r["final_assets"] for r in chunk_results] ) annual_losses = np.vstack([r["annual_losses"] for r in chunk_results]) insurance_recoveries = np.vstack([r["insurance_recoveries"] for r in chunk_results]) retained_losses = np.vstack([r["retained_losses"] for r in chunk_results]) # Calculate derived metrics using equity (#355) growth_rates = self._calculate_growth_rates(final_equity) # Aggregate periodic ruin probabilities ruin_probability = {} total_simulations = len(final_assets) if self.config.ruin_evaluation: for eval_year in self.config.ruin_evaluation: if eval_year <= self.config.n_years: # Count ruin occurrences for this evaluation year across all simulations ruin_count = 0 for chunk in chunk_results: if "ruin_at_year" in chunk: # ruin_at_year is a list of dictionaries, one for each simulation in the chunk for sim_ruin_data in chunk["ruin_at_year"]: if sim_ruin_data.get(eval_year, False): ruin_count += 1 ruin_probability[str(eval_year)] = ruin_count / total_simulations # Always add final ruin probability # Count both: companies with low final assets AND companies marked as ruined earlier if self.config.ruin_evaluation and chunk_results: # Count from ruin_at_year tracking (includes early bankruptcies) final_ruin_count = 0 for chunk in chunk_results: if "ruin_at_year" in chunk: for sim_ruin_data in chunk["ruin_at_year"]: if sim_ruin_data.get(self.config.n_years, False): final_ruin_count += 1 else: # Fallback to equity check if no tracking final_ruin_count = np.sum(np.less_equal(final_assets, self.config.insolvency_tolerance)) ruin_probability[str(self.config.n_years)] = float(final_ruin_count / total_simulations) return MonteCarloResults( final_assets=final_assets, annual_losses=annual_losses, insurance_recoveries=insurance_recoveries, retained_losses=retained_losses, growth_rates=growth_rates, ruin_probability=ruin_probability, metrics={}, convergence={}, execution_time=0, config=self.config, ) def _calculate_growth_rates(self, final_equity: np.ndarray) -> np.ndarray: """Calculate annualized growth rates based on equity. Uses equity rather than total assets so that growth reflects changes in owner value, consistent with how ruin detection uses equity (#355). Args: final_equity: Final equity values for each simulation Returns: Array of annualized log growth rates """ initial_equity = float(self.manufacturer.equity) n_years = self.config.n_years # Avoid division by zero and log of negative numbers valid_mask = (final_equity > 0) & (initial_equity > 0) growth_rates = np.zeros_like(final_equity, dtype=np.float64) if np.any(valid_mask): growth_rates[valid_mask] = np.log(final_equity[valid_mask] / initial_equity) / n_years return growth_rates def _calculate_metrics(self, results: MonteCarloResults) -> Dict[str, float]: """Calculate risk metrics from simulation results. Args: results: Simulation results Returns: Dictionary of risk metrics """ # Handle empty results gracefully if ( len(results.final_assets) == 0 or results.annual_losses is None or results.annual_losses.size == 0 ): # Return default metrics for empty simulations return { "mean_loss": 0.0, "median_loss": 0.0, "std_loss": 0.0, "var_95": 0.0, "var_99": 0.0, "var_995": 0.0, "tvar_95": 0.0, "tvar_99": 0.0, "tvar_995": 0.0, "expected_shortfall_99": 0.0, "max_loss": 0.0, "mean_recovery": 0.0, "mean_retained": 0.0, "mean_growth_rate": 0.0, "survival_rate": 1.0, "ruin_probability": 0.0, } # Total losses across all years total_losses = np.sum(results.annual_losses, axis=1) # Initialize risk metrics calculator risk_metrics = RiskMetrics(total_losses) metrics = { "mean_loss": np.mean(total_losses), "median_loss": np.median(total_losses), "std_loss": np.std(total_losses), "var_95": risk_metrics.var(0.95), "var_99": risk_metrics.var(0.99), "var_995": risk_metrics.var(0.995), "tvar_95": risk_metrics.tvar(0.95), "tvar_99": risk_metrics.tvar(0.99), "tvar_995": risk_metrics.tvar(0.995), "expected_shortfall_99": risk_metrics.expected_shortfall(0.99), "max_loss": np.max(total_losses), "mean_recovery": np.mean(np.sum(results.insurance_recoveries, axis=1)), "mean_retained": np.mean(np.sum(results.retained_losses, axis=1)), "mean_growth_rate": np.mean(results.growth_rates), "std_growth_rate": np.std(results.growth_rates), "sharpe_ratio": ( np.mean(results.growth_rates) / np.std(results.growth_rates) if np.std(results.growth_rates) > 0 else 0 ), } return metrics def _check_convergence(self, results: MonteCarloResults) -> Dict[str, ConvergenceStats]: """Check convergence of IID Monte Carlo simulation results using MCSE. For IID Monte Carlo simulations, the Gelman-Rubin R-hat statistic is not an appropriate diagnostic because it requires independently initialized chains (Gelman & Rubin, 1992). Splitting a single IID sample into consecutive segments produces R-hat ~ 1.0 by construction, regardless of sample-size adequacy (issue #1353). Convergence is instead assessed via Monte Carlo Standard Error (MCSE). For IID samples MCSE = std / sqrt(n). The simulation is declared converged when relative MCSE (MCSE / |mean|) < threshold *and* the effective sample size exceeds ``min_ess``. Args: results: Simulation results Returns: Dictionary of convergence statistics """ n = len(results.growth_rates) if n < 10: return {} metrics_data = { "growth_rate": results.growth_rates, "total_losses": np.sum(results.annual_losses, axis=1), } convergence_stats: Dict[str, ConvergenceStats] = {} for name, data in metrics_data.items(): ess = self.convergence_diagnostics.calculate_ess(data) mcse = self.convergence_diagnostics.calculate_mcse(data, ess) mean_val = float(np.mean(data)) if mcse == 0: relative_mcse = 0.0 else: relative_mcse = mcse / abs(mean_val) if mean_val != 0 else float("inf") autocorr_arr = self.convergence_diagnostics._calculate_autocorrelation(data, 1) lag1_autocorr = float(autocorr_arr[1]) if len(autocorr_arr) > 1 else 0.0 converged = ( ess >= self.convergence_diagnostics.min_ess and relative_mcse < self.config.convergence_mcse_threshold ) convergence_stats[name] = ConvergenceStats( r_hat=float("nan"), # Not computed for single-run IID MC (#1353) ess=ess, mcse=mcse, converged=converged, n_iterations=n, autocorrelation=lag1_autocorr, ) return convergence_stats def _perform_advanced_aggregation(self, results: MonteCarloResults) -> MonteCarloResults: """Perform advanced aggregation on simulation results. Args: results: Initial simulation results Returns: Enhanced results with aggregation data """ # Aggregate final assets if self.result_aggregator: results.aggregated_results = self.result_aggregator.aggregate(results.final_assets) # Time series aggregation if self.time_series_aggregator: # Aggregate annual losses results.time_series_aggregation = { "losses": self.time_series_aggregator.aggregate( results.annual_losses.T # Transpose to have time in rows ), "recoveries": self.time_series_aggregator.aggregate(results.insurance_recoveries.T), "retained": self.time_series_aggregator.aggregate(results.retained_losses.T), } # Statistical summary if self.summary_statistics: results.statistical_summary = self.summary_statistics.calculate_summary( results.final_assets ) # Generate report if requested if self.config.generate_summary_report: report_generator = SummaryReportGenerator(style=self.config.summary_report_format) results.summary_report = report_generator.generate_report( results.statistical_summary, title="Monte Carlo Simulation Summary", metadata={ "Simulations": self.config.n_simulations, "Years": self.config.n_years, "Ruin Probability": f"{results.ruin_probability.get(str(self.config.n_years), 0.0):.2%}", "Mean Growth Rate": f"{np.mean(results.growth_rates):.4f}", }, ) return results
[docs] def export_results( self, results: MonteCarloResults, filepath: Path, file_format: str = "csv" ) -> None: """Export simulation results to file. Args: results: Simulation results to export filepath: Output file path file_format: Export format ('csv', 'json', 'hdf5') """ if not results.aggregated_results: # Perform aggregation if not already done if self.result_aggregator: results.aggregated_results = self.result_aggregator.aggregate(results.final_assets) else: # Use default aggregation agg_config = AggregationConfig() aggregator = ResultAggregator(agg_config) results.aggregated_results = aggregator.aggregate(results.final_assets) # Export based on format if file_format.lower() == "csv": ResultExporter.to_csv(results.aggregated_results, filepath) elif file_format.lower() == "json": ResultExporter.to_json(results.aggregated_results, filepath) elif file_format.lower() == "hdf5": ResultExporter.to_hdf5(results.aggregated_results, filepath) else: raise ValueError(f"Unsupported export format: {file_format}")
[docs] def compute_bootstrap_confidence_intervals( self, results: MonteCarloResults, confidence_level: float = 0.95, n_bootstrap: int = 10000, method: str = "percentile", show_progress: bool = False, ) -> Dict[str, Tuple[float, float]]: """Compute bootstrap confidence intervals for key simulation metrics. All metrics share a single set of bootstrap resampling indices per iteration, reducing total work from ``7 * n_bootstrap`` to ``n_bootstrap`` iterations. Args: results: Simulation results to analyze. confidence_level: Confidence level for intervals (default 0.95). n_bootstrap: Number of bootstrap iterations (default 10000). method: Bootstrap method ('percentile' or 'bca'). show_progress: Whether to show progress bar. Returns: Dictionary mapping metric names to (lower, upper) confidence bounds. """ from .bootstrap_analysis import BootstrapAnalyzer analyzer = BootstrapAnalyzer( n_bootstrap=n_bootstrap, confidence_level=confidence_level, seed=self.config.seed, show_progress=show_progress, ) # Prepare derived arrays ruin_indicator = (results.final_assets <= self.config.insolvency_tolerance).astype(float) mean_annual_losses = np.mean(results.annual_losses, axis=1) mean_recoveries = np.mean(results.insurance_recoveries, axis=1) # Build metrics dict: name -> (data, statistic) metrics: Dict[str, Tuple[np.ndarray, Callable[[np.ndarray], float]]] = { "Mean Final Assets": (results.final_assets, np.mean), "Median Final Assets": (results.final_assets, np.median), "Mean Growth Rate": (results.growth_rates, np.mean), "Ruin Probability": (ruin_indicator, np.mean), "Mean Annual Losses": (mean_annual_losses, np.mean), "Mean Insurance Recoveries": (mean_recoveries, np.mean), } if "var_99" in results.metrics: def var_99(x: np.ndarray) -> float: return float(np.percentile(x, 99)) metrics["VaR(99%)"] = (results.final_assets, var_99) # Compute all CIs with shared bootstrap indices (single pass) bootstrap_results = analyzer.multi_confidence_interval( metrics, confidence_level=confidence_level, method=method ) return {name: res.confidence_interval for name, res in bootstrap_results.items()}
def _get_cache_key(self) -> str: """Generate cache key for current configuration. Returns: Cache key string """ key_parts = [ f"n_sims_{self.config.n_simulations}", f"n_years_{self.config.n_years}", f"seed_{self.config.seed}", f"ins_{hashlib.sha256(str(self.insurance_program).encode()).hexdigest()[:16]}", f"mfg_{hashlib.sha256(str(self.manufacturer).encode()).hexdigest()[:16]}", ] return "_".join(key_parts) def _save_cache(self, cache_key: str, results: MonteCarloResults) -> None: """Save results to cache. Args: cache_key: Cache key results: Results to cache """ cache_file = self.cache_dir / f"{cache_key}.pkl" try: with open(cache_file, "wb") as f: safe_dump(results, f) except (IOError, OSError, pickle.PickleError) as e: logger.warning("Failed to save cache: %s", e) def _load_cache(self, cache_key: str) -> Optional[MonteCarloResults]: """Load results from cache. Args: cache_key: Cache key Returns: Cached results or None """ cache_file = self.cache_dir / f"{cache_key}.pkl" if cache_file.exists(): try: with open(cache_file, "rb") as f: loaded_data = safe_load(f) return loaded_data # type: ignore except (IOError, OSError, pickle.PickleError, EOFError, ValueError) as e: logger.warning("Failed to load cache: %s", e) return None def _save_checkpoint(self, iteration: int, *arrays) -> None: """Save checkpoint during simulation. Args: iteration: Current iteration number arrays: Arrays to save """ checkpoint_file = self.cache_dir / f"checkpoint_{iteration}.npz" try: np.savez_compressed(checkpoint_file, iteration=iteration, *arrays) except (IOError, OSError, ValueError) as e: logger.warning("Failed to save checkpoint: %s", e) def _initialize_simulation_arrays(self) -> Dict[str, np.ndarray]: """Initialize arrays for simulation results.""" n_sims = self.config.n_simulations n_years = self.config.n_years dtype = np.float32 if self.config.use_float32 else np.float64 return { "final_assets": np.zeros(n_sims, dtype=dtype), "annual_losses": np.zeros((n_sims, n_years), dtype=dtype), "insurance_recoveries": np.zeros((n_sims, n_years), dtype=dtype), "retained_losses": np.zeros((n_sims, n_years), dtype=dtype), } def _check_convergence_at_interval( self, completed_iterations: int, final_assets: np.ndarray ) -> float: """Check convergence at specified interval using relative MCSE. Returns the relative MCSE (MCSE / |mean|) for the growth-rate metric. Smaller values indicate better convergence. The previous pseudo-chain R-hat approach was removed because splitting a single IID sample into consecutive segments is not a valid Gelman-Rubin diagnostic (#1353). """ partial_growth = self._calculate_growth_rates(final_assets[:completed_iterations]) if len(partial_growth) < 2: return float("inf") mcse = self.convergence_diagnostics.calculate_mcse(partial_growth) if mcse == 0: return 0.0 mean_val = float(np.mean(partial_growth)) relative_mcse = mcse / abs(mean_val) if mean_val != 0 else float("inf") return relative_mcse def _run_simulation_batch( self, batch_range: range, arrays: Dict[str, np.ndarray], batch_config: Dict[str, Any] ) -> int: """Run a batch of simulations with monitoring.""" monitor = batch_config["monitor"] check_intervals = batch_config["check_intervals"] early_stopping = batch_config["early_stopping"] show_progress = batch_config["show_progress"] completed_iterations = 0 for i in batch_range: sim_results = self._run_single_simulation(i) arrays["final_assets"][i] = sim_results["final_assets"] arrays["annual_losses"][i] = sim_results["annual_losses"] arrays["insurance_recoveries"][i] = sim_results["insurance_recoveries"] arrays["retained_losses"][i] = sim_results["retained_losses"] completed_iterations = i + 1 # Check if we should perform convergence check if completed_iterations in check_intervals and completed_iterations >= 1000: r_hat = self._check_convergence_at_interval( completed_iterations, arrays["final_assets"] ) should_continue = monitor.update(completed_iterations, r_hat) if early_stopping and not should_continue: if show_progress: logger.info( "Early stopping: Convergence achieved at %s iterations", f"{completed_iterations:,}", ) break else: monitor.update(completed_iterations) return completed_iterations
[docs] def run_with_progress_monitoring( self, check_intervals: Optional[List[int]] = None, convergence_threshold: float = 1.1, early_stopping: bool = True, show_progress: bool = True, ) -> MonteCarloResults: """Run simulation with progress tracking and convergence monitoring.""" if check_intervals is None: check_intervals = [10_000, 25_000, 50_000, 100_000] check_intervals = [i for i in check_intervals if i <= self.config.n_simulations] # Initialize monitor and arrays monitor = ProgressMonitor( total_iterations=self.config.n_simulations, check_intervals=check_intervals, update_frequency=max(self.config.n_simulations // 100, 100), show_console=show_progress, convergence_threshold=convergence_threshold, ) start_time = time.time() arrays = self._initialize_simulation_arrays() # Run simulations in batches batch_size = min(1000, self.config.n_simulations // 10) completed_iterations = 0 for batch_start in range(0, self.config.n_simulations, batch_size): batch_end = min(batch_start + batch_size, self.config.n_simulations) completed_iterations = self._run_simulation_batch( range(batch_start, batch_end), arrays, { "monitor": monitor, "check_intervals": check_intervals, "early_stopping": early_stopping, "show_progress": show_progress, }, ) if early_stopping and monitor.converged: break monitor.finalize() # Trim arrays if stopped early if completed_iterations < self.config.n_simulations: for key in arrays: if key == "final_assets": arrays[key] = arrays[key][:completed_iterations] else: arrays[key] = arrays[key][:completed_iterations] # Create results growth_rates = self._calculate_growth_rates(arrays["final_assets"]) # Calculate ruin probability using insolvency_tolerance for consistency # (issue #299: unify threshold across all execution paths) ruin_probability = { str(self.config.n_years): float( np.mean(arrays["final_assets"] <= self.config.insolvency_tolerance) ) } results = MonteCarloResults( final_assets=arrays["final_assets"], annual_losses=arrays["annual_losses"], insurance_recoveries=arrays["insurance_recoveries"], retained_losses=arrays["retained_losses"], growth_rates=growth_rates, ruin_probability=ruin_probability, metrics={}, convergence={}, execution_time=time.time() - start_time, config=self.config, ) results.metrics = self._calculate_metrics(results) results.convergence = self._check_convergence(results) # Add progress summary to metrics monitor.get_stats() convergence_summary = monitor.generate_convergence_summary() results.metrics["actual_iterations"] = completed_iterations results.metrics["convergence_achieved"] = monitor.converged results.metrics["convergence_iteration"] = ( monitor.converged_at if monitor.converged_at else 0 ) results.metrics["monitoring_overhead_pct"] = convergence_summary.get( "performance_overhead_pct", 0 ) # Store ESS information if results.convergence: for metric_name, conv_stats in results.convergence.items(): results.metrics[f"ess_{metric_name}"] = conv_stats.ess return results
[docs] def run_with_convergence_monitoring( self, target_r_hat: float = 1.05, check_interval: int = 10000, max_iterations: Optional[int] = None, ) -> MonteCarloResults: """Run simulation with automatic convergence monitoring. Args: target_r_hat: Target R-hat for convergence check_interval: Check convergence every N simulations max_iterations: Maximum iterations (None for no limit) Returns: Converged simulation results """ if max_iterations is None: max_iterations = self.config.n_simulations * 10 batch_seed = self.config.seed # Work on a shallow copy so the caller's config object is never # mutated (issue #1018). Only self.config is swapped; the # original dataclass instance stays untouched. original_config = self.config self.config = replace(original_config, n_simulations=check_interval) all_results = [] total_iterations = 0 converged = False try: while not converged and total_iterations < max_iterations: # Set seed for this batch (mutates only the local copy) self.config.seed = batch_seed # Run batch batch_results = self.run() all_results.append(batch_results) total_iterations += check_interval # Combine all results so far if len(all_results) > 1: combined = self._combine_multiple_results(all_results) else: combined = batch_results # Check convergence convergence = self._check_convergence(combined) if convergence: converged = all(stat.converged for stat in convergence.values()) if self.config.progress_bar: max_mcse = max(stat.mcse for stat in convergence.values()) logger.debug( "Iteration %d: max MCSE = %.6f, converged = %s", total_iterations, max_mcse, converged, ) # Advance seed for next batch if batch_seed is not None: batch_seed += check_interval finally: # Restore original config reference self.config = original_config return combined
def _combine_multiple_results(self, results_list: List[MonteCarloResults]) -> MonteCarloResults: """Combine multiple simulation results. Args: results_list: List of MonteCarloResults to combine Returns: Combined MonteCarloResults """ # Concatenate arrays final_assets = np.concatenate([r.final_assets for r in results_list]) annual_losses = np.vstack([r.annual_losses for r in results_list]) insurance_recoveries = np.vstack([r.insurance_recoveries for r in results_list]) retained_losses = np.vstack([r.retained_losses for r in results_list]) growth_rates = np.concatenate([r.growth_rates for r in results_list]) # Recalculate metrics combined = MonteCarloResults( final_assets=final_assets, annual_losses=annual_losses, insurance_recoveries=insurance_recoveries, retained_losses=retained_losses, growth_rates=growth_rates, ruin_probability={ str(self.config.n_years): float( np.mean(final_assets <= self.config.insolvency_tolerance) ) }, metrics={}, convergence={}, execution_time=sum(r.execution_time for r in results_list), config=self.config, ) combined.metrics = self._calculate_metrics(combined) combined.convergence = self._check_convergence(combined) return combined
[docs] def estimate_ruin_probability( self, config: Optional[RuinProbabilityConfig] = None, ) -> RuinProbabilityResults: """Estimate ruin probability over multiple time horizons. Delegates to RuinProbabilityAnalyzer for specialized analysis. Args: config: Configuration for ruin probability estimation Returns: RuinProbabilityResults with comprehensive bankruptcy analysis """ analyzer = RuinProbabilityAnalyzer( manufacturer=self.manufacturer, loss_generator=self.loss_generator, insurance_program=self.insurance_program, config=self.config, ) return analyzer.analyze_ruin_probability(config)
def __getattr__(name): if name == "SimulationConfig": warnings.warn( "SimulationConfig has been renamed to MonteCarloConfig. " "Please update your imports. The old name will be removed in a future version.", ErgodicInsuranceDeprecationWarning, stacklevel=2, ) return MonteCarloConfig raise AttributeError(f"module {__name__!r} has no attribute {name!r}")