Source code for ergodic_insurance.monte_carlo_worker

"""Standalone worker function for multiprocessing Monte Carlo simulations."""

import copy
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np

from .insurance_program import InsuranceProgram
from .loss_distributions import ManufacturingLossGenerator
from .manufacturer import WidgetManufacturer


[docs] def run_chunk_standalone( chunk: Tuple[int, int, Optional[int]], loss_generator: ManufacturingLossGenerator, insurance_program: InsuranceProgram, manufacturer: WidgetManufacturer, config_dict: Dict[str, Any], ) -> Dict[str, Union[np.ndarray, List[Dict[int, bool]]]]: """Standalone function to run a chunk of simulations for multiprocessing. This function is independent of the MonteCarloEngine class and can be pickled for multiprocessing on all platforms including Windows. Args: chunk: Tuple of (start_idx, end_idx, seed) loss_generator: Loss generator instance insurance_program: Insurance program instance manufacturer: Manufacturer instance config_dict: Configuration dictionary with necessary parameters Returns: Dictionary with simulation results for the chunk """ start_idx, end_idx, seed = chunk n_sims = end_idx - start_idx n_years = config_dict["n_years"] dtype = np.float32 if config_dict.get("use_float32", False) else np.float64 # Re-seed the loss generator's internal RandomState objects for this chunk. # np.random.seed() only affects the global numpy state and has no effect on # the loss generator's per-instance RandomState objects that were pickled # from the parent process. Without this, every chunk produces identical # loss sequences. See issue #299. if seed is not None: loss_generator.reseed(seed) # Pre-allocate arrays final_assets = np.zeros(n_sims, dtype=dtype) annual_losses = np.zeros((n_sims, n_years), dtype=dtype) insurance_recoveries = np.zeros((n_sims, n_years), dtype=dtype) retained_losses = np.zeros((n_sims, n_years), dtype=dtype) # Track periodic ruin if requested ruin_evaluation = config_dict.get("ruin_evaluation", None) ruin_at_year_all = [] # Pre-copy stochastic process once outside the hot loop (Issue #366). # Most runs have stochastic_process=None so this is a no-op. _has_stochastic = manufacturer.stochastic_process is not None # Run simulations in chunk for i in range(n_sims): # Create a fresh manufacturer from config instead of deep-copying the # entire object graph (Issue #366). WidgetManufacturer.__init__ builds # a clean ledger and balance sheet from config alone — no traversal of # the existing manufacturer's ledger entries, caches, or history. sim_manufacturer = WidgetManufacturer.create_fresh( manufacturer.config, stochastic_process=( copy.deepcopy(manufacturer.stochastic_process) if _has_stochastic else None ), ) # Deep-copy insurance program per simulation to avoid state leakage (Issue #348) sim_insurance_program = copy.deepcopy(insurance_program) # Run single simulation sim_annual_losses = np.zeros(n_years, dtype=dtype) sim_insurance_recoveries = np.zeros(n_years, dtype=dtype) sim_retained_losses = np.zeros(n_years, dtype=dtype) # Track ruin at evaluation points for this simulation ruin_at_year = {} if ruin_evaluation: for eval_year in ruin_evaluation: if eval_year <= n_years: ruin_at_year[eval_year] = False # Extract settings from config_dict letter_of_credit_rate = config_dict.get("letter_of_credit_rate", 0.015) growth_rate = config_dict.get("growth_rate", 0) time_resolution = config_dict.get("time_resolution", "annual") apply_stochastic = config_dict.get("apply_stochastic", False) for year in range(n_years): # Reset insurance program aggregate limits at start of each policy year (Issue #348) if year > 0: sim_insurance_program.reset_annual() # Generate losses for the year revenue = sim_manufacturer.calculate_revenue() # Unified ordering: losses → claims → premium → step (Issue #349) # Use ManufacturingLossGenerator to generate losses # Note: Loss generator interface requires float for numpy compatibility # This is the documented conversion boundary (R2 from issue #278) if hasattr(loss_generator, "generate_losses"): year_losses, _ = loss_generator.generate_losses( duration=1.0, revenue=float(revenue) ) else: raise AttributeError( f"Loss generator {type(loss_generator).__name__} has no generate_losses method" ) # Sum loss amounts using native float (Issue #368: remove Decimal # from hot loop — results are stored in float64 arrays anyway) total_loss = sum(loss.amount for loss in year_losses) sim_annual_losses[year] = total_loss # Apply insurance PER OCCURRENCE (not aggregate) to correctly apply # per-occurrence deductibles and limits (Issue #348) total_recovery = 0.0 total_retained = 0.0 for loss_event in year_losses: if loss_event.amount > 0: claim_result = sim_insurance_program.process_claim(loss_event.amount) event_recovery = claim_result["insurance_recovery"] event_retained = loss_event.amount - event_recovery total_recovery += event_recovery total_retained += event_retained # Record the insurance loss for proper accounting # The loss will be deducted from operating income in calculate_operating_income # Use public method to maintain encapsulation (Issue #276) if event_retained > 0: sim_manufacturer.record_insurance_loss(event_retained) sim_insurance_recoveries[year] = total_recovery sim_retained_losses[year] = total_retained # Calculate and pay insurance premiums AFTER claims (Issue #349) # Premium scales proportionally with revenue growth base_revenue = float( manufacturer.config.initial_assets * manufacturer.config.asset_turnover_ratio ) revenue_multiplier = float(revenue) / base_revenue if base_revenue > 0 else 1.0 base_premium = sim_insurance_program.calculate_annual_premium() annual_premium = base_premium * revenue_multiplier # Record the insurance premium for accounting purposes # The premium will be deducted from operating income in calculate_operating_income # Use public method to maintain encapsulation (Issue #276) if annual_premium > 0: sim_manufacturer.record_insurance_premium(annual_premium, is_annual=False) # Run business operations (growth, etc.) sim_manufacturer.step( letter_of_credit_rate, growth_rate, time_resolution, apply_stochastic ) # Check for ruin using insolvency tolerance consistent with engine # (issue #299: unify threshold between worker and engine) insolvency_tolerance = config_dict.get("insolvency_tolerance", 10_000) if float(sim_manufacturer.equity) <= insolvency_tolerance: # Mark ruin for all future evaluation points if ruin_evaluation: for eval_year in ruin_at_year: if year < eval_year: ruin_at_year[eval_year] = True break final_assets[i] = float(sim_manufacturer.total_assets) annual_losses[i] = sim_annual_losses insurance_recoveries[i] = sim_insurance_recoveries retained_losses[i] = sim_retained_losses # Store periodic ruin data if ruin_evaluation: ruin_at_year_all.append(ruin_at_year) result: Dict[str, Union[np.ndarray, List[Dict[int, bool]]]] = { "final_assets": final_assets, "annual_losses": annual_losses, "insurance_recoveries": insurance_recoveries, "retained_losses": retained_losses, } # Add periodic ruin data if tracked if ruin_evaluation: result["ruin_at_year"] = ruin_at_year_all return result