Source code for ergodic_insurance.monte_carlo

"""High-performance Monte Carlo simulation engine for insurance optimization."""

from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
import hashlib
import os
from pathlib import Path
import pickle
import time
from typing import Any, Dict, List, Optional, Tuple
import warnings

import numpy as np
from tqdm import tqdm

from .convergence import ConvergenceDiagnostics, ConvergenceStats
from .insurance_program import InsuranceProgram
from .loss_distributions import ManufacturingLossGenerator
from .manufacturer import WidgetManufacturer
from .monte_carlo_worker import run_chunk_standalone
from .parallel_executor import (
    ChunkingStrategy,
    ParallelExecutor,
    PerformanceMetrics,
    SharedMemoryConfig,
)
from .progress_monitor import ProgressMonitor
from .result_aggregator import (
    AggregationConfig,
    ResultAggregator,
    ResultExporter,
    TimeSeriesAggregator,
)
from .risk_metrics import RiskMetrics
from .ruin_probability import RuinProbabilityAnalyzer, RuinProbabilityConfig, RuinProbabilityResults
from .safe_pickle import safe_dump, safe_load
from .summary_statistics import SummaryReportGenerator, SummaryStatistics
from .trajectory_storage import StorageConfig, TrajectoryStorage


def _create_manufacturer(config_dict: Dict[str, Any]) -> Any:
    """Create manufacturer instance from config dictionary."""
    # WidgetManufacturer is already imported at module level
    if "config" in config_dict and hasattr(config_dict["config"], "__dict__"):
        return WidgetManufacturer(config_dict["config"])
    # Create from raw values
    manufacturer = WidgetManufacturer.__new__(WidgetManufacturer)
    for key, value in config_dict.items():
        setattr(manufacturer, key, value)
    return manufacturer
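
# Example (illustrative, not part of the module): this helper is the
# counterpart of the "manufacturer_config" entry that _run_enhanced_parallel
# puts into shared data. A hypothetical round trip:
#
#     shared = {"manufacturer_config": manufacturer.__dict__.copy()}
#     clone = _create_manufacturer(shared["manufacturer_config"])
#     # ``clone`` carries the same attribute state but is a distinct object,
#     # so worker processes cannot mutate the original.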


def _simulate_year_losses(sim_id: int, year: int) -> Tuple[float, float, float]:
    """Simulate losses for a single year.

    .. deprecated::
        This stub is unused. Enhanced parallel paths now use the configured
        loss generator passed via shared data. Retained only for backward
        compatibility with any external callers.
    """
    rng = np.random.default_rng(sim_id * 1000 + year)
    n_events = rng.poisson(3)

    if n_events == 0:
        total_loss = 0.0
    else:
        event_amounts = rng.lognormal(10, 2, n_events)
        total_loss = float(np.sum(event_amounts))

    recovery = min(total_loss, 1_000_000) * 0.9
    retained = total_loss - recovery

    return total_loss, recovery, retained
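
# Note on scale (illustrative arithmetic, not part of the engine): with the
# hardcoded parameters above, expected severity is exp(10 + 2**2 / 2)
# = exp(12) ≈ $162,755 per event, so the Poisson(3) frequency implies an
# expected annual loss of roughly 3 × $162,755 ≈ $488k before recoveries.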


def _test_worker_function() -> bool:
    """Test function to check if multiprocessing works with scipy imports.

    Returns:
        True if the worker can execute successfully
    """
    try:
        # Try importing scipy to check if it causes issues
        from scipy import stats  # noqa: F401  # pylint: disable=unused-import

        # Test numpy is available (already imported at module level)
        _ = np.array([1, 2, 3])

        return True
    except ImportError:
        return False


def _simulate_path_enhanced(sim_id: int, **shared) -> Dict[str, Any]:
    """Enhanced simulation function for parallel execution.

    Module-level function for pickle compatibility in multiprocessing.
    Uses the configured loss generator and insurance program passed via
    shared data instead of a hardcoded stub (see issue #299).

    Args:
        sim_id: Simulation ID for seeding
        **shared: Shared data from parallel executor

    Returns:
        Dict with simulation results
    """
    import sys

    module_path = Path(__file__).parent.parent.parent
    if str(module_path) not in sys.path:
        sys.path.insert(0, str(module_path))

    # Create manufacturer instance
    manufacturer = _create_manufacturer(shared["manufacturer_config"])

    # Deep-copy the insurance program so each simulation gets fresh state (Issue #348)
    import copy

    loss_generator = shared["loss_generator"]
    insurance_program = copy.deepcopy(shared["insurance_program"])

    # Re-seed the loss generator for this simulation to ensure independence
    base_seed = shared.get("base_seed")
    if base_seed is not None:
        loss_generator.reseed(base_seed + sim_id)

    # Initialize simulation arrays
    n_years = shared["n_years"]
    dtype = np.float32 if shared["use_float32"] else np.float64

    result_arrays = {
        "annual_losses": np.zeros(n_years, dtype=dtype),
        "insurance_recoveries": np.zeros(n_years, dtype=dtype),
        "retained_losses": np.zeros(n_years, dtype=dtype),
    }

    # Track ruin at evaluation points if requested
    ruin_evaluation = shared.get("ruin_evaluation", None)
    ruin_at_year = {}
    if ruin_evaluation:
        for eval_year in ruin_evaluation:
            if eval_year <= n_years:
                ruin_at_year[eval_year] = False

    # CRN: look up once before the loop
    crn_base_seed = shared.get("crn_base_seed")

    # Simulate years using the configured loss generator
    for year in range(n_years):
        # Reset insurance program aggregate limits at start of each policy year (Issue #348)
        if year > 0:
            insurance_program.reset_annual()

        # CRN: reseed per (sim_id, year) for reproducible cross-scenario comparison
        if crn_base_seed is not None:
            year_ss = np.random.SeedSequence([crn_base_seed, sim_id, year])
            children = year_ss.spawn(2)
            loss_generator.reseed(int(children[0].generate_state(1)[0]))
            if manufacturer.stochastic_process is not None:
                manufacturer.stochastic_process.reset(int(children[1].generate_state(1)[0]))

        revenue = manufacturer.calculate_revenue()

        # Generate losses using configured loss generator
        if hasattr(loss_generator, "generate_losses"):
            year_losses, _ = loss_generator.generate_losses(duration=1.0, revenue=float(revenue))
        else:
            raise AttributeError(
                f"Loss generator {type(loss_generator).__name__} has no generate_losses method"
            )

        total_loss = sum(loss.amount for loss in year_losses)
        result_arrays["annual_losses"][year] = total_loss

        # Apply insurance PER OCCURRENCE (not aggregate) and create
        # ClaimLiability objects with LoC collateral (Issue #342).
        # Previously this path applied insurance to the aggregate total_loss,
        # which mis-applied per-occurrence deductibles.
        total_recovery = 0.0
        total_retained = 0.0

        for loss_event in year_losses:
            if loss_event.amount > 0:
                claim_result = insurance_program.process_claim(loss_event.amount)
                event_recovery = claim_result.get("insurance_recovery", 0)
                event_retained = loss_event.amount - event_recovery

                total_recovery += event_recovery
                total_retained += event_retained

                # Create ClaimLiability and post collateral for the retained
                # portion. See Issue #342 for double-counting rationale.
                if event_retained > 0:
                    manufacturer.process_insurance_claim(
                        claim_amount=loss_event.amount,
                        insurance_recovery=event_recovery,
                    )

        result_arrays["insurance_recoveries"][year] = total_recovery
        result_arrays["retained_losses"][year] = total_retained

        # Record insurance premium scaled by revenue (Issue #349)
        current_revenue = manufacturer.calculate_revenue()
        base_revenue = float(
            manufacturer.config.initial_assets * manufacturer.config.asset_turnover_ratio
        )
        revenue_scaling_factor = float(current_revenue) / base_revenue if base_revenue > 0 else 1.0
        base_annual_premium = insurance_program.calculate_annual_premium()
        annual_premium = base_annual_premium * revenue_scaling_factor
        if annual_premium > 0:
            manufacturer.record_insurance_premium(annual_premium)

        # Step with config parameters (Issue #349)
        manufacturer.step(
            letter_of_credit_rate=shared.get("letter_of_credit_rate", 0.015),
            growth_rate=shared.get("growth_rate", 0.0),
            time_resolution=shared.get("time_resolution", "annual"),
            apply_stochastic=shared.get("apply_stochastic", False),
        )

        # Prune old ledger entries to bound memory (Issue #315)
        if shared.get("enable_ledger_pruning", False) and year > 0:
            manufacturer.ledger.prune_entries(before_date=year)

        # Check for ruin - use insolvency tolerance from shared config
        tolerance = shared.get("insolvency_tolerance", 10_000)
        if float(manufacturer.equity) <= tolerance:
            # Mark ruin for all future evaluation points
            if ruin_evaluation:
                for eval_year in ruin_at_year:
                    if year < eval_year:
                        ruin_at_year[eval_year] = True
            break

    result = {"final_assets": float(manufacturer.total_assets), **result_arrays}
    if ruin_evaluation:
        result["ruin_at_year"] = ruin_at_year
    return result
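
# Illustration (hedged sketch, not part of the engine): the CRN reseeding in
# _simulate_path_enhanced is a pure function of (crn_base_seed, sim_id, year),
# so two scenario runs sharing a crn_base_seed draw identical losses each
# year. A minimal determinism check with plain NumPy:
#
#     child_a = np.random.SeedSequence([42, 7, 3]).spawn(2)[0]
#     child_b = np.random.SeedSequence([42, 7, 3]).spawn(2)[0]
#     assert child_a.generate_state(1)[0] == child_b.generate_state(1)[0]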


@dataclass
class SimulationConfig:
    """Configuration for Monte Carlo simulation.

    Attributes:
        n_simulations: Number of simulation paths
        n_years: Number of years per simulation
        n_chains: Number of parallel chains for convergence
        parallel: Whether to use multiprocessing
        n_workers: Number of parallel workers (None for auto)
        chunk_size: Size of chunks for parallel processing
        use_float32: Use float32 for memory efficiency
        cache_results: Cache intermediate results
        checkpoint_interval: Save checkpoint every N simulations
        progress_bar: Show progress bar
        seed: Random seed for reproducibility
        use_enhanced_parallel: Use enhanced parallel executor for better performance
        monitor_performance: Track detailed performance metrics
        adaptive_chunking: Enable adaptive chunk sizing
        shared_memory: Enable shared memory for read-only data
        letter_of_credit_rate: Annual LoC rate for collateral costs (default 1.5%)
        growth_rate: Revenue growth rate per period (default 0.0)
        time_resolution: Time step resolution, "annual" or "monthly" (default "annual")
        apply_stochastic: Whether to apply stochastic shocks (default False)
        enable_ledger_pruning: Prune old ledger entries each year to bound memory
            (default False)
        crn_base_seed: Common Random Numbers base seed for cross-scenario comparison.
            When set, the loss generator and stochastic process are reseeded at each
            (sim_id, year) boundary using SeedSequence([crn_base_seed, sim_id, year]).
            This ensures that compared scenarios (e.g. different deductibles)
            experience the same underlying random draws each year, dramatically
            reducing estimator variance for growth-lift metrics.
            (default None, disabled)
    """

    n_simulations: int = 100_000
    n_years: int = 10
    n_chains: int = 4
    parallel: bool = True
    n_workers: Optional[int] = None
    chunk_size: int = 10_000
    use_float32: bool = False
    cache_results: bool = True
    checkpoint_interval: Optional[int] = None
    progress_bar: bool = True
    seed: Optional[int] = None
    use_enhanced_parallel: bool = True
    monitor_performance: bool = True
    adaptive_chunking: bool = True
    shared_memory: bool = True
    # Trajectory storage options
    enable_trajectory_storage: bool = False
    trajectory_storage_config: Optional[StorageConfig] = None
    # Aggregation options
    enable_advanced_aggregation: bool = True
    aggregation_config: Optional[AggregationConfig] = None
    generate_summary_report: bool = False
    summary_report_format: str = "markdown"
    # Bootstrap confidence interval options
    compute_bootstrap_ci: bool = False
    bootstrap_confidence_level: float = 0.95
    bootstrap_n_iterations: int = 10000
    bootstrap_method: str = "percentile"
    # Periodic ruin evaluation options
    ruin_evaluation: Optional[List[int]] = None
    # Insolvency tolerance
    insolvency_tolerance: float = 10_000  # Company is insolvent when equity <= this threshold
    # Simulation step parameters (passed to manufacturer.step())
    letter_of_credit_rate: float = 0.015  # Annual LoC rate for collateral costs
    growth_rate: float = 0.0  # Revenue growth rate per period
    time_resolution: str = "annual"  # "annual" or "monthly"
    apply_stochastic: bool = False  # Whether to apply stochastic shocks
    enable_ledger_pruning: bool = False  # Prune old ledger entries to bound memory (Issue #315)
    crn_base_seed: Optional[int] = None  # Common Random Numbers seed for cross-scenario comparison
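
# Example (illustrative; field values are arbitrary, but every field name
# below exists in SimulationConfig): a CRN-paired configuration for comparing
# two insurance structures on identical random draws.
#
#     config = SimulationConfig(
#         n_simulations=50_000,
#         n_years=20,
#         seed=42,
#         crn_base_seed=1234,           # share across compared scenarios
#         ruin_evaluation=[5, 10, 20],  # report ruin probability at these years
#     )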
@dataclass
class SimulationResults:
    """Results from Monte Carlo simulation.

    Attributes:
        final_assets: Final asset values for each simulation
        annual_losses: Annual loss amounts
        insurance_recoveries: Insurance recovery amounts
        retained_losses: Retained loss amounts
        growth_rates: Realized growth rates
        ruin_probability: Probability of ruin
        metrics: Risk metrics calculated from results
        convergence: Convergence statistics
        execution_time: Total execution time in seconds
        config: Simulation configuration used
        performance_metrics: Detailed performance metrics (if monitoring enabled)
        aggregated_results: Advanced aggregation results (if enabled)
        time_series_aggregation: Time series aggregation results (if enabled)
        statistical_summary: Complete statistical summary (if enabled)
        summary_report: Formatted summary report (if generated)
        bootstrap_confidence_intervals: Bootstrap confidence intervals for key metrics
    """

    final_assets: np.ndarray
    annual_losses: np.ndarray
    insurance_recoveries: np.ndarray
    retained_losses: np.ndarray
    growth_rates: np.ndarray
    ruin_probability: Dict[str, float]
    metrics: Dict[str, float]
    convergence: Dict[str, ConvergenceStats]
    execution_time: float
    config: SimulationConfig
    performance_metrics: Optional[PerformanceMetrics] = None
    aggregated_results: Optional[Dict[str, Any]] = None
    time_series_aggregation: Optional[Dict[str, Any]] = None
    statistical_summary: Optional[Any] = None
    summary_report: Optional[str] = None
    bootstrap_confidence_intervals: Optional[Dict[str, Tuple[float, float]]] = None
    def summary(self) -> str:
        """Generate summary of simulation results."""
        # Format ruin probability section.
        # ruin_probability is always a dict now; sort by year for display.
        ruin_prob_lines = []
        for year_str in sorted(self.ruin_probability.keys(), key=int):
            prob = self.ruin_probability[year_str]
            ruin_prob_lines.append(f"  Year {year_str}: {prob:.2%}")
        ruin_prob_section = "Ruin Probability:\n" + "\n".join(ruin_prob_lines)

        base_summary = (
            f"Simulation Results Summary\n"
            f"{'='*50}\n"
            f"Simulations: {self.config.n_simulations:,}\n"
            f"Years: {self.config.n_years}\n"
            f"Execution Time: {self.execution_time:.2f}s\n"
            f"{ruin_prob_section}\n"
            f"Mean Final Assets: ${np.mean(self.final_assets):,.0f}\n"
            f"Mean Growth Rate: {np.mean(self.growth_rates):.4f}\n"
            f"VaR(99%): ${self.metrics.get('var_99', 0):,.0f}\n"
            f"TVaR(99%): ${self.metrics.get('tvar_99', 0):,.0f}\n"
            f"Convergence R-hat: "
            f"{self.convergence.get('growth_rate', ConvergenceStats(0, 0, 0, False, 0, 0)).r_hat:.3f}\n"
        )

        # Add performance metrics if available
        if self.performance_metrics:
            base_summary += f"\n{'='*50}\n"
            base_summary += self.performance_metrics.summary()

        # Add advanced aggregation results if available
        if self.aggregated_results:
            base_summary += f"\n{'='*50}\nAdvanced Aggregation Results:\n"
            if "percentiles" in self.aggregated_results:
                for p, val in self.aggregated_results["percentiles"].items():
                    base_summary += f"  {p}: ${val:,.0f}\n"

        # Add bootstrap confidence intervals if available
        if self.bootstrap_confidence_intervals:
            base_summary += f"\n{'='*50}\nBootstrap Confidence Intervals (95%):\n"
            for metric_name, (lower, upper) in self.bootstrap_confidence_intervals.items():
                if (
                    "assets" in metric_name.lower()
                    or "var" in metric_name.lower()
                    or "tvar" in metric_name.lower()
                ):
                    # Format as currency for asset-related metrics
                    base_summary += f"  {metric_name}: [${lower:,.0f}, ${upper:,.0f}]\n"
                elif "probability" in metric_name.lower() or "rate" in metric_name.lower():
                    # Format as percentage for rates and probabilities
                    base_summary += f"  {metric_name}: [{lower:.2%}, {upper:.2%}]\n"
                else:
                    # Default formatting
                    base_summary += f"  {metric_name}: [{lower:.4f}, {upper:.4f}]\n"

        # Add summary report if available
        if self.summary_report:
            base_summary += f"\n{'='*50}\n{self.summary_report}"

        return base_summary
class MonteCarloEngine:
    """High-performance Monte Carlo simulation engine for insurance analysis.

    Provides efficient Monte Carlo simulation with support for parallel
    processing, convergence monitoring, checkpointing, and comprehensive
    result aggregation. Optimized for both high-end and budget hardware
    configurations.

    Examples:
        Basic Monte Carlo simulation::

            from .monte_carlo import MonteCarloEngine, SimulationConfig
            from .loss_distributions import ManufacturingLossGenerator
            from .insurance_program import InsuranceProgram
            from .manufacturer import WidgetManufacturer

            # Configure simulation
            config = SimulationConfig(
                n_simulations=10000,
                n_years=20,
                parallel=True,
                n_workers=4
            )

            # Create components
            loss_gen = ManufacturingLossGenerator()
            insurance = InsuranceProgram.create_standard_program()
            manufacturer = WidgetManufacturer.from_config()

            # Run Monte Carlo
            engine = MonteCarloEngine(
                loss_generator=loss_gen,
                insurance_program=insurance,
                manufacturer=manufacturer,
                config=config
            )
            results = engine.run()

            print(f"Survival rate: {results.survival_rate:.1%}")
            print(f"Mean ROE: {results.mean_roe:.2%}")

        Advanced simulation with convergence monitoring::

            # Enable convergence checking
            config = SimulationConfig(
                n_simulations=100000,
                check_convergence=True,
                convergence_tolerance=0.001,
                min_iterations=1000
            )

            engine = MonteCarloEngine(
                loss_generator=loss_gen,
                insurance_program=insurance,
                manufacturer=manufacturer,
                config=config
            )

            # Run with progress tracking
            results = engine.run(show_progress=True)

            # Check convergence
            if results.converged:
                print(f"Converged after {results.iterations} iterations")
                print(f"Standard error: {results.standard_error:.4f}")

    Attributes:
        loss_generator: Generator for manufacturing loss events
        insurance_program: Insurance coverage structure
        manufacturer: Manufacturing company financial model
        config: Simulation configuration parameters
        convergence_diagnostics: Convergence monitoring tools

    See Also:
        :class:`SimulationConfig`: Configuration parameters
        :class:`SimulationResults`: Simulation results container
        :class:`ParallelExecutor`: Enhanced parallel processing
        :class:`ConvergenceDiagnostics`: Convergence analysis tools
    """

    def __init__(
        self,
        loss_generator: ManufacturingLossGenerator,
        insurance_program: InsuranceProgram,
        manufacturer: WidgetManufacturer,
        config: Optional[SimulationConfig] = None,
    ):
        """Initialize Monte Carlo engine.

        Args:
            loss_generator: Generator for loss events
            insurance_program: Insurance program structure
            manufacturer: Manufacturing company model
            config: Simulation configuration
        """
        self.loss_generator = loss_generator
        self.insurance_program = insurance_program
        self.manufacturer = manufacturer
        self.config = config or SimulationConfig()

        # Set up convergence diagnostics
        self.convergence_diagnostics = ConvergenceDiagnostics()

        # Cache directory for checkpoints
        self.cache_dir = Path("cache/monte_carlo")
        if self.config.cache_results:
            self.cache_dir.mkdir(parents=True, exist_ok=True)

        # NOTE: Global np.random.seed() was removed here (issue #299).
        # Setting the global seed leaks side effects to other code and does not
        # affect the loss generator's per-instance RandomState objects.
        # Per-chunk / per-simulation seeding is handled in the worker functions.
        # Determine number of workers
        if self.config.n_workers is None and self.config.parallel:
            self.config.n_workers = min(os.cpu_count() or 4, 8)

        # Initialize enhanced parallel executor if enabled
        self.parallel_executor = None
        if self.config.use_enhanced_parallel and self.config.parallel:
            chunking_strategy = ChunkingStrategy(
                initial_chunk_size=self.config.chunk_size,
                adaptive=self.config.adaptive_chunking,
            )
            shared_memory_config = SharedMemoryConfig(
                enable_shared_arrays=self.config.shared_memory,
                enable_shared_objects=self.config.shared_memory,
            )
            self.parallel_executor = ParallelExecutor(
                n_workers=self.config.n_workers,
                chunking_strategy=chunking_strategy,
                shared_memory_config=shared_memory_config,
                monitor_performance=self.config.monitor_performance,
            )

        # Initialize trajectory storage if enabled
        self.trajectory_storage: Optional[TrajectoryStorage] = None
        if self.config.enable_trajectory_storage:
            storage_config = self.config.trajectory_storage_config or StorageConfig()
            self.trajectory_storage = TrajectoryStorage(storage_config)

        # Initialize aggregators if enabled
        self.result_aggregator: Optional[ResultAggregator] = None
        self.time_series_aggregator: Optional[TimeSeriesAggregator] = None
        self.summary_statistics: Optional[SummaryStatistics] = None
        if self.config.enable_advanced_aggregation:
            agg_config = self.config.aggregation_config or AggregationConfig()
            self.result_aggregator = ResultAggregator(agg_config)
            self.time_series_aggregator = TimeSeriesAggregator(agg_config)
            self.summary_statistics = SummaryStatistics()
    def run(self) -> SimulationResults:
        """Execute Monte Carlo simulation.

        Returns:
            SimulationResults object with all outputs
        """
        start_time = time.time()

        # Check for cached results
        cache_key = self._get_cache_key()
        if self.config.cache_results:
            cached = self._load_cache(cache_key)
            if cached is not None:
                print("Loaded cached results")
                return cached

        # Reinitialize parallel executor if config has changed
        if self.config.use_enhanced_parallel and self.config.parallel:
            if self.parallel_executor is None or (
                self.parallel_executor
                and self.parallel_executor.n_workers != self.config.n_workers
            ):
                # Need to reinitialize with new worker count
                chunking_strategy = ChunkingStrategy(
                    initial_chunk_size=self.config.chunk_size,
                    adaptive=self.config.adaptive_chunking,
                )
                shared_memory_config = SharedMemoryConfig(
                    enable_shared_arrays=self.config.shared_memory,
                    enable_shared_objects=self.config.shared_memory,
                )
                self.parallel_executor = ParallelExecutor(
                    n_workers=self.config.n_workers,
                    chunking_strategy=chunking_strategy,
                    shared_memory_config=shared_memory_config,
                    monitor_performance=self.config.monitor_performance,
                )

        # Run simulation with appropriate executor
        if self.config.parallel:
            if self.config.use_enhanced_parallel and self.parallel_executor:
                results = self._run_enhanced_parallel()
            else:
                results = self._run_parallel()
        else:
            results = self._run_sequential()

        # Calculate metrics
        results.metrics = self._calculate_metrics(results)

        # Check convergence
        results.convergence = self._check_convergence(results)

        # Perform advanced aggregation if enabled
        if self.config.enable_advanced_aggregation:
            results = self._perform_advanced_aggregation(results)

        # Set execution time
        results.execution_time = time.time() - start_time

        # Add performance metrics if monitoring is enabled
        if self.config.monitor_performance:
            if self.config.use_enhanced_parallel and self.parallel_executor:
                results.performance_metrics = self.parallel_executor.performance_metrics
            else:
                # Create basic performance metrics for non-enhanced execution
                execution_time = time.time() - start_time
                # Estimate memory usage based on simulation size (basic approximation)
                memory_estimate = (
                    self.config.n_simulations * self.config.n_years * 8 * 4
                )  # 4 arrays of 8-byte floats
                results.performance_metrics = PerformanceMetrics(
                    total_time=execution_time,
                    setup_time=0.0,
                    computation_time=execution_time,
                    serialization_time=0.0,
                    reduction_time=0.0,
                    memory_peak=memory_estimate,
                    cpu_utilization=0.0,
                    items_per_second=(
                        self.config.n_simulations / execution_time
                        if execution_time > 0
                        else 0.0
                    ),
                    speedup=1.0,
                )

        # Compute bootstrap confidence intervals if requested
        if self.config.compute_bootstrap_ci:
            results.bootstrap_confidence_intervals = self.compute_bootstrap_confidence_intervals(
                results,
                confidence_level=self.config.bootstrap_confidence_level,
                n_bootstrap=self.config.bootstrap_n_iterations,
                method=self.config.bootstrap_method,
                show_progress=self.config.progress_bar,
            )

        # Cache results
        if self.config.cache_results:
            self._save_cache(cache_key, results)

        return results
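
    # Worked example (illustrative): the memory estimate in run() above for
    # the default configuration is 100,000 simulations x 10 years x 8 bytes
    # x 4 arrays = 32,000,000 bytes, i.e. roughly 32 MB of result arrays.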
    def _run_sequential(self) -> SimulationResults:
        """Run simulation sequentially."""
        n_sims = self.config.n_simulations
        n_years = self.config.n_years
        dtype = np.float32 if self.config.use_float32 else np.float64

        # Pre-allocate arrays
        final_assets = np.zeros(n_sims, dtype=dtype)
        annual_losses = np.zeros((n_sims, n_years), dtype=dtype)
        insurance_recoveries = np.zeros((n_sims, n_years), dtype=dtype)
        retained_losses = np.zeros((n_sims, n_years), dtype=dtype)

        # Track periodic ruin if requested
        ruin_at_year_all = []

        # Progress bar
        iterator = range(n_sims)
        if self.config.progress_bar:
            iterator = tqdm(iterator, desc="Running simulations")

        # Run simulations
        for i in iterator:
            sim_results = self._run_single_simulation(i)
            final_assets[i] = sim_results["final_assets"]
            annual_losses[i] = sim_results["annual_losses"]
            insurance_recoveries[i] = sim_results["insurance_recoveries"]
            retained_losses[i] = sim_results["retained_losses"]

            # Collect periodic ruin data
            if self.config.ruin_evaluation:
                ruin_at_year_all.append(sim_results["ruin_at_year"])

            # Checkpoint if needed
            if (
                self.config.checkpoint_interval
                and i > 0
                and i % self.config.checkpoint_interval == 0
            ):
                self._save_checkpoint(
                    i,
                    final_assets[: i + 1],
                    annual_losses[: i + 1],
                    insurance_recoveries[: i + 1],
                    retained_losses[: i + 1],
                )

        # Calculate growth rates
        growth_rates = self._calculate_growth_rates(final_assets)

        # Calculate ruin probability
        ruin_probability = {}
        if self.config.ruin_evaluation:
            # Aggregate periodic ruin probabilities
            for eval_year in self.config.ruin_evaluation:
                if eval_year <= n_years:
                    ruin_count = sum(r.get(eval_year, False) for r in ruin_at_year_all)
                    ruin_probability[str(eval_year)] = ruin_count / n_sims

        # Always add final ruin probability (at max runtime).
        # Count both companies with low final assets AND companies marked as ruined earlier.
        if self.config.ruin_evaluation or ruin_at_year_all:
            # Count from ruin_at_year tracking (includes early bankruptcies)
            final_ruin_count = sum(r.get(n_years, False) for r in ruin_at_year_all)
        else:
            # Fall back to a final-assets check if no tracking
            final_ruin_count = np.sum(
                np.less_equal(final_assets, self.config.insolvency_tolerance)
            )
        ruin_probability[str(n_years)] = float(final_ruin_count / n_sims)

        return SimulationResults(
            final_assets=final_assets,
            annual_losses=annual_losses,
            insurance_recoveries=insurance_recoveries,
            retained_losses=retained_losses,
            growth_rates=growth_rates,
            ruin_probability=ruin_probability,
            metrics={},
            convergence={},
            execution_time=0,
            config=self.config,
        )

    def _run_parallel(self) -> SimulationResults:
        """Run simulation in parallel using multiprocessing."""
        # Check if we're on Windows and have scipy import issues.
        # Fall back to sequential execution in these cases.
        try:
            # Test if we can import scipy successfully (needed for loss distributions)
            from scipy import stats  # noqa: F401  # pylint: disable=unused-import
        except (ImportError, TypeError) as e:
            warnings.warn(
                f"Scipy import failed in parallel mode: {e}. "
                "Falling back to sequential execution for reliability.",
                RuntimeWarning,
                stacklevel=2,
            )
            return self._run_sequential()

        n_sims = self.config.n_simulations
        n_workers = self.config.n_workers
        chunk_size = self.config.chunk_size

        # Create chunks
        chunks = []
        for i in range(0, n_sims, chunk_size):
            chunk_end = min(i + chunk_size, n_sims)
            chunk_seed = None if self.config.seed is None else self.config.seed + i
            chunks.append((i, chunk_end, chunk_seed))

        # Prepare config dictionary for the standalone function
        config_dict = {
            "n_years": self.config.n_years,
            "use_float32": self.config.use_float32,
            "ruin_evaluation": self.config.ruin_evaluation,
            "insolvency_tolerance": self.config.insolvency_tolerance,
            "letter_of_credit_rate": self.config.letter_of_credit_rate,
            "growth_rate": self.config.growth_rate,
            "time_resolution": self.config.time_resolution,
            "apply_stochastic": self.config.apply_stochastic,
        }

        # Run chunks in parallel using the standalone function
        all_results = []
        try:
            with ProcessPoolExecutor(max_workers=n_workers) as executor:
                # Submit all tasks using the standalone function
                futures = {
                    executor.submit(
                        run_chunk_standalone,
                        chunk,
                        self.loss_generator,
                        self.insurance_program,
                        self.manufacturer,
                        config_dict,
                    ): chunk
                    for chunk in chunks
                }

                # Process completed tasks
                if self.config.progress_bar:
                    pbar = tqdm(total=len(chunks), desc="Processing chunks")

                for future in as_completed(futures):
                    chunk_results = future.result()
                    all_results.append(chunk_results)
                    if self.config.progress_bar:
                        pbar.update(1)

                if self.config.progress_bar:
                    pbar.close()
        except (OSError, RuntimeError, ValueError, ImportError) as e:
            warnings.warn(
                f"Parallel execution failed: {e}. Falling back to sequential execution.",
                RuntimeWarning,
                stacklevel=2,
            )
            return self._run_sequential()

        # Combine results
        return self._combine_chunk_results(all_results)

    def _run_enhanced_parallel(self) -> SimulationResults:
        """Run simulation using enhanced parallel executor.

        Uses CPU-optimized parallel execution with shared memory and
        intelligent chunking for better performance on budget hardware.
        """
        n_sims = self.config.n_simulations

        # Ensure parallel executor is available
        assert self.parallel_executor is not None, "Enhanced parallel executor not initialized"

        # Check if we can safely use enhanced parallel execution.
        # On Windows with scipy import issues, fall back to standard parallel.
        try:
            # Try to execute a test function to see if multiprocessing works
            import multiprocessing as mp

            with ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context()) as executor:
                future = executor.submit(_test_worker_function)
                result = future.result(timeout=5)
                if not result:
                    raise RuntimeError("Worker test failed")
        except (ImportError, RuntimeError, TimeoutError) as e:
            warnings.warn(
                f"Enhanced parallel execution failed: {e}. "
                "Falling back to standard parallel execution.",
                RuntimeWarning,
                stacklevel=2,
            )
            return self._run_parallel()

        # Prepare shared data including the actual loss generator and insurance
        # program so _simulate_path_enhanced uses the configured model (issue #299).
        shared_data = {
            "n_years": self.config.n_years,
            "use_float32": self.config.use_float32,
            "ruin_evaluation": self.config.ruin_evaluation,
            "insolvency_tolerance": self.config.insolvency_tolerance,
            "enable_ledger_pruning": self.config.enable_ledger_pruning,
            "manufacturer_config": self.manufacturer.__dict__.copy(),
            "loss_generator": self.loss_generator,
            "insurance_program": self.insurance_program,
            "base_seed": self.config.seed,
            "crn_base_seed": self.config.crn_base_seed,
            # Step parameters for manufacturer.step() (Issue #349)
            "letter_of_credit_rate": self.config.letter_of_credit_rate,
            "growth_rate": self.config.growth_rate,
            "time_resolution": self.config.time_resolution,
            "apply_stochastic": self.config.apply_stochastic,
        }

        # Define reduce function
        def combine_results_enhanced(chunk_results):
            """Combine results from enhanced parallel execution."""
            # Flatten list of lists
            all_results = []
            for chunk in chunk_results:
                all_results.extend(chunk)

            # Extract arrays
            n_results = len(all_results)
            n_years = self.config.n_years
            dtype = np.float32 if self.config.use_float32 else np.float64

            final_assets = np.zeros(n_results, dtype=dtype)
            annual_losses = np.zeros((n_results, n_years), dtype=dtype)
            insurance_recoveries = np.zeros((n_results, n_years), dtype=dtype)
            retained_losses = np.zeros((n_results, n_years), dtype=dtype)

            # Track periodic ruin if requested
            ruin_at_year_all = []

            valid_idx = 0
            for result in all_results:
                # Skip None results from failed simulations
                if result is None:
                    continue
                # Ensure result is a dictionary with expected keys
                if isinstance(result, dict) and "final_assets" in result:
                    final_assets[valid_idx] = result["final_assets"]
                    annual_losses[valid_idx] = result["annual_losses"]
                    insurance_recoveries[valid_idx] = result["insurance_recoveries"]
                    retained_losses[valid_idx] = result["retained_losses"]
                    # Collect periodic ruin data if present
                    if "ruin_at_year" in result:
                        ruin_at_year_all.append(result["ruin_at_year"])
                    valid_idx += 1
                else:
                    # Log warning for unexpected result format.
                    # NOTE: Do NOT use ``import warnings`` here – it creates
                    # a local binding that shadows the module-level import and
                    # causes UnboundLocalError later in the function. The
                    # module-level ``import warnings`` is sufficient.
                    warnings.warn(f"Unexpected result format: {type(result)}")

            # Trim arrays to only valid results
            if valid_idx < n_results:
                final_assets = final_assets[:valid_idx]
                annual_losses = annual_losses[:valid_idx]
                insurance_recoveries = insurance_recoveries[:valid_idx]
                retained_losses = retained_losses[:valid_idx]

            # Calculate derived metrics
            growth_rates = self._calculate_growth_rates(final_assets)

            # Calculate ruin probability
            ruin_probability = {}
            total_simulations = len(final_assets)

            # Guard against division by zero when no valid results
            if total_simulations == 0:
                warnings.warn(
                    "No valid simulation results from parallel execution. "
                    "Falling back to sequential execution.",
                    RuntimeWarning,
                    stacklevel=2,
                )
                return self._run_sequential()

            if self.config.ruin_evaluation and ruin_at_year_all:
                # Aggregate periodic ruin probabilities
                for eval_year in self.config.ruin_evaluation:
                    if eval_year <= n_years:
                        ruin_count = sum(r.get(eval_year, False) for r in ruin_at_year_all)
                        ruin_probability[str(eval_year)] = ruin_count / total_simulations

            # Always add final ruin probability (at max runtime).
            # Count both companies with low final assets AND companies marked as ruined earlier.
            if self.config.ruin_evaluation and ruin_at_year_all:
                # Count from ruin_at_year tracking (includes early bankruptcies)
                final_ruin_count = sum(r.get(n_years, False) for r in ruin_at_year_all)
            else:
                # Fall back to a final-assets check if no tracking
                final_ruin_count = np.sum(
                    np.less_equal(final_assets, self.config.insolvency_tolerance)
                )
            ruin_probability[str(n_years)] = float(final_ruin_count / total_simulations)

            return SimulationResults(
                final_assets=final_assets,
                annual_losses=annual_losses,
                insurance_recoveries=insurance_recoveries,
                retained_losses=retained_losses,
                growth_rates=growth_rates,
                ruin_probability=ruin_probability,
                metrics={},
                convergence={},
                execution_time=0,
                config=self.config,
                performance_metrics=None,
            )

        # Execute using enhanced parallel executor
        results = self.parallel_executor.map_reduce(
            work_function=_simulate_path_enhanced,
            work_items=range(n_sims),
            reduce_function=combine_results_enhanced,
            shared_data=shared_data,
            progress_bar=self.config.progress_bar,
        )

        # Add performance metrics from executor
        if self.config.monitor_performance and self.parallel_executor.performance_metrics:
            results.performance_metrics = self.parallel_executor.performance_metrics

        return results  # type: ignore[no-any-return]

    def _run_single_simulation(self, sim_id: int) -> Dict[str, Any]:
        """Run a single simulation path.
        Args:
            sim_id: Simulation identifier

        Returns:
            Dictionary with simulation results
        """
        n_years = self.config.n_years
        dtype = np.float32 if self.config.use_float32 else np.float64

        # Create a copy of manufacturer for this simulation
        manufacturer = self.manufacturer.copy()

        # Arrays to store results
        annual_losses = np.zeros(n_years, dtype=dtype)
        insurance_recoveries = np.zeros(n_years, dtype=dtype)
        retained_losses = np.zeros(n_years, dtype=dtype)

        # Track ruin at evaluation points
        ruin_at_year = {}
        if self.config.ruin_evaluation:
            for eval_year in self.config.ruin_evaluation:
                if eval_year <= n_years:  # Only track if within simulation period
                    ruin_at_year[eval_year] = False
            # Always track final year to ensure early bankruptcies propagate
            ruin_at_year[n_years] = False

        # Reset insurance program state for this simulation path (Issue #348)
        self.insurance_program.reset_annual()

        # Run simulation for each year
        for year in range(n_years):
            # Reset insurance program aggregate limits at start of each policy year (Issue #348)
            if year > 0:
                self.insurance_program.reset_annual()

            # CRN: reseed per (sim_id, year) for reproducible cross-scenario comparison
            if self.config.crn_base_seed is not None:
                year_ss = np.random.SeedSequence([self.config.crn_base_seed, sim_id, year])
                children = year_ss.spawn(2)
                self.loss_generator.reseed(int(children[0].generate_state(1)[0]))
                if manufacturer.stochastic_process is not None:
                    manufacturer.stochastic_process.reset(int(children[1].generate_state(1)[0]))

            # Generate losses using ManufacturingLossGenerator
            revenue = manufacturer.calculate_revenue()
            if hasattr(self.loss_generator, "generate_losses"):
                events, _ = self.loss_generator.generate_losses(
                    duration=1.0, revenue=float(revenue)
                )
            else:
                raise AttributeError(
                    f"Loss generator {type(self.loss_generator).__name__} "
                    "has no generate_losses method"
                )

            # Calculate total loss
            total_loss = sum(event.amount for event in events)
            annual_losses[year] = total_loss

            # Apply insurance PER OCCURRENCE (not aggregate) and create
            # ClaimLiability objects with LoC collateral (Issue #342).
            total_recovery = 0
            total_retained = 0

            for event in events:
                # Process each event separately through insurance
                claim_result = self.insurance_program.process_claim(event.amount)
                event_recovery = claim_result.get("insurance_recovery", 0)
                event_retained = event.amount - event_recovery

                total_recovery += event_recovery
                total_retained += event_retained

                # Create ClaimLiability and post collateral for the retained
                # portion. The liability reduces equity via the balance sheet
                # (equity = assets - liabilities). We do NOT also call
                # record_insurance_loss() to avoid double-counting: the
                # liability already reduces equity, so reducing operating
                # income would penalise equity a second time.
                if event_retained > 0:
                    manufacturer.process_insurance_claim(
                        claim_amount=event.amount,
                        insurance_recovery=event_recovery,
                    )

            insurance_recoveries[year] = total_recovery
            retained_losses[year] = total_retained

            # Calculate insurance premium scaled by revenue.
            # Premium should scale with exposure (revenue).
            current_revenue = manufacturer.calculate_revenue()
            base_revenue = (
                self.manufacturer.config.initial_assets
                * self.manufacturer.config.asset_turnover_ratio
            )
            revenue_scaling_factor = (
                float(current_revenue) / float(base_revenue) if base_revenue > 0 else 1.0
            )

            # Calculate scaled annual premium
            base_annual_premium = self.insurance_program.calculate_annual_premium()
            annual_premium = base_annual_premium * revenue_scaling_factor
            if annual_premium > 0:
                manufacturer.record_insurance_premium(annual_premium)

            # Update manufacturer state with annual step.
            # Apply stochastic if the manufacturer has a stochastic process.
            apply_stochastic = (
                manufacturer.stochastic_process is not None or self.config.apply_stochastic
            )

            # Use config parameters instead of hardcoded values (Issue #349)
            manufacturer.step(
                letter_of_credit_rate=self.config.letter_of_credit_rate,
                growth_rate=self.config.growth_rate,
                time_resolution=self.config.time_resolution,
                apply_stochastic=apply_stochastic,
            )

            # Prune old ledger entries to bound memory (Issue #315)
            if self.config.enable_ledger_pruning and year > 0:
                manufacturer.ledger.prune_entries(before_date=year)

            # Check for ruin using insolvency tolerance or is_ruined flag
            if (
                float(manufacturer.equity) <= self.config.insolvency_tolerance
                or manufacturer.is_ruined
            ):
                # Mark ruin for all future evaluation points
                if self.config.ruin_evaluation:
                    for eval_year in ruin_at_year:
                        if year < eval_year:
                            ruin_at_year[eval_year] = True
                ruin_occurred = True
                ruin_year = year
                break
        else:
            ruin_occurred = False
            ruin_year = None

        # Store trajectory if storage is enabled
        if self.trajectory_storage:
            self.trajectory_storage.store_simulation(
                sim_id=sim_id,
                annual_losses=annual_losses[: year + 1] if ruin_occurred else annual_losses,
                insurance_recoveries=(
                    insurance_recoveries[: year + 1] if ruin_occurred else insurance_recoveries
                ),
                retained_losses=retained_losses[: year + 1] if ruin_occurred else retained_losses,
                final_assets=float(manufacturer.total_assets),
                initial_assets=float(self.manufacturer.total_assets),
                ruin_occurred=ruin_occurred,
                ruin_year=ruin_year,
            )

        return {
            "final_assets": float(manufacturer.total_assets),
            "annual_losses": annual_losses,
            "insurance_recoveries": insurance_recoveries,
            "retained_losses": retained_losses,
            "ruin_at_year": ruin_at_year,  # Periodic ruin tracking
        }

    def _combine_chunk_results(self, chunk_results: List[Dict[str, Any]]) -> SimulationResults:
        """Combine results from parallel chunks.
        Args:
            chunk_results: List of chunk result dictionaries

        Returns:
            Combined SimulationResults
        """
        # Handle empty chunk results
        if not chunk_results:
            ruin_probability = {}
            if self.config.ruin_evaluation:
                for eval_year in self.config.ruin_evaluation:
                    if eval_year <= self.config.n_years:
                        ruin_probability[str(eval_year)] = 0.0
            ruin_probability[str(self.config.n_years)] = 0.0

            return SimulationResults(
                final_assets=np.array([]),
                annual_losses=np.array([]).reshape(0, self.config.n_years),
                insurance_recoveries=np.array([]).reshape(0, self.config.n_years),
                retained_losses=np.array([]).reshape(0, self.config.n_years),
                growth_rates=np.array([]),
                ruin_probability=ruin_probability,
                metrics={},  # Empty metrics for empty simulation
                convergence={},  # Empty convergence for empty simulation
                execution_time=0.0,
                config=self.config,
                time_series_aggregation=None,
            )

        # Concatenate arrays
        final_assets = np.concatenate([r["final_assets"] for r in chunk_results])
        annual_losses = np.vstack([r["annual_losses"] for r in chunk_results])
        insurance_recoveries = np.vstack([r["insurance_recoveries"] for r in chunk_results])
        retained_losses = np.vstack([r["retained_losses"] for r in chunk_results])

        # Calculate derived metrics
        growth_rates = self._calculate_growth_rates(final_assets)

        # Aggregate periodic ruin probabilities
        ruin_probability = {}
        total_simulations = len(final_assets)

        if self.config.ruin_evaluation:
            for eval_year in self.config.ruin_evaluation:
                if eval_year <= self.config.n_years:
                    # Count ruin occurrences for this evaluation year across all simulations
                    ruin_count = 0
                    for chunk in chunk_results:
                        if "ruin_at_year" in chunk:
                            # ruin_at_year is a list of dictionaries,
                            # one for each simulation in the chunk
                            for sim_ruin_data in chunk["ruin_at_year"]:
                                if sim_ruin_data.get(eval_year, False):
                                    ruin_count += 1
                    ruin_probability[str(eval_year)] = ruin_count / total_simulations

        # Always add final ruin probability.
        # Count both companies with low final assets AND companies marked as ruined earlier.
        if self.config.ruin_evaluation and chunk_results:
            # Count from ruin_at_year tracking (includes early bankruptcies)
            final_ruin_count = 0
            for chunk in chunk_results:
                if "ruin_at_year" in chunk:
                    for sim_ruin_data in chunk["ruin_at_year"]:
                        if sim_ruin_data.get(self.config.n_years, False):
                            final_ruin_count += 1
        else:
            # Fall back to a final-assets check if no tracking
            final_ruin_count = np.sum(
                np.less_equal(final_assets, self.config.insolvency_tolerance)
            )
        ruin_probability[str(self.config.n_years)] = float(final_ruin_count / total_simulations)

        return SimulationResults(
            final_assets=final_assets,
            annual_losses=annual_losses,
            insurance_recoveries=insurance_recoveries,
            retained_losses=retained_losses,
            growth_rates=growth_rates,
            ruin_probability=ruin_probability,
            metrics={},
            convergence={},
            execution_time=0,
            config=self.config,
        )

    def _calculate_growth_rates(self, final_assets: np.ndarray) -> np.ndarray:
        """Calculate annualized growth rates.

        Args:
            final_assets: Final asset values

        Returns:
            Array of growth rates
        """
        initial_assets = float(self.manufacturer.total_assets)
        n_years = self.config.n_years

        # Avoid division by zero and log of negative numbers
        valid_mask = (final_assets > 0) & (initial_assets > 0)
        growth_rates = np.zeros_like(final_assets, dtype=np.float64)
        if np.any(valid_mask):
            growth_rates[valid_mask] = np.log(final_assets[valid_mask] / initial_assets) / n_years

        return growth_rates
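
    # Worked example (illustrative): with initial assets of $10M and final
    # assets of $20M after n_years = 10, _calculate_growth_rates returns
    # ln(20e6 / 10e6) / 10 = ln(2) / 10 ≈ 0.0693, i.e. about 6.93% per year.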
    def _calculate_metrics(self, results: SimulationResults) -> Dict[str, float]:
        """Calculate risk metrics from simulation results.

        Args:
            results: Simulation results

        Returns:
            Dictionary of risk metrics
        """
        # Handle empty results gracefully
        if (
            len(results.final_assets) == 0
            or results.annual_losses is None
            or results.annual_losses.size == 0
        ):
            # Return default metrics for empty simulations
            return {
                "mean_loss": 0.0,
                "median_loss": 0.0,
                "std_loss": 0.0,
                "var_95": 0.0,
                "var_99": 0.0,
                "var_995": 0.0,
                "tvar_95": 0.0,
                "tvar_99": 0.0,
                "tvar_995": 0.0,
                "expected_shortfall_99": 0.0,
                "max_loss": 0.0,
                "mean_recovery": 0.0,
                "mean_retained": 0.0,
                "mean_growth_rate": 0.0,
                "survival_rate": 1.0,
                "ruin_probability": 0.0,
            }

        # Total losses across all years
        total_losses = np.sum(results.annual_losses, axis=1)

        # Initialize risk metrics calculator
        risk_metrics = RiskMetrics(total_losses)

        metrics = {
            "mean_loss": np.mean(total_losses),
            "median_loss": np.median(total_losses),
            "std_loss": np.std(total_losses),
            "var_95": risk_metrics.var(0.95),
            "var_99": risk_metrics.var(0.99),
            "var_995": risk_metrics.var(0.995),
            "tvar_95": risk_metrics.tvar(0.95),
            "tvar_99": risk_metrics.tvar(0.99),
            "tvar_995": risk_metrics.tvar(0.995),
            "expected_shortfall_99": risk_metrics.expected_shortfall(0.99),
            "max_loss": np.max(total_losses),
            "mean_recovery": np.mean(np.sum(results.insurance_recoveries, axis=1)),
            "mean_retained": np.mean(np.sum(results.retained_losses, axis=1)),
            "mean_growth_rate": np.mean(results.growth_rates),
            "std_growth_rate": np.std(results.growth_rates),
            "sharpe_ratio": (
                np.mean(results.growth_rates) / np.std(results.growth_rates)
                if np.std(results.growth_rates) > 0
                else 0
            ),
        }

        return metrics

    def _check_convergence(self, results: SimulationResults) -> Dict[str, ConvergenceStats]:
        """Check convergence of simulation results.

        Args:
            results: Simulation results

        Returns:
            Dictionary of convergence statistics
        """
        # Reshape data into chains
        n_chains = min(self.config.n_chains, results.final_assets.shape[0] // 100)
        if n_chains < 2:
            # Not enough data for multi-chain convergence
            return {}

        # Split data into chains
        chain_size = len(results.final_assets) // n_chains
        chains_growth = np.array(
            [results.growth_rates[i * chain_size : (i + 1) * chain_size] for i in range(n_chains)]
        )
        chains_losses = np.array(
            [
                np.sum(results.annual_losses[i * chain_size : (i + 1) * chain_size], axis=1)
                for i in range(n_chains)
            ]
        )

        # Stack chains for multiple metrics
        chains = np.stack([chains_growth, chains_losses], axis=2)

        # Check convergence
        convergence_stats = self.convergence_diagnostics.check_convergence(
            chains, metric_names=["growth_rate", "total_losses"]
        )

        return convergence_stats

    def _perform_advanced_aggregation(self, results: SimulationResults) -> SimulationResults:
        """Perform advanced aggregation on simulation results.
        Args:
            results: Initial simulation results

        Returns:
            Enhanced results with aggregation data
        """
        # Aggregate final assets
        if self.result_aggregator:
            results.aggregated_results = self.result_aggregator.aggregate(results.final_assets)

        # Time series aggregation
        if self.time_series_aggregator:
            # Aggregate annual losses (transpose to have time in rows)
            results.time_series_aggregation = {
                "losses": self.time_series_aggregator.aggregate(results.annual_losses.T),
                "recoveries": self.time_series_aggregator.aggregate(
                    results.insurance_recoveries.T
                ),
                "retained": self.time_series_aggregator.aggregate(results.retained_losses.T),
            }

        # Statistical summary
        if self.summary_statistics:
            results.statistical_summary = self.summary_statistics.calculate_summary(
                results.final_assets
            )

            # Generate report if requested
            if self.config.generate_summary_report:
                report_generator = SummaryReportGenerator(style=self.config.summary_report_format)
                results.summary_report = report_generator.generate_report(
                    results.statistical_summary,
                    title="Monte Carlo Simulation Summary",
                    metadata={
                        "Simulations": self.config.n_simulations,
                        "Years": self.config.n_years,
                        "Ruin Probability": (
                            f"{results.ruin_probability.get(str(self.config.n_years), 0.0):.2%}"
                        ),
                        "Mean Growth Rate": f"{np.mean(results.growth_rates):.4f}",
                    },
                )

        return results
    def export_results(
        self, results: SimulationResults, filepath: Path, file_format: str = "csv"
    ) -> None:
        """Export simulation results to file.

        Args:
            results: Simulation results to export
            filepath: Output file path
            file_format: Export format ('csv', 'json', 'hdf5')
        """
        if not results.aggregated_results:
            # Perform aggregation if not already done
            if self.result_aggregator:
                results.aggregated_results = self.result_aggregator.aggregate(
                    results.final_assets
                )
            else:
                # Use default aggregation
                agg_config = AggregationConfig()
                aggregator = ResultAggregator(agg_config)
                results.aggregated_results = aggregator.aggregate(results.final_assets)

        # Export based on format
        if file_format.lower() == "csv":
            ResultExporter.to_csv(results.aggregated_results, filepath)
        elif file_format.lower() == "json":
            ResultExporter.to_json(results.aggregated_results, filepath)
        elif file_format.lower() == "hdf5":
            ResultExporter.to_hdf5(results.aggregated_results, filepath)
        else:
            raise ValueError(f"Unsupported export format: {file_format}")
    def compute_bootstrap_confidence_intervals(
        self,
        results: SimulationResults,
        confidence_level: float = 0.95,
        n_bootstrap: int = 10000,
        method: str = "percentile",
        show_progress: bool = False,
    ) -> Dict[str, Tuple[float, float]]:
        """Compute bootstrap confidence intervals for key simulation metrics.

        Args:
            results: Simulation results to analyze.
            confidence_level: Confidence level for intervals (default 0.95).
            n_bootstrap: Number of bootstrap iterations (default 10000).
            method: Bootstrap method ('percentile' or 'bca').
            show_progress: Whether to show progress bar.

        Returns:
            Dictionary mapping metric names to (lower, upper) confidence bounds.
        """
        # Lazy import to avoid scipy issues in worker processes
        from .bootstrap_analysis import bootstrap_confidence_interval

        confidence_intervals = {}

        # Bootstrap CI for mean final assets
        _, ci = bootstrap_confidence_interval(
            results.final_assets,
            np.mean,
            confidence_level=confidence_level,
            n_bootstrap=n_bootstrap,
            method=method,
            seed=self.config.seed,
        )
        confidence_intervals["Mean Final Assets"] = ci

        # Bootstrap CI for median final assets
        _, ci = bootstrap_confidence_interval(
            results.final_assets,
            np.median,
            confidence_level=confidence_level,
            n_bootstrap=n_bootstrap,
            method=method,
            seed=self.config.seed,
        )
        confidence_intervals["Median Final Assets"] = ci

        # Bootstrap CI for mean growth rate
        _, ci = bootstrap_confidence_interval(
            results.growth_rates,
            np.mean,
            confidence_level=confidence_level,
            n_bootstrap=n_bootstrap,
            method=method,
            seed=self.config.seed,
        )
        confidence_intervals["Mean Growth Rate"] = ci

        # Bootstrap CI for ruin probability.
        # Create a binary ruin indicator using the insolvency tolerance.
        ruin_indicator = (results.final_assets <= self.config.insolvency_tolerance).astype(float)
        _, ci = bootstrap_confidence_interval(
            ruin_indicator,
            np.mean,
            confidence_level=confidence_level,
            n_bootstrap=n_bootstrap,
            method=method,
            seed=self.config.seed,
        )
        confidence_intervals["Ruin Probability"] = ci

        # Bootstrap CI for VaR if available
        if "var_99" in results.metrics:

            def var_99(x):
                return np.percentile(x, 99)

            _, ci = bootstrap_confidence_interval(
                results.final_assets,
                var_99,
                confidence_level=confidence_level,
                n_bootstrap=n_bootstrap,
                method=method,
                seed=self.config.seed,
            )
            confidence_intervals["VaR(99%)"] = ci

        # Bootstrap CI for mean annual losses
        # (mean across years for each simulation)
        mean_annual_losses = np.mean(results.annual_losses, axis=1)
        _, ci = bootstrap_confidence_interval(
            mean_annual_losses,
            np.mean,
            confidence_level=confidence_level,
            n_bootstrap=n_bootstrap,
            method=method,
            seed=self.config.seed,
        )
        confidence_intervals["Mean Annual Losses"] = ci

        # Bootstrap CI for mean insurance recoveries
        mean_recoveries = np.mean(results.insurance_recoveries, axis=1)
        _, ci = bootstrap_confidence_interval(
            mean_recoveries,
            np.mean,
            confidence_level=confidence_level,
            n_bootstrap=n_bootstrap,
            method=method,
            seed=self.config.seed,
        )
        confidence_intervals["Mean Insurance Recoveries"] = ci

        return confidence_intervals
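
    # Example (illustrative; uses only the call signature defined above):
    #
    #     results = engine.run()
    #     cis = engine.compute_bootstrap_confidence_intervals(
    #         results, confidence_level=0.95, n_bootstrap=5_000
    #     )
    #     lower, upper = cis["Mean Growth Rate"]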
    def _get_cache_key(self) -> str:
        """Generate cache key for current configuration.

        Returns:
            Cache key string
        """
        key_parts = [
            f"n_sims_{self.config.n_simulations}",
            f"n_years_{self.config.n_years}",
            f"seed_{self.config.seed}",
            f"ins_{hashlib.sha256(str(self.insurance_program).encode()).hexdigest()[:16]}",
            f"mfg_{hashlib.sha256(str(self.manufacturer).encode()).hexdigest()[:16]}",
        ]
        return "_".join(key_parts)

    def _save_cache(self, cache_key: str, results: SimulationResults) -> None:
        """Save results to cache.

        Args:
            cache_key: Cache key
            results: Results to cache
        """
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        try:
            with open(cache_file, "wb") as f:
                safe_dump(results, f)
        except (IOError, OSError, pickle.PickleError) as e:
            warnings.warn(f"Failed to save cache: {e}")

    def _load_cache(self, cache_key: str) -> Optional[SimulationResults]:
        """Load results from cache.

        Args:
            cache_key: Cache key

        Returns:
            Cached results or None
        """
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        if cache_file.exists():
            try:
                with open(cache_file, "rb") as f:
                    loaded_data = safe_load(f)
                return loaded_data  # type: ignore
            except (IOError, OSError, pickle.PickleError, EOFError, ValueError) as e:
                warnings.warn(f"Failed to load cache: {e}")
        return None

    def _save_checkpoint(self, iteration: int, *arrays) -> None:
        """Save checkpoint during simulation.

        Args:
            iteration: Current iteration number
            arrays: Arrays to save
        """
        checkpoint_file = self.cache_dir / f"checkpoint_{iteration}.npz"
        try:
            np.savez_compressed(checkpoint_file, iteration=iteration, *arrays)
        except (IOError, OSError, ValueError) as e:
            warnings.warn(f"Failed to save checkpoint: {e}")

    def _initialize_simulation_arrays(self) -> Dict[str, np.ndarray]:
        """Initialize arrays for simulation results."""
        n_sims = self.config.n_simulations
        n_years = self.config.n_years
        dtype = np.float32 if self.config.use_float32 else np.float64
        return {
            "final_assets": np.zeros(n_sims, dtype=dtype),
            "annual_losses": np.zeros((n_sims, n_years), dtype=dtype),
            "insurance_recoveries": np.zeros((n_sims, n_years), dtype=dtype),
            "retained_losses": np.zeros((n_sims, n_years), dtype=dtype),
        }

    def _check_convergence_at_interval(
        self, completed_iterations: int, final_assets: np.ndarray
    ) -> float:
        """Check convergence at specified interval."""
        partial_growth = self._calculate_growth_rates(final_assets[:completed_iterations])

        # Split into chains for convergence check
        n_chains = min(4, completed_iterations // 250)
        if n_chains >= 2:
            chain_size = completed_iterations // n_chains
            chains = np.array(
                [partial_growth[j * chain_size : (j + 1) * chain_size] for j in range(n_chains)]
            )
            r_hat = self.convergence_diagnostics.calculate_r_hat(chains)
        else:
            r_hat = float("inf")
        return r_hat

    def _run_simulation_batch(
        self, batch_range: range, arrays: Dict[str, np.ndarray], batch_config: Dict[str, Any]
    ) -> int:
        """Run a batch of simulations with monitoring."""
        monitor = batch_config["monitor"]
        check_intervals = batch_config["check_intervals"]
        early_stopping = batch_config["early_stopping"]
        show_progress = batch_config["show_progress"]

        completed_iterations = 0
        for i in batch_range:
            sim_results = self._run_single_simulation(i)
            arrays["final_assets"][i] = sim_results["final_assets"]
            arrays["annual_losses"][i] = sim_results["annual_losses"]
            arrays["insurance_recoveries"][i] = sim_results["insurance_recoveries"]
            arrays["retained_losses"][i] = sim_results["retained_losses"]
            completed_iterations = i + 1

            # Check if we should perform convergence check
            if completed_iterations in check_intervals and completed_iterations >= 1000:
                r_hat = self._check_convergence_at_interval(
                    completed_iterations, arrays["final_assets"]
                )
                should_continue = monitor.update(completed_iterations, r_hat)

                if early_stopping and not should_continue:
                    if show_progress:
                        print(
                            f"\n✓ Early stopping: Convergence achieved "
                            f"at {completed_iterations:,} iterations"
                        )
                    break
            else:
                monitor.update(completed_iterations)

        return completed_iterations
    def run_with_progress_monitoring(
        self,
        check_intervals: Optional[List[int]] = None,
        convergence_threshold: float = 1.1,
        early_stopping: bool = True,
        show_progress: bool = True,
    ) -> SimulationResults:
        """Run simulation with progress tracking and convergence monitoring."""
        if check_intervals is None:
            check_intervals = [10_000, 25_000, 50_000, 100_000]
        check_intervals = [i for i in check_intervals if i <= self.config.n_simulations]

        # Initialize monitor and arrays
        monitor = ProgressMonitor(
            total_iterations=self.config.n_simulations,
            check_intervals=check_intervals,
            update_frequency=max(self.config.n_simulations // 100, 100),
            show_console=show_progress,
            convergence_threshold=convergence_threshold,
        )
        start_time = time.time()
        arrays = self._initialize_simulation_arrays()

        # Run simulations in batches
        batch_size = min(1000, self.config.n_simulations // 10)
        completed_iterations = 0

        for batch_start in range(0, self.config.n_simulations, batch_size):
            batch_end = min(batch_start + batch_size, self.config.n_simulations)
            completed_iterations = self._run_simulation_batch(
                range(batch_start, batch_end),
                arrays,
                {
                    "monitor": monitor,
                    "check_intervals": check_intervals,
                    "early_stopping": early_stopping,
                    "show_progress": show_progress,
                },
            )
            if early_stopping and monitor.converged:
                break

        monitor.finalize()

        # Trim all arrays if stopped early
        if completed_iterations < self.config.n_simulations:
            for key in arrays:
                arrays[key] = arrays[key][:completed_iterations]

        # Create results
        growth_rates = self._calculate_growth_rates(arrays["final_assets"])

        # Calculate ruin probability using insolvency_tolerance for consistency
        # (issue #299: unify threshold across all execution paths)
        ruin_probability = {
            str(self.config.n_years): float(
                np.mean(arrays["final_assets"] <= self.config.insolvency_tolerance)
            )
        }

        results = SimulationResults(
            final_assets=arrays["final_assets"],
            annual_losses=arrays["annual_losses"],
            insurance_recoveries=arrays["insurance_recoveries"],
            retained_losses=arrays["retained_losses"],
            growth_rates=growth_rates,
            ruin_probability=ruin_probability,
            metrics={},
            convergence={},
            execution_time=time.time() - start_time,
            config=self.config,
        )

        results.metrics = self._calculate_metrics(results)
        results.convergence = self._check_convergence(results)

        # Add progress summary to metrics
        monitor.get_stats()
        convergence_summary = monitor.generate_convergence_summary()
        results.metrics["actual_iterations"] = completed_iterations
        results.metrics["convergence_achieved"] = monitor.converged
        results.metrics["convergence_iteration"] = (
            monitor.converged_at if monitor.converged_at else 0
        )
        results.metrics["monitoring_overhead_pct"] = convergence_summary.get(
            "performance_overhead_pct", 0
        )

        # Store ESS information
        if results.convergence:
            for metric_name, conv_stats in results.convergence.items():
                results.metrics[f"ess_{metric_name}"] = conv_stats.ess

        return results
    def run_with_convergence_monitoring(
        self,
        target_r_hat: float = 1.05,
        check_interval: int = 10000,
        max_iterations: Optional[int] = None,
    ) -> SimulationResults:
        """Run simulation with automatic convergence monitoring.

        Args:
            target_r_hat: Target R-hat for convergence
            check_interval: Check convergence every N simulations
            max_iterations: Maximum iterations (None for no limit)

        Returns:
            Converged simulation results
        """
        if max_iterations is None:
            max_iterations = self.config.n_simulations * 10

        # Save original config values that will be temporarily modified
        original_n_sims = self.config.n_simulations
        original_seed = self.config.seed
        self.config.n_simulations = check_interval

        # Use a local variable for seed advancement so config.seed is not
        # permanently mutated (issue #299).
        batch_seed = self.config.seed

        all_results = []
        total_iterations = 0
        converged = False

        try:
            while not converged and total_iterations < max_iterations:
                # Set seed for this batch
                self.config.seed = batch_seed

                # Run batch
                batch_results = self.run()
                all_results.append(batch_results)
                total_iterations += check_interval

                # Combine all results so far
                if len(all_results) > 1:
                    combined = self._combine_multiple_results(all_results)
                else:
                    combined = batch_results

                # Check convergence
                convergence = self._check_convergence(combined)
                if convergence:
                    max_r_hat = max(stat.r_hat for stat in convergence.values())
                    converged = max_r_hat < target_r_hat

                    if self.config.progress_bar:
                        print(f"Iteration {total_iterations}: R-hat = {max_r_hat:.3f}")

                # Advance seed for next batch
                if batch_seed is not None:
                    batch_seed += check_interval
        finally:
            # Restore original config regardless of how we exit
            self.config.n_simulations = original_n_sims
            self.config.seed = original_seed

        return combined
    def _combine_multiple_results(
        self, results_list: List[SimulationResults]
    ) -> SimulationResults:
        """Combine multiple simulation results.

        Args:
            results_list: List of SimulationResults to combine

        Returns:
            Combined SimulationResults
        """
        # Concatenate arrays
        final_assets = np.concatenate([r.final_assets for r in results_list])
        annual_losses = np.vstack([r.annual_losses for r in results_list])
        insurance_recoveries = np.vstack([r.insurance_recoveries for r in results_list])
        retained_losses = np.vstack([r.retained_losses for r in results_list])
        growth_rates = np.concatenate([r.growth_rates for r in results_list])

        # Recalculate metrics
        combined = SimulationResults(
            final_assets=final_assets,
            annual_losses=annual_losses,
            insurance_recoveries=insurance_recoveries,
            retained_losses=retained_losses,
            growth_rates=growth_rates,
            ruin_probability={
                str(self.config.n_years): float(
                    np.mean(final_assets <= self.config.insolvency_tolerance)
                )
            },
            metrics={},
            convergence={},
            execution_time=sum(r.execution_time for r in results_list),
            config=self.config,
        )

        combined.metrics = self._calculate_metrics(combined)
        combined.convergence = self._check_convergence(combined)

        return combined
    def estimate_ruin_probability(
        self,
        config: Optional[RuinProbabilityConfig] = None,
    ) -> RuinProbabilityResults:
        """Estimate ruin probability over multiple time horizons.

        Delegates to RuinProbabilityAnalyzer for specialized analysis.

        Args:
            config: Configuration for ruin probability estimation

        Returns:
            RuinProbabilityResults with comprehensive bankruptcy analysis
        """
        analyzer = RuinProbabilityAnalyzer(
            manufacturer=self.manufacturer,
            loss_generator=self.loss_generator,
            insurance_program=self.insurance_program,
            config=self.config,
        )
        return analyzer.analyze_ruin_probability(config)
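
# Example (illustrative; ``engine`` is a MonteCarloEngine built as in the
# class docstring, and only calls defined above are used):
#
#     ruin_results = engine.estimate_ruin_probability()  # default config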