"""Ruin probability analysis for insurance optimization.
This module provides specialized classes and methods for analyzing bankruptcy
and ruin probabilities in insurance scenarios.
"""
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass, field
import time
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
from tqdm import tqdm
[docs]
@dataclass
class RuinProbabilityConfig:
"""Configuration for ruin probability analysis."""
time_horizons: List[int] = field(default_factory=lambda: [1, 5, 10])
n_simulations: int = 10000
min_assets_threshold: float = 1_000_000
min_equity_threshold: float = 0.0
debt_service_coverage_ratio: float = 1.25
consecutive_negative_periods: int = 3
early_stopping: bool = True
parallel: bool = True
n_workers: Optional[int] = None
seed: Optional[int] = None
n_bootstrap: int = 1000
bootstrap_confidence_level: float = 0.95
[docs]
@dataclass
class RuinProbabilityResults:
"""Results from ruin probability analysis.
Attributes:
time_horizons: Array of time horizons analyzed (in years).
ruin_probabilities: Probability of ruin at each time horizon.
confidence_intervals: Bootstrap confidence intervals for probabilities.
bankruptcy_causes: Distribution of bankruptcy causes by horizon.
survival_curves: Survival probability curves over time.
execution_time: Total execution time in seconds.
n_simulations: Number of simulations run.
convergence_achieved: Whether convergence criteria were met.
mid_year_ruin_count: Number of simulations with mid-year ruin (Issue #279).
ruin_month_distribution: Distribution of ruin events by month (0-11).
"""
time_horizons: np.ndarray
ruin_probabilities: np.ndarray
confidence_intervals: np.ndarray
bankruptcy_causes: Dict[str, np.ndarray]
survival_curves: np.ndarray
execution_time: float
n_simulations: int
convergence_achieved: bool
# Issue #279: Track mid-year ruin events
mid_year_ruin_count: int = 0
ruin_month_distribution: Optional[Dict[int, int]] = None
[docs]
def summary(self) -> str:
"""Generate summary report."""
lines = [
"Ruin Probability Analysis Results",
"=" * 40,
f"Simulations: {self.n_simulations:,}",
f"Execution time: {self.execution_time:.2f} seconds",
f"Convergence achieved: {self.convergence_achieved}",
"",
"Ruin Probabilities by Time Horizon:",
]
for i, horizon in enumerate(self.time_horizons):
prob = self.ruin_probabilities[i]
ci_lower, ci_upper = self.confidence_intervals[i]
lines.append(f" {horizon:3d} years: {prob:6.2%} [{ci_lower:6.2%}, {ci_upper:6.2%}]")
lines.append("")
lines.append("Bankruptcy Causes (at 10 years):")
if len(self.time_horizons) > 0:
idx_10 = np.where(self.time_horizons == 10)[0]
if len(idx_10) > 0:
idx = idx_10[0]
for cause, probs in self.bankruptcy_causes.items():
if idx < len(probs):
lines.append(f" {cause:20s}: {probs[idx]:6.2%}")
# Issue #279: Add mid-year ruin statistics
if self.mid_year_ruin_count > 0:
lines.append("")
lines.append("Mid-Year Ruin Analysis:")
mid_year_pct = self.mid_year_ruin_count / self.n_simulations * 100
lines.append(
f" Mid-year ruin events: {self.mid_year_ruin_count:,} ({mid_year_pct:.1f}%)"
)
if self.ruin_month_distribution:
lines.append(" Ruin by month:")
month_names = [
"Jan",
"Feb",
"Mar",
"Apr",
"May",
"Jun",
"Jul",
"Aug",
"Sep",
"Oct",
"Nov",
"Dec",
]
for month, count in sorted(self.ruin_month_distribution.items()):
if count > 0:
lines.append(f" {month_names[month]}: {count:,}")
return "\n".join(lines)
[docs]
class RuinProbabilityAnalyzer:
"""Analyzer for ruin probability calculations."""
def __init__(self, manufacturer, loss_generator, insurance_program, config):
"""Initialize analyzer.
Args:
manufacturer: WidgetManufacturer instance
loss_generator: ManufacturingLossGenerator instance
insurance_program: InsuranceProgram instance
config: SimulationConfig instance
"""
self.manufacturer = manufacturer
self.loss_generator = loss_generator
self.insurance_program = insurance_program
self.config = config
[docs]
def analyze_ruin_probability(
self,
config: Optional[RuinProbabilityConfig] = None,
) -> RuinProbabilityResults:
"""Analyze ruin probability across multiple time horizons.
Args:
config: Configuration for analysis
Returns:
RuinProbabilityResults with analysis results
"""
config = config or RuinProbabilityConfig()
start_time = time.time()
# Run simulations
if config.parallel and config.n_simulations > 1000:
simulation_results = self._run_ruin_simulations_parallel(config)
else:
simulation_results = self._run_ruin_simulations_sequential(config)
# Calculate results for each time horizon
horizon_analysis = self._analyze_horizons(simulation_results, config)
# Calculate bootstrap confidence intervals
confidence_intervals = self._calculate_bootstrap_ci(
simulation_results["bankruptcy_years"],
config.time_horizons,
config.n_bootstrap,
config.bootstrap_confidence_level,
)
# Check convergence
convergence_achieved = self._check_ruin_convergence(simulation_results["bankruptcy_years"])
# Convert survival curves to padded array
padded_curves = self._pad_survival_curves(horizon_analysis["survival_curves"])
return RuinProbabilityResults(
time_horizons=np.array(config.time_horizons),
ruin_probabilities=horizon_analysis["ruin_probs"],
confidence_intervals=confidence_intervals,
bankruptcy_causes=horizon_analysis["bankruptcy_causes"],
survival_curves=padded_curves,
execution_time=time.time() - start_time,
n_simulations=config.n_simulations,
convergence_achieved=convergence_achieved,
# Issue #279: Include mid-year ruin statistics
mid_year_ruin_count=simulation_results.get("mid_year_ruin_count", 0),
ruin_month_distribution=simulation_results.get("ruin_month_distribution"),
)
def _analyze_horizons(self, simulation_results, config):
"""Analyze ruin data for each time horizon.
Args:
simulation_results: Results from simulations
config: Configuration
Returns:
Dict with ruin_probs, bankruptcy_causes, survival_curves
"""
ruin_probs = np.zeros(len(config.time_horizons))
bankruptcy_causes = {
"asset_threshold": np.zeros(len(config.time_horizons)),
"equity_threshold": np.zeros(len(config.time_horizons)),
"consecutive_negative": np.zeros(len(config.time_horizons)),
"debt_service": np.zeros(len(config.time_horizons)),
}
survival_curves = []
for i, horizon in enumerate(config.time_horizons):
# Calculate ruin probability
horizon_data = simulation_results["bankruptcy_years"] <= horizon
ruin_probs[i] = np.mean(horizon_data)
# Track bankruptcy causes
for cause, cause_data in bankruptcy_causes.items():
cause_mask = simulation_results["bankruptcy_causes"][cause][:, horizon - 1]
# Handle empty array when no bankruptcies occurred
bankrupted_subset = cause_mask[horizon_data]
if len(bankrupted_subset) > 0:
cause_data[i] = np.mean(bankrupted_subset)
else:
cause_data[i] = 0.0
# Calculate survival curve
bankruptcy_at_year = np.zeros(horizon)
for y in range(1, horizon + 1):
bankruptcy_at_year[y - 1] = np.sum(simulation_results["bankruptcy_years"] == y)
survival_curve = 1.0 - np.cumsum(bankruptcy_at_year) / config.n_simulations
survival_curves.append(survival_curve)
return {
"ruin_probs": ruin_probs,
"bankruptcy_causes": bankruptcy_causes,
"survival_curves": survival_curves,
}
def _pad_survival_curves(self, survival_curves):
"""Pad survival curves to uniform length.
Args:
survival_curves: List of survival curves
Returns:
Numpy array with padded curves
"""
max_len = max(len(curve) for curve in survival_curves) if survival_curves else 0
padded_curves = np.zeros((len(survival_curves), max_len))
for i, curve in enumerate(survival_curves):
padded_curves[i, : len(curve)] = curve
return padded_curves
def _run_ruin_simulations_sequential(self, config: RuinProbabilityConfig) -> Dict[str, Any]:
"""Run ruin probability simulations sequentially."""
max_horizon = max(config.time_horizons)
n_sims = config.n_simulations
# Pre-allocate arrays
bankruptcy_years = np.full(n_sims, max_horizon + 1, dtype=np.int32)
bankruptcy_causes = {
"asset_threshold": np.zeros((n_sims, max_horizon), dtype=bool),
"equity_threshold": np.zeros((n_sims, max_horizon), dtype=bool),
"consecutive_negative": np.zeros((n_sims, max_horizon), dtype=bool),
"debt_service": np.zeros((n_sims, max_horizon), dtype=bool),
}
# Issue #279: Track mid-year ruin events
mid_year_ruin_count = 0
ruin_month_distribution: Dict[int, int] = {}
iterator = range(n_sims)
if self.config.progress_bar:
iterator = tqdm(iterator, desc="Ruin probability simulations")
for sim_id in iterator:
result = self._run_single_ruin_simulation(sim_id, max_horizon, config)
bankruptcy_years[sim_id] = result["bankruptcy_year"]
for cause, cause_data in bankruptcy_causes.items():
cause_data[sim_id] = result["causes"][cause]
# Issue #279: Collect mid-year ruin statistics
if result.get("is_mid_year_ruin", False):
mid_year_ruin_count += 1
ruin_month = result.get("ruin_month")
if ruin_month is not None:
ruin_month_distribution[ruin_month] = (
ruin_month_distribution.get(ruin_month, 0) + 1
)
return {
"bankruptcy_years": bankruptcy_years,
"bankruptcy_causes": bankruptcy_causes,
"mid_year_ruin_count": mid_year_ruin_count,
"ruin_month_distribution": ruin_month_distribution,
}
def _create_simulation_chunks(
self, config: RuinProbabilityConfig, max_horizon: int
) -> List[Tuple[int, int, int, RuinProbabilityConfig, Optional[int]]]:
"""Create chunks for parallel processing."""
n_sims = config.n_simulations
n_workers = config.n_workers or 4
chunk_size = max(100, n_sims // (n_workers * 10))
chunks = []
for i in range(0, n_sims, chunk_size):
chunk_end = min(i + chunk_size, n_sims)
chunk_seed = None if config.seed is None else config.seed + i
chunks.append((i, chunk_end, max_horizon, config, chunk_seed))
return chunks
def _run_ruin_simulations_parallel(self, config: RuinProbabilityConfig) -> Dict[str, Any]:
"""Run ruin probability simulations in parallel."""
max_horizon = max(config.time_horizons)
chunks = self._create_simulation_chunks(config, max_horizon)
# Run chunks in parallel
all_results = []
with ProcessPoolExecutor(max_workers=config.n_workers or 4) as executor:
futures = {executor.submit(self._run_ruin_chunk, chunk): chunk for chunk in chunks}
if self.config.progress_bar:
pbar = tqdm(total=len(chunks), desc="Processing ruin chunks")
for future in as_completed(futures):
all_results.append(future.result())
if self.config.progress_bar:
pbar.update(1)
if self.config.progress_bar:
pbar.close()
# Combine results
return self._combine_ruin_results(all_results)
def _run_ruin_chunk(
self,
chunk: Tuple[int, int, int, RuinProbabilityConfig, Optional[int]],
) -> Dict[str, Any]:
"""Run a chunk of ruin simulations."""
start_idx, end_idx, max_horizon, config, seed = chunk
n_sims = end_idx - start_idx
# Create local RNG for this chunk (loss generator has its own internal RNG)
_rng = np.random.default_rng(seed)
# Pre-allocate arrays
bankruptcy_years = np.full(n_sims, max_horizon + 1, dtype=np.int32)
bankruptcy_causes = {
"asset_threshold": np.zeros((n_sims, max_horizon), dtype=bool),
"equity_threshold": np.zeros((n_sims, max_horizon), dtype=bool),
"consecutive_negative": np.zeros((n_sims, max_horizon), dtype=bool),
"debt_service": np.zeros((n_sims, max_horizon), dtype=bool),
}
# Issue #279: Track mid-year ruin events
mid_year_ruin_count = 0
ruin_month_distribution: Dict[int, int] = {}
for i in range(n_sims):
result = self._run_single_ruin_simulation(start_idx + i, max_horizon, config)
bankruptcy_years[i] = result["bankruptcy_year"]
for cause, cause_data in bankruptcy_causes.items():
cause_data[i] = result["causes"][cause]
# Issue #279: Collect mid-year ruin statistics
if result.get("is_mid_year_ruin", False):
mid_year_ruin_count += 1
ruin_month = result.get("ruin_month")
if ruin_month is not None:
ruin_month_distribution[ruin_month] = (
ruin_month_distribution.get(ruin_month, 0) + 1
)
return {
"bankruptcy_years": bankruptcy_years,
"bankruptcy_causes": bankruptcy_causes,
"mid_year_ruin_count": mid_year_ruin_count,
"ruin_month_distribution": ruin_month_distribution,
}
def _check_bankruptcy_conditions(
self,
manufacturer: Any,
metrics: Dict[str, float],
year: int,
config: RuinProbabilityConfig,
causes: Dict[str, np.ndarray],
consecutive_negative_count: int,
) -> Tuple[bool, int]:
"""Check bankruptcy conditions and update causes.
Returns:
Tuple of (is_bankrupt, updated_consecutive_negative_count)
"""
is_bankrupt = False
current_assets = manufacturer.total_assets
current_equity = metrics.get("equity", 0)
# 0. Check manufacturer's is_ruined flag (set by check_solvency)
# This is the authoritative insolvency status from the manufacturer
if hasattr(manufacturer, "is_ruined") and manufacturer.is_ruined:
# Mark equity threshold as the primary cause since that's what triggers is_ruined
causes["equity_threshold"][year] = True
is_bankrupt = True
# 1. Asset threshold
if current_assets <= config.min_assets_threshold:
causes["asset_threshold"][year] = True
is_bankrupt = True
# 2. Equity threshold
if current_equity <= config.min_equity_threshold:
causes["equity_threshold"][year] = True
is_bankrupt = True
# 3. Consecutive negative equity
if current_equity < 0:
consecutive_negative_count += 1
if consecutive_negative_count >= config.consecutive_negative_periods:
causes["consecutive_negative"][year] = True
is_bankrupt = True
else:
consecutive_negative_count = 0
# 4. Debt service coverage (simplified)
if hasattr(manufacturer, "debt") and manufacturer.debt > 0:
debt_service = manufacturer.debt * 0.08 # Assume 8% debt service
operating_income = metrics.get("operating_income", 0)
if operating_income > 0 and debt_service > 0:
coverage_ratio = operating_income / debt_service
if coverage_ratio < config.debt_service_coverage_ratio:
causes["debt_service"][year] = True
is_bankrupt = True
return is_bankrupt, consecutive_negative_count
def _process_simulation_year(
self,
manufacturer: Any,
year: int,
) -> Dict[str, float]:
"""Process a single year of simulation."""
# Update state FIRST (process year's normal operations)
metrics: Dict[str, float] = manufacturer.step(growth_rate=0.0)
# Then apply losses at END of year
# This prevents newly-created liabilities from being paid in the same year
revenue = manufacturer.calculate_revenue()
events, _ = self.loss_generator.generate_losses(duration=1.0, revenue=revenue)
# Calculate and apply insurance
total_loss = sum(event.amount for event in events)
claim_result = self.insurance_program.process_claim(total_loss)
recovery = claim_result.get("total_recovery", 0)
retained = total_loss - recovery
# Apply retained loss using proper claim processing to enforce limited liability
# This ensures cash never goes negative and equity is properly floored at $0
if retained > 0:
manufacturer.process_uninsured_claim(
claim_amount=retained,
immediate_payment=False, # Create liability with payment schedule starting next year
)
return metrics
def _run_single_ruin_simulation(
self,
sim_id: int,
max_horizon: int,
config: RuinProbabilityConfig,
) -> Dict[str, Any]:
"""Run a single ruin probability simulation.
Tracks multiple bankruptcy conditions with early stopping.
Also tracks mid-year ruin events (Issue #279).
"""
manufacturer = self.manufacturer.copy()
causes = {
"asset_threshold": np.zeros(max_horizon, dtype=bool),
"equity_threshold": np.zeros(max_horizon, dtype=bool),
"consecutive_negative": np.zeros(max_horizon, dtype=bool),
"debt_service": np.zeros(max_horizon, dtype=bool),
}
consecutive_negative_count = 0
bankruptcy_year = max_horizon + 1
is_bankrupt = False
# Issue #279: Track mid-year ruin events
is_mid_year_ruin = False
ruin_month: Optional[int] = None
for year in range(max_horizon):
metrics = self._process_simulation_year(manufacturer, year)
# Issue #279: Check if this was a mid-year ruin event
if manufacturer.is_ruined and manufacturer.ruin_month is not None:
is_mid_year_ruin = True
ruin_month = manufacturer.ruin_month
# Check bankruptcy
is_bankrupt, consecutive_negative_count = self._check_bankruptcy_conditions(
manufacturer, metrics, year, config, causes, consecutive_negative_count
)
# Early stopping if bankrupt
if is_bankrupt:
bankruptcy_year = year + 1
if config.early_stopping:
break
# If no bankruptcy occurred, keep the default year beyond horizon
if not is_bankrupt:
bankruptcy_year = max_horizon + 1
return {
"bankruptcy_year": bankruptcy_year,
"causes": causes,
"is_mid_year_ruin": is_mid_year_ruin,
"ruin_month": ruin_month,
}
def _combine_ruin_results(self, chunk_results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Combine ruin simulation results from parallel chunks."""
bankruptcy_years = np.concatenate([r["bankruptcy_years"] for r in chunk_results])
# Combine bankruptcy causes
all_causes = {}
for cause in [
"asset_threshold",
"equity_threshold",
"consecutive_negative",
"debt_service",
]:
cause_arrays = [r["bankruptcy_causes"][cause] for r in chunk_results]
all_causes[cause] = np.vstack(cause_arrays)
# Issue #279: Combine mid-year ruin statistics
total_mid_year_ruin = sum(r.get("mid_year_ruin_count", 0) for r in chunk_results)
combined_ruin_months: Dict[int, int] = {}
for r in chunk_results:
chunk_months = r.get("ruin_month_distribution", {})
for month, count in chunk_months.items():
combined_ruin_months[month] = combined_ruin_months.get(month, 0) + count
return {
"bankruptcy_years": bankruptcy_years,
"bankruptcy_causes": all_causes,
"mid_year_ruin_count": total_mid_year_ruin,
"ruin_month_distribution": combined_ruin_months,
}
def _calculate_bootstrap_ci(
self,
bankruptcy_years: np.ndarray,
time_horizons: List[int],
n_bootstrap: int,
confidence_level: float,
) -> np.ndarray:
"""Calculate bootstrap confidence intervals for ruin probabilities."""
rng = np.random.default_rng()
n_sims = len(bankruptcy_years)
confidence_intervals = np.zeros((len(time_horizons), 2))
for i, horizon in enumerate(time_horizons):
bootstrap_probs = []
for _ in range(n_bootstrap):
# Resample with replacement
bootstrap_sample = rng.choice(bankruptcy_years, size=n_sims, replace=True)
prob = np.mean(bootstrap_sample <= horizon)
bootstrap_probs.append(prob)
# Calculate percentiles
alpha = 1 - confidence_level
lower = np.percentile(bootstrap_probs, alpha / 2 * 100)
upper = np.percentile(bootstrap_probs, (1 - alpha / 2) * 100)
confidence_intervals[i] = [lower, upper]
return confidence_intervals
def _check_ruin_convergence(
self,
bankruptcy_years: np.ndarray,
n_chains: int = 4,
) -> bool:
"""Check convergence using R-hat statistic."""
if len(bankruptcy_years) < n_chains * 100:
return False
chain_size = len(bankruptcy_years) // n_chains
chains = []
for i in range(n_chains):
chain_data = bankruptcy_years[i * chain_size : (i + 1) * chain_size]
# Convert to binary (bankrupt within 10 years)
chain_binary = (chain_data <= 10).astype(float)
chains.append(chain_binary)
chains = np.array(chains) # type: ignore
# Calculate R-hat statistic
chain_means = np.mean(chains, axis=1)
chain_vars = np.var(chains, axis=1, ddof=1)
# Between-chain variance
B = np.var(chain_means, ddof=1) * chain_size
# Within-chain variance
W = np.mean(chain_vars)
# If W is zero (no within-chain variance), check if means differ
if W < 1e-10:
# If all chains have the same mean, converged
# Chains have different means but no variance, not converged
return bool(np.allclose(chain_means, chain_means[0]))
var_plus = ((chain_size - 1) * W + B) / chain_size
r_hat = np.sqrt(var_plus / W)
return bool(r_hat < 1.05) # Converged if R-hat < 1.05