"""Algorithmic insurance decision engine for optimal coverage selection.
This module implements a comprehensive decision framework that optimizes
insurance purchasing decisions using multi-objective optimization to balance
growth targets with bankruptcy risk constraints.
"""
import copy
from dataclasses import dataclass, field
from enum import Enum
import logging
from typing import Any, Dict, List, Optional, Set, Tuple
import numpy as np
from scipy.optimize import Bounds, OptimizeResult, differential_evolution, minimize
from .config import DEFAULT_RISK_FREE_RATE, DecisionEngineConfig
from .config_loader import ConfigLoader
from .ergodic_analyzer import ErgodicAnalyzer
from .insurance_program import EnhancedInsuranceLayer as Layer
from .loss_distributions import LossDistribution
from .manufacturer import WidgetManufacturer
from .optimization import (
AugmentedLagrangianOptimizer,
EnhancedSLSQPOptimizer,
MultiStartOptimizer,
PenaltyMethodOptimizer,
TrustRegionOptimizer,
)
from .risk_metrics import RiskMetrics
logger = logging.getLogger(__name__)
class OptimizationMethod(Enum):
    """Available optimization methods.

    Each member's value is the identifier recorded in logs and stored in
    ``InsuranceDecision.optimization_method``.
    """

    SLSQP = "SLSQP"  # Sequential Least Squares Programming (scipy.optimize.minimize)
    ENHANCED_SLSQP = "enhanced_slsqp"  # Enhanced SLSQP with adaptive features
    DIFFERENTIAL_EVOLUTION = "differential_evolution"  # Global optimization
    WEIGHTED_SUM = "weighted_sum"  # Multi-objective via weighted sum
    TRUST_REGION = "trust_region"  # Trust-region constrained optimization
    PENALTY_METHOD = "penalty_method"  # Penalty method with adaptive penalties
    AUGMENTED_LAGRANGIAN = "augmented_lagrangian"  # Augmented Lagrangian method
    MULTI_START = "multi_start"  # Multi-start global optimization
@dataclass
class OptimizationConstraints:
    """Constraints for insurance optimization.

    Monetary fields share the currency units of the manufacturer's
    financials; probabilities and ratios are expressed as fractions.
    Immutable defaults are assigned directly — wrapping them in
    ``field(default=...)`` is redundant for scalars.
    """

    max_premium_budget: float = 1_000_000  # Annual premium ceiling
    min_coverage_limit: float = 5_000_000  # Minimum total coverage (retention + layers)
    max_coverage_limit: float = 100_000_000  # Maximum total coverage
    max_bankruptcy_probability: float = 0.01  # Acceptable ruin probability
    min_retained_limit: float = 100_000  # Lower bound on self-insured retention
    max_retained_limit: float = 10_000_000  # Upper bound on self-insured retention
    max_layers: int = 5  # Maximum number of insurance layers
    min_layers: int = 1  # Minimum number of insurance layers
    required_roi_improvement: float = 0.0  # Minimum ROI improvement
    max_debt_to_equity: float = 2.0  # Maximum debt-to-equity ratio
    max_insurance_cost_ratio: float = 0.03  # Max insurance cost as % of revenue
    min_coverage_requirement: float = 0.0  # Minimum required coverage (layers only)
    max_retention_limit: float = float("inf")  # Maximum retention allowed
@dataclass
class InsuranceDecision:
    """Represents an insurance purchasing decision."""

    retained_limit: float  # Self-insured retention
    layers: List[Layer]  # Insurance layers to purchase
    total_premium: float  # Total annual premium
    total_coverage: float  # Total coverage limit
    pricing_scenario: str  # Market pricing scenario used
    optimization_method: str  # Method used to find this decision
    convergence_iterations: int = 0  # Iterations to converge
    objective_value: float = 0.0  # Final objective function value

    def __post_init__(self) -> None:
        """Derive totals that the caller left at zero.

        Total coverage is the retention plus the sum of layer limits;
        total premium is each layer's limit times its base premium rate.
        """
        if self.layers and self.total_coverage == 0:
            limits_sum = sum(layer.limit for layer in self.layers)
            self.total_coverage = self.retained_limit + limits_sum
        if self.layers and self.total_premium == 0:
            self.total_premium = sum(
                layer.limit * layer.base_premium_rate for layer in self.layers
            )
@dataclass
class DecisionMetrics:
    """Comprehensive metrics for evaluating an insurance decision."""

    ergodic_growth_rate: float  # Time-average growth with insurance
    bankruptcy_probability: float  # Probability of ruin
    expected_roe: float  # Expected return on equity
    roe_improvement: float  # Change in ROE vs no insurance
    premium_to_limit_ratio: float  # Premium efficiency
    coverage_adequacy: float  # Coverage vs expected losses
    capital_efficiency: float  # Benefit per dollar of premium
    value_at_risk_95: float  # 95th percentile loss
    conditional_value_at_risk: float  # Expected loss beyond VaR
    decision_score: float = 0.0  # Overall decision quality score
    # Enhanced ROE metrics
    time_weighted_roe: float = 0.0  # Time-weighted average ROE
    roe_volatility: float = 0.0  # ROE standard deviation
    roe_sharpe_ratio: float = 0.0  # ROE risk-adjusted performance
    roe_downside_deviation: float = 0.0  # Downside risk measure
    roe_1yr_rolling: float = 0.0  # 1-year rolling average ROE
    roe_3yr_rolling: float = 0.0  # 3-year rolling average ROE
    roe_5yr_rolling: float = 0.0  # 5-year rolling average ROE
    # ROE component breakdown
    operating_roe: float = 0.0  # ROE from operations
    insurance_impact_roe: float = 0.0  # ROE impact from insurance
    tax_effect_roe: float = 0.0  # Tax impact on ROE

    def calculate_score(self, weights: Optional[Dict[str, float]] = None) -> float:
        """Calculate weighted decision score.

        Args:
            weights: Weights for each metric (default: 0.3 growth, 0.3 risk,
                0.2 efficiency, 0.2 adequacy). Must contain all four keys.

        Returns:
            Weighted score between 0 and 1 (also stored on
            ``decision_score``).
        """
        if weights is None:
            weights = {
                "growth": 0.3,
                "risk": 0.3,
                "efficiency": 0.2,
                "adequacy": 0.2,
            }
        # Component scores, each normalized to the [0, 1] range.
        # Growth targets 20%; bankruptcy risk is inverted (lower is better)
        # against a 5% ceiling.
        component_scores = {
            "growth": min(max(self.ergodic_growth_rate / 0.2, 0), 1),
            "risk": 1 - min(self.bankruptcy_probability / 0.05, 1),
            "efficiency": min(self.capital_efficiency, 1),
            "adequacy": min(self.coverage_adequacy, 1),
        }
        total = 0.0
        for key in ("growth", "risk", "efficiency", "adequacy"):
            total += weights[key] * component_scores[key]
        self.decision_score = total
        return self.decision_score
@dataclass
class SensitivityReport:
    """Results of sensitivity analysis."""

    base_decision: InsuranceDecision  # Decision the sensitivities are measured against
    base_metrics: DecisionMetrics  # Metrics of the base decision
    parameter_sensitivities: Dict[str, Dict[str, float]]  # param -> metric -> change
    key_drivers: List[str]  # Most influential parameters
    robust_range: Dict[str, Tuple[float, float]]  # Parameter ranges for stability
    stress_test_results: Dict[str, DecisionMetrics]  # Scenario -> metrics
@dataclass
class Recommendations:
    """Executive-ready recommendations from the decision engine."""

    primary_recommendation: InsuranceDecision  # Best decision found
    primary_rationale: str  # Why the primary decision was chosen
    alternative_options: List[Tuple[InsuranceDecision, str]]  # (decision, rationale)
    implementation_timeline: List[str]  # Ordered implementation steps
    risk_considerations: List[str]  # Caveats to communicate
    expected_benefits: Dict[str, float]  # Benefit name -> estimated value
    confidence_level: float  # 0-1 confidence in recommendation
class InsuranceDecisionEngine:
    """Algorithmic engine for optimizing insurance decisions."""

    def __init__(
        self,
        manufacturer: WidgetManufacturer,
        loss_distribution: LossDistribution,
        pricing_scenario: str = "baseline",
        config_loader: Optional[ConfigLoader] = None,
        engine_config: Optional[DecisionEngineConfig] = None,
    ):
        """Initialize decision engine with company context.

        Args:
            manufacturer: Company profile and financials
            loss_distribution: Loss model for the company
            pricing_scenario: Market pricing scenario to use
            config_loader: Configuration loader (creates default if None)
            engine_config: Decision engine calibration parameters (creates default if None)
        """
        self.manufacturer = manufacturer
        self.loss_distribution = loss_distribution
        self.pricing_scenario = pricing_scenario
        self.config_loader = config_loader or ConfigLoader()
        self.engine_config = engine_config or DecisionEngineConfig()
        # Load pricing scenarios and resolve the one requested by name
        self.pricing_config = self.config_loader.load_pricing_scenarios()
        self.current_scenario = self.pricing_config.get_scenario(pricing_scenario)
        # Initialize components
        # InsuranceProgram will be created with layers when needed
        self.insurance_program = None
        self.ergodic_analyzer = ErgodicAnalyzer()
        # Caches for performance, keyed by a repr-derived string of
        # (constraints, method, weights)
        self._decision_cache: Dict[str, InsuranceDecision] = {}
        self._metrics_cache: Dict[str, DecisionMetrics] = {}
# Ordered fallback sequence for optimization methods
_FALLBACK_SEQUENCE: List[OptimizationMethod] = [
OptimizationMethod.SLSQP,
OptimizationMethod.DIFFERENTIAL_EVOLUTION,
OptimizationMethod.ENHANCED_SLSQP,
OptimizationMethod.PENALTY_METHOD,
OptimizationMethod.AUGMENTED_LAGRANGIAN,
OptimizationMethod.MULTI_START,
OptimizationMethod.TRUST_REGION,
OptimizationMethod.WEIGHTED_SUM,
]
    def optimize_insurance_decision(
        self,
        constraints: OptimizationConstraints,
        method: OptimizationMethod = OptimizationMethod.SLSQP,
        weights: Optional[Dict[str, float]] = None,
        _attempted_methods: Optional[Set[OptimizationMethod]] = None,
    ) -> InsuranceDecision:
        """Find optimal insurance structure given constraints.

        Uses multi-objective optimization to balance growth, risk, and cost.
        Falls back through alternative methods if validation fails, tracking
        attempted methods to prevent infinite recursion.

        Args:
            constraints: Optimization constraints
            method: Optimization method to use
            weights: Objective function weights (default: balanced —
                growth 0.4, risk 0.4, cost 0.2)
            _attempted_methods: Internal set tracking methods already tried
                (prevents infinite recursion). Callers should not set this.

        Returns:
            Optimal insurance decision (best-effort if no method satisfies
            every constraint).
        """
        if weights is None:
            weights = {"growth": 0.4, "risk": 0.4, "cost": 0.2}
        if _attempted_methods is None:
            _attempted_methods = set()
        _attempted_methods.add(method)
        logger.info(f"Starting optimization with method: {method.value}")
        # Check cache.
        # NOTE(review): the key is built from reprs, so two weights dicts with
        # equal entries but different insertion order yield distinct keys —
        # confirm whether that cache miss is acceptable.
        cache_key = f"{constraints}_{method}_{weights}"
        if cache_key in self._decision_cache:
            logger.info("Returning cached decision")
            return self._decision_cache[cache_key]
        # Run optimization based on method
        result = self._run_optimization_method(method, constraints, weights)
        # Create decision from optimization result
        decision = self._create_decision_from_result(result, method)
        # Validate decision meets constraints
        if not self._validate_decision(decision, constraints):
            logger.warning(
                f"Decision from {method.value} violates constraints, "
                f"attempting fallback (tried {len(_attempted_methods)} methods)"
            )
            # Try fallback methods in sequence, skipping already-attempted ones.
            # The recursive call shares _attempted_methods, so each method runs
            # at most once across the whole fallback cascade.
            for fallback_method in self._FALLBACK_SEQUENCE:
                if fallback_method not in _attempted_methods:
                    decision = self.optimize_insurance_decision(
                        constraints, fallback_method, weights, _attempted_methods
                    )
                    # If fallback found a valid decision, use it
                    if self._validate_decision(decision, constraints):
                        break
            else:
                # for/else: loop completed without break — every method was
                # exhausted, so return the last (best-effort) result
                logger.warning(
                    "All optimization methods exhausted without meeting constraints. "
                    "Returning best-effort result."
                )
        # Cache result (even a best-effort one, so repeat calls are cheap)
        self._decision_cache[cache_key] = decision
        logger.info(
            f"Optimization complete: {len(decision.layers)} layers, "
            f"${decision.total_premium:,.0f} premium"
        )
        return decision
def _run_optimization_method(
self,
method: OptimizationMethod,
constraints: OptimizationConstraints,
weights: Dict[str, float],
) -> OptimizeResult:
"""Dispatch to the appropriate optimization method.
Args:
method: Optimization method to use
constraints: Optimization constraints
weights: Objective function weights
Returns:
Optimization result
"""
dispatch = {
OptimizationMethod.SLSQP: self._optimize_slsqp,
OptimizationMethod.ENHANCED_SLSQP: self._optimize_enhanced_slsqp,
OptimizationMethod.DIFFERENTIAL_EVOLUTION: self._optimize_differential_evolution,
OptimizationMethod.TRUST_REGION: self._optimize_trust_region,
OptimizationMethod.PENALTY_METHOD: self._optimize_penalty_method,
OptimizationMethod.AUGMENTED_LAGRANGIAN: self._optimize_augmented_lagrangian,
OptimizationMethod.MULTI_START: self._optimize_multi_start,
OptimizationMethod.WEIGHTED_SUM: self._optimize_weighted_sum,
}
return dispatch[method](constraints, weights)
    def _optimize_slsqp(
        self, constraints: OptimizationConstraints, weights: Dict[str, float]
    ) -> OptimizeResult:
        """Optimize using Sequential Least Squares Programming.

        Fast convergence for smooth, constrained problems. The decision
        vector is ``[retained_limit, layer1_limit, ..., layerN_limit]`` with
        ``N == constraints.max_layers``.

        NOTE(review): this method builds its own constraint set rather than
        calling ``_create_constraint_list``, so it does not enforce the
        debt-to-equity or insurance-cost-ratio constraints that the enhanced
        optimizers do — confirm whether that difference is intentional.
        """
        # Define decision variables: [retained_limit, layer1_limit, layer2_limit, ...]
        n_vars = 1 + constraints.max_layers  # retention + layer limits
        # Initial guess: equal layer spacing
        x0 = np.zeros(n_vars)
        x0[0] = constraints.min_retained_limit  # Start with minimum retention
        if constraints.max_layers > 0:
            layer_size = (
                constraints.max_coverage_limit - constraints.min_retained_limit
            ) / constraints.max_layers
            for i in range(1, min(3, n_vars)):  # Seed only the first two layer slots
                x0[i] = layer_size
        # Bounds for decision variables: retention bounded by the constraint
        # object; each layer capped at an equal share of max coverage
        bounds = [(constraints.min_retained_limit, constraints.max_retained_limit)]
        for _ in range(constraints.max_layers):
            bounds.append((0, constraints.max_coverage_limit / constraints.max_layers))

        # Define objective function
        def objective(x):
            """Weighted scalar objective (lower is better); see _calculate_objective."""
            return self._calculate_objective(x, weights)

        # Define constraints (scipy "ineq" convention: value >= 0 when satisfied)
        constraint_list = []

        # Premium budget constraint
        def premium_constraint(x):
            """Slack of the premium budget: budget minus structure premium."""
            premium = self._calculate_premium(x)
            return constraints.max_premium_budget - premium

        constraint_list.append({"type": "ineq", "fun": premium_constraint})

        # Coverage limit constraints (sum(x) includes the retention, x[0])
        def coverage_min_constraint(x):
            """Slack above the minimum total coverage."""
            total_coverage = sum(x)
            return total_coverage - constraints.min_coverage_limit

        def coverage_max_constraint(x):
            """Slack below the maximum total coverage."""
            total_coverage = sum(x)
            return constraints.max_coverage_limit - total_coverage

        constraint_list.extend(
            [
                {"type": "ineq", "fun": coverage_min_constraint},
                {"type": "ineq", "fun": coverage_max_constraint},
            ]
        )

        # Bankruptcy probability constraint
        def bankruptcy_constraint(x):
            """Slack below the maximum acceptable bankruptcy probability."""
            prob = self._estimate_bankruptcy_probability(x)
            return constraints.max_bankruptcy_probability - prob

        constraint_list.append({"type": "ineq", "fun": bankruptcy_constraint})
        # Run optimization
        result = minimize(
            objective,
            x0,
            method="SLSQP",
            bounds=bounds,
            constraints=constraint_list,
            options={"maxiter": 1000, "ftol": 1e-6},
        )
        return result
def _optimize_differential_evolution(
self, constraints: OptimizationConstraints, weights: Dict[str, float]
) -> OptimizeResult:
"""Optimize using Differential Evolution.
Global optimization method, robust to local optima.
"""
# Bounds for decision variables
bounds = [(constraints.min_retained_limit, constraints.max_retained_limit)]
for _ in range(constraints.max_layers):
bounds.append((0, constraints.max_coverage_limit / constraints.max_layers))
# Define objective with penalty for constraint violations
def objective_with_penalty(x):
"""Objective function with penalty for constraint violations.
Args:
x: Decision variables [retained_limit, layer1_limit, ...]
Returns:
Penalized objective value
"""
obj = self._calculate_objective(x, weights)
# Add penalties for constraint violations
penalty = 0.0
# Premium constraint
premium = self._calculate_premium(x)
if premium > constraints.max_premium_budget:
penalty += 1000 * (premium - constraints.max_premium_budget)
# Coverage constraints
total_coverage = sum(x)
if total_coverage < constraints.min_coverage_limit:
penalty += 1000 * (constraints.min_coverage_limit - total_coverage)
if total_coverage > constraints.max_coverage_limit:
penalty += 1000 * (total_coverage - constraints.max_coverage_limit)
# Bankruptcy constraint
bankruptcy_prob = self._estimate_bankruptcy_probability(x)
if bankruptcy_prob > constraints.max_bankruptcy_probability:
penalty += 10000 * (bankruptcy_prob - constraints.max_bankruptcy_probability)
return obj + penalty
# Run differential evolution
result = differential_evolution(
objective_with_penalty,
bounds,
strategy="best1bin",
maxiter=500,
popsize=15,
tol=1e-6,
seed=42,
)
return result
def _optimize_weighted_sum(
self, constraints: OptimizationConstraints, weights: Dict[str, float]
) -> OptimizeResult:
"""Optimize using weighted sum scalarization with Pareto sampling.
Generates multiple candidate solutions by sweeping weight combinations,
then selects the best according to the user-specified weights. This
approach explores a broader Pareto front than single-shot SLSQP.
"""
# Generate weight sweep: vary growth/risk emphasis
weight_sets = [
{"growth": 0.6, "risk": 0.3, "cost": 0.1},
{"growth": 0.3, "risk": 0.5, "cost": 0.2},
{"growth": 0.2, "risk": 0.3, "cost": 0.5},
weights, # Include caller's weights
]
best_result: Optional[OptimizeResult] = None
best_score = float("inf")
for w in weight_sets:
result = self._optimize_slsqp(constraints, w)
# Re-score using the caller's weights for fair comparison
score = self._calculate_objective(result.x, weights)
if score < best_score:
best_score = score
best_result = result
assert best_result is not None # At least one result from the sweep
return best_result
def _optimize_enhanced_slsqp(
self, constraints: OptimizationConstraints, weights: Dict[str, float]
) -> OptimizeResult:
"""Optimize using enhanced SLSQP with adaptive features."""
n_vars = 1 + constraints.max_layers
# Initial guess
x0 = np.zeros(n_vars)
x0[0] = constraints.min_retained_limit
if constraints.max_layers > 0:
layer_size = (
constraints.max_coverage_limit - constraints.min_retained_limit
) / constraints.max_layers
for i in range(1, min(3, n_vars)):
x0[i] = layer_size
# Create bounds
bounds = Bounds(
lb=[constraints.min_retained_limit] + [0] * constraints.max_layers,
ub=[constraints.max_retained_limit]
+ [constraints.max_coverage_limit / constraints.max_layers] * constraints.max_layers,
)
# Create constraints list
constraint_list = self._create_constraint_list(constraints)
# Create enhanced SLSQP optimizer
optimizer = EnhancedSLSQPOptimizer(
lambda x: self._calculate_objective(x, weights),
constraints=constraint_list,
bounds=bounds,
)
# Run optimization
result = optimizer.optimize(x0, adaptive_step=True, max_iter=1000)
return result
def _optimize_trust_region(
self, constraints: OptimizationConstraints, weights: Dict[str, float]
) -> OptimizeResult:
"""Optimize using trust-region method."""
n_vars = 1 + constraints.max_layers
# Initial guess
x0 = np.zeros(n_vars)
x0[0] = constraints.min_retained_limit
if constraints.max_layers > 0:
layer_size = (
constraints.max_coverage_limit - constraints.min_retained_limit
) / constraints.max_layers
for i in range(1, min(3, n_vars)):
x0[i] = layer_size
# Create bounds
bounds = Bounds(
lb=[constraints.min_retained_limit] + [0] * constraints.max_layers,
ub=[constraints.max_retained_limit]
+ [constraints.max_coverage_limit / constraints.max_layers] * constraints.max_layers,
)
# Create constraints list
constraint_list = self._create_constraint_list(constraints)
# Create trust-region optimizer
optimizer = TrustRegionOptimizer(
lambda x: self._calculate_objective(x, weights),
constraints=constraint_list,
bounds=bounds,
)
# Run optimization
result = optimizer.optimize(x0, initial_radius=1.0, max_radius=10.0)
return result
def _optimize_penalty_method(
self, constraints: OptimizationConstraints, weights: Dict[str, float]
) -> OptimizeResult:
"""Optimize using penalty method with adaptive penalties."""
n_vars = 1 + constraints.max_layers
# Initial guess
x0 = np.zeros(n_vars)
x0[0] = constraints.min_retained_limit
if constraints.max_layers > 0:
layer_size = (
constraints.max_coverage_limit - constraints.min_retained_limit
) / constraints.max_layers
for i in range(1, min(3, n_vars)):
x0[i] = layer_size
# Create bounds
bounds = Bounds(
lb=[constraints.min_retained_limit] + [0] * constraints.max_layers,
ub=[constraints.max_retained_limit]
+ [constraints.max_coverage_limit / constraints.max_layers] * constraints.max_layers,
)
# Create constraints list
constraint_list = self._create_constraint_list(constraints)
# Create penalty method optimizer
optimizer = PenaltyMethodOptimizer(
lambda x: self._calculate_objective(x, weights), constraint_list, bounds
)
# Run optimization
result = optimizer.optimize(x0, max_outer_iter=50, max_inner_iter=100)
return result
def _optimize_augmented_lagrangian(
self, constraints: OptimizationConstraints, weights: Dict[str, float]
) -> OptimizeResult:
"""Optimize using augmented Lagrangian method."""
n_vars = 1 + constraints.max_layers
# Initial guess
x0 = np.zeros(n_vars)
x0[0] = constraints.min_retained_limit
if constraints.max_layers > 0:
layer_size = (
constraints.max_coverage_limit - constraints.min_retained_limit
) / constraints.max_layers
for i in range(1, min(3, n_vars)):
x0[i] = layer_size
# Create bounds
bounds = Bounds(
lb=[constraints.min_retained_limit] + [0] * constraints.max_layers,
ub=[constraints.max_retained_limit]
+ [constraints.max_coverage_limit / constraints.max_layers] * constraints.max_layers,
)
# Create constraints list
constraint_list = self._create_constraint_list(constraints)
# Create augmented Lagrangian optimizer
optimizer = AugmentedLagrangianOptimizer(
lambda x: self._calculate_objective(x, weights), constraint_list, bounds
)
# Run optimization
result = optimizer.optimize(x0, max_outer_iter=50, max_inner_iter=100)
return result
def _optimize_multi_start(
self, constraints: OptimizationConstraints, weights: Dict[str, float]
) -> OptimizeResult:
"""Optimize using multi-start approach for global optimization."""
n_vars = 1 + constraints.max_layers
# Initial guess
x0 = np.zeros(n_vars)
x0[0] = constraints.min_retained_limit
if constraints.max_layers > 0:
layer_size = (
constraints.max_coverage_limit - constraints.min_retained_limit
) / constraints.max_layers
for i in range(1, min(3, n_vars)):
x0[i] = layer_size
# Create bounds
bounds = Bounds(
lb=[constraints.min_retained_limit] + [0] * constraints.max_layers,
ub=[constraints.max_retained_limit]
+ [constraints.max_coverage_limit / constraints.max_layers] * constraints.max_layers,
)
# Create constraints list
constraint_list = self._create_constraint_list(constraints)
# Create multi-start optimizer using enhanced SLSQP as base
optimizer = MultiStartOptimizer(
lambda x: self._calculate_objective(x, weights),
bounds,
constraint_list,
base_optimizer="enhanced-slsqp",
)
# Run optimization with multiple starts
result = optimizer.optimize(n_starts=10, x0=x0, seed=42)
return result
    def _create_constraint_list(self, constraints: OptimizationConstraints) -> List[Dict[str, Any]]:
        """Create list of constraints for optimizers.

        All constraints use the scipy "ineq" convention: the callable is
        non-negative exactly when the constraint is satisfied.

        Args:
            constraints: Optimization constraint parameters.

        Returns:
            List of ``{"type": "ineq", "fun": callable}`` dicts.
        """
        constraint_list = []

        # Premium budget constraint
        def premium_constraint(x):
            premium = self._calculate_premium(x)
            return constraints.max_premium_budget - premium

        constraint_list.append({"type": "ineq", "fun": premium_constraint})

        # Coverage limit constraints (sum(x) includes the retention, x[0])
        def coverage_min_constraint(x):
            total_coverage = sum(x)
            return total_coverage - constraints.min_coverage_limit

        def coverage_max_constraint(x):
            total_coverage = sum(x)
            return constraints.max_coverage_limit - total_coverage

        constraint_list.extend(
            [
                {"type": "ineq", "fun": coverage_min_constraint},
                {"type": "ineq", "fun": coverage_max_constraint},
            ]
        )

        # Bankruptcy probability constraint
        def bankruptcy_constraint(x):
            prob = self._estimate_bankruptcy_probability(x)
            return constraints.max_bankruptcy_probability - prob

        constraint_list.append({"type": "ineq", "fun": bankruptcy_constraint})

        # Additional constraints from enhanced framework
        # Debt-to-equity ratio constraint
        def debt_equity_constraint(x):
            # Estimate based on coverage and retention
            retained_limit = x[0]
            coverage = sum(x)
            # Simplified proxy: retention relative to total coverage stands in
            # for leverage risk (not a true balance-sheet debt/equity ratio)
            debt_equity_ratio = retained_limit / coverage if coverage > 0 else 0
            return constraints.max_debt_to_equity - debt_equity_ratio

        constraint_list.append({"type": "ineq", "fun": debt_equity_constraint})

        # Insurance cost ceiling constraint: premium as a fraction of revenue
        def insurance_cost_constraint(x):
            premium = self._calculate_premium(x)
            revenue = (
                float(self.manufacturer.total_assets) * self.manufacturer.asset_turnover_ratio
            )  # Boundary: float for scipy.optimize
            cost_ratio = premium / revenue if revenue > 0 else 0
            return constraints.max_insurance_cost_ratio - cost_ratio

        constraint_list.append({"type": "ineq", "fun": insurance_cost_constraint})

        # Minimum coverage requirement (layers only, retention excluded)
        if constraints.min_coverage_requirement > 0:

            def min_coverage_constraint(x):
                coverage = sum(x[1:])  # Exclude retention
                return coverage - constraints.min_coverage_requirement

            constraint_list.append({"type": "ineq", "fun": min_coverage_constraint})

        # Maximum retention limit (only when finite, to avoid a no-op constraint)
        if constraints.max_retention_limit < float("inf"):

            def max_retention_constraint(x):
                return constraints.max_retention_limit - x[0]

            constraint_list.append({"type": "ineq", "fun": max_retention_constraint})

        return constraint_list
def _calculate_objective(self, x: np.ndarray, weights: Dict[str, float]) -> float:
"""Calculate weighted objective function value.
Lower values are better.
"""
# Parse decision variables
retained_limit = x[0]
layer_limits = x[1:]
# Skip if no actual layers
active_layers = [l for l in layer_limits if l > 1000] # Min $1k layer
if not active_layers:
return 1e10 # Penalize no insurance
# Calculate components
try:
# Growth component (negative because we maximize)
growth_rate = self._estimate_growth_rate(retained_limit, active_layers)
growth_obj = -weights["growth"] * growth_rate
# Risk component (minimize bankruptcy probability)
bankruptcy_prob = self._estimate_bankruptcy_probability(x)
risk_obj = weights["risk"] * bankruptcy_prob * 100 # Scale for balance
# Cost component (minimize premium as % of assets)
premium = self._calculate_premium(x)
assets = float(self.manufacturer.total_assets) # Boundary: float for scipy.optimize
cost_obj = weights["cost"] * (premium / assets) * 10 # Scale
total = growth_obj + risk_obj + cost_obj
return float(total) # Boundary: float for scipy.optimize
except Exception as e:
logger.error(f"Error calculating objective: {e}")
return 1e10
def _calculate_premium(self, x: np.ndarray) -> float:
"""Calculate total premium for given structure."""
retained_limit = x[0]
layer_limits = x[1:]
total_premium = 0
current_attachment = retained_limit
for limit in layer_limits:
if limit > 1000: # Minimum meaningful layer
# Use pricing scenario rates
if current_attachment < 5_000_000:
rate = self.current_scenario.primary_layer_rate
elif current_attachment < 25_000_000:
rate = self.current_scenario.first_excess_rate
else:
rate = self.current_scenario.higher_excess_rate
premium = limit * rate
total_premium += premium
current_attachment += limit
return total_premium
def _estimate_growth_rate(self, retained_limit: float, layer_limits: List[float]) -> float:
"""Estimate ergodic (time-average) growth rate for given insurance structure.
Uses the fundamental ergodic economics formula:
g_time = g_ensemble - σ²/2
where σ² is the variance of log-returns. Insurance reduces the effective
volatility of net losses, improving the time-average growth rate even
when the expected (ensemble) cost exceeds expected losses.
Args:
retained_limit: Self-insured retention level.
layer_limits: List of insurance layer limits.
Returns:
Estimated time-average growth rate.
"""
assets = float(self.manufacturer.total_assets)
revenue = assets * self.manufacturer.asset_turnover_ratio
operating_income = revenue * self.manufacturer.base_operating_margin
# Ensemble (expected) growth rate from operations
g_ensemble = operating_income / assets if assets > 0 else 0.0
# Estimate loss volatility using loss distribution
if hasattr(self.loss_distribution, "expected_value"):
expected_loss = self.loss_distribution.expected_value()
else:
expected_loss = 0.0
# Compute effective loss volatility (σ of loss/assets ratio)
# Use CV=0.5 as default coefficient of variation for loss severity
loss_cv = 0.5
loss_std = expected_loss * loss_cv
# Without insurance: full loss volatility hits the balance sheet
sigma_no_insurance = loss_std / assets if assets > 0 else 0.0
# With insurance: only retained losses create volatility
coverage = sum(layer_limits)
total_coverage = retained_limit + coverage
if total_coverage > 0 and expected_loss > 0:
# Fraction of loss variance retained (not covered by insurance)
# Losses above the retention up to coverage are transferred
retained_fraction = min(retained_limit / total_coverage, 1.0)
# Excess losses beyond total coverage also hit the balance sheet
excess_fraction = max(1.0 - total_coverage / (expected_loss * 10), 0.0)
effective_fraction = retained_fraction + excess_fraction
sigma_with_insurance = sigma_no_insurance * min(effective_fraction, 1.0)
else:
sigma_with_insurance = sigma_no_insurance
# Premium drag on growth
premium = 0.0
current_attachment = retained_limit
for limit in layer_limits:
if limit > 1000:
if current_attachment < 5_000_000:
rate = self.current_scenario.primary_layer_rate
elif current_attachment < 25_000_000:
rate = self.current_scenario.first_excess_rate
else:
rate = self.current_scenario.higher_excess_rate
premium += limit * rate
current_attachment += limit
premium_drag = premium / assets if assets > 0 else 0.0
# Ergodic (time-average) growth rate: g_time = g_ensemble - σ²/2 - premium_drag
g_time = g_ensemble - (sigma_with_insurance**2) / 2 - premium_drag
return float(g_time)
def _calculate_cvar(self, losses: np.ndarray, percentile: float) -> float:
"""Calculate Conditional Value at Risk (CVaR).
.. deprecated::
Use ``RiskMetrics(losses).tvar(confidence)`` instead for new code.
This method is retained only for backward compatibility and
will be removed in a future release.
Args:
losses: Array of loss values
percentile: Percentile threshold (e.g., 95)
Returns:
CVaR value or 0 if no losses exceed threshold
"""
if len(losses) == 0:
return 0.0
threshold = np.percentile(losses, percentile)
tail_losses = losses[losses > threshold]
if len(tail_losses) == 0:
return float(threshold)
return float(np.mean(tail_losses))
    def _estimate_bankruptcy_probability(self, x: np.ndarray) -> float:
        """Estimate bankruptcy probability using Monte Carlo simulation.

        This method now uses the enhanced Monte Carlo engine for accurate
        ruin probability estimation instead of the simplified heuristic.
        Falls back to a coverage-ratio step function when the engine is
        unavailable or the simulation fails.
        """
        retained_limit = x[0]
        layer_limits = x[1:]
        # Create insurance program from optimization variables
        layers = []
        current_attachment = retained_limit
        for limit in layer_limits:
            if limit > 1000:  # Minimum meaningful layer
                # Determine rate based on attachment
                if current_attachment < 5_000_000:
                    rate = self.current_scenario.primary_layer_rate
                elif current_attachment < 25_000_000:
                    rate = self.current_scenario.first_excess_rate
                else:
                    rate = self.current_scenario.higher_excess_rate
                from .insurance_program import EnhancedInsuranceLayer

                layer = EnhancedInsuranceLayer(
                    attachment_point=current_attachment,
                    limit=limit,
                    base_premium_rate=rate,
                )
                layers.append(layer)
                current_attachment += limit
        # Create insurance program (local import, presumably to avoid a
        # circular import at module load — confirm)
        from .insurance_program import InsuranceProgram

        insurance_program = InsuranceProgram(layers=layers)
        # Use Monte Carlo if available, otherwise fall back to simple estimation.
        # NOTE(review): this only checks that a `_monte_carlo_engine` attribute
        # exists as a feature flag — it then builds a fresh MonteCarloEngine
        # instead of using that attribute. Confirm whether that is intentional.
        if hasattr(self, "_monte_carlo_engine"):
            # Configure ruin probability estimation
            from .monte_carlo import RuinProbabilityConfig

            config = RuinProbabilityConfig(
                time_horizons=[5],  # Use 5-year horizon for optimization
                n_simulations=1000,  # Fewer simulations for optimization speed
                early_stopping=True,
                parallel=False,  # Sequential for optimization
                seed=42,
            )
            # Create Monte Carlo engine with current insurance structure
            from .monte_carlo import MonteCarloEngine

            mc_engine = MonteCarloEngine(
                loss_generator=self.loss_distribution,  # type: ignore
                insurance_program=insurance_program,
                manufacturer=self.manufacturer,
            )
            # Estimate ruin probability; on failure, fall through to the heuristic
            try:
                results = mc_engine.estimate_ruin_probability(config)
                return float(results.ruin_probabilities[0])  # 5-year probability
            except Exception as e:
                logger.warning(f"Monte Carlo estimation failed: {e}, using fallback")
        # Fallback to simple estimation: a step function of total coverage
        # relative to a proxy "maximum" loss (10x expected loss)
        total_coverage = retained_limit + sum(l for l in layer_limits if l > 1000)
        if hasattr(self.loss_distribution, "expected_value"):
            expected_max_loss = self.loss_distribution.expected_value() * 10
        else:
            expected_max_loss = total_coverage
        coverage_ratio = total_coverage / expected_max_loss if expected_max_loss > 0 else 1.0
        if coverage_ratio >= 1.0:
            return 0.001
        if coverage_ratio >= 0.8:
            return 0.005
        if coverage_ratio >= 0.6:
            return 0.01
        if coverage_ratio >= 0.4:
            return 0.02
        return 0.05
def _create_decision_from_result(
    self, result: OptimizeResult, method: OptimizationMethod
) -> InsuranceDecision:
    """Create an :class:`InsuranceDecision` from a raw optimization result.

    Args:
        result: Optimizer output; ``x[0]`` is the retained limit and
            ``x[1:]`` are candidate layer limits.
        method: Optimization method that produced ``result``.

    Returns:
        Decision with contiguously stacked layers, total premium
        (sum of limit * rate per layer), and total coverage
        (retention plus all layer limits).
    """
    x = result.x
    retained_limit = x[0]
    layer_limits = x[1:]

    # Stack layers contiguously on top of the retention, skipping
    # degenerate layers below the minimum meaningful size.
    layers = []
    current_attachment = retained_limit
    for limit in layer_limits:
        if limit > 1000:  # Minimum meaningful layer
            # Premium rate tier depends on where the layer attaches.
            if current_attachment < 5_000_000:
                rate = self.current_scenario.primary_layer_rate
            elif current_attachment < 25_000_000:
                rate = self.current_scenario.first_excess_rate
            else:
                rate = self.current_scenario.higher_excess_rate
            layers.append(
                Layer(
                    attachment_point=current_attachment,
                    limit=limit,
                    base_premium_rate=rate,
                )
            )
            current_attachment += limit

    return InsuranceDecision(
        retained_limit=retained_limit,
        layers=layers,
        total_premium=sum(layer.limit * layer.base_premium_rate for layer in layers),
        total_coverage=retained_limit + sum(layer.limit for layer in layers),
        pricing_scenario=self.pricing_scenario,
        optimization_method=method.value,
        # Not every scipy method populates iteration counts; default to 0.
        convergence_iterations=getattr(result, "nit", 0),
        objective_value=result.fun,
    )
def _validate_decision(
self, decision: InsuranceDecision, constraints: OptimizationConstraints
) -> bool:
"""Validate that decision meets all constraints."""
# Check premium budget
if decision.total_premium > constraints.max_premium_budget:
logger.warning(f"Premium ${decision.total_premium:,.0f} exceeds budget")
return False
# Check coverage limits
if decision.total_coverage < constraints.min_coverage_limit:
logger.warning(f"Coverage ${decision.total_coverage:,.0f} below minimum")
return False
if decision.total_coverage > constraints.max_coverage_limit:
logger.warning(f"Coverage ${decision.total_coverage:,.0f} above maximum")
return False
# Check retention limits
if decision.retained_limit < constraints.min_retained_limit:
return False
if decision.retained_limit > constraints.max_retained_limit:
return False
# Check layer count
if len(decision.layers) > constraints.max_layers:
return False
return True
[docs]
def calculate_decision_metrics(self, decision: InsuranceDecision) -> DecisionMetrics:
    """Calculate comprehensive metrics for a decision.

    Runs paired Monte Carlo simulations (with the proposed insurance and
    with a no-insurance baseline) and derives growth, risk, ROE, and
    capital-efficiency metrics. Results are memoized per decision.

    Args:
        decision: Insurance decision to evaluate

    Returns:
        Comprehensive metrics; enhanced ROE statistics are included when
        the simulation produced a full ROE time series, otherwise simple
        per-path statistics are used as a fallback.
    """
    # Check cache; key captures the decision's economic footprint.
    cache_key = f"{decision.retained_limit}_{len(decision.layers)}_{decision.total_premium}"
    if cache_key in self._metrics_cache:
        return self._metrics_cache[cache_key]

    # Compute equity ratio from actual manufacturer financials
    # (0.3 fallback when total assets are non-positive).
    equity_ratio = (
        float(self.manufacturer.equity) / float(self.manufacturer.total_assets)
        if float(self.manufacturer.total_assets) > 0
        else 0.3
    )

    # Run simulations to calculate metrics
    n_simulations = 1000
    time_horizon = 10  # years

    # Simulate with insurance
    with_insurance_results = self._run_simulation(decision, n_simulations, time_horizon)

    # Simulate without insurance: infinite retention, no layers.
    no_insurance_decision = InsuranceDecision(
        retained_limit=float("inf"),
        layers=[],
        total_premium=0,
        total_coverage=0,
        pricing_scenario=self.pricing_scenario,
        optimization_method="none",
    )
    without_insurance_results = self._run_simulation(
        no_insurance_decision, n_simulations, time_horizon
    )

    # ROE component breakdown. This was previously duplicated verbatim in
    # both branches below; it does not depend on the branch, so compute it
    # exactly once here.
    _base_margin = (
        self.manufacturer.config.base_operating_margin
        if hasattr(self.manufacturer, "config")
        and hasattr(self.manufacturer.config, "base_operating_margin")
        else getattr(self.manufacturer, "base_operating_margin", 0.08)
    )
    base_operating_roe = _base_margin / equity_ratio
    # NOTE(review): assumes manufacturer equity is positive — a zero equity
    # would raise ZeroDivisionError here; confirm upstream invariants.
    insurance_cost_impact = -decision.total_premium / (float(self.manufacturer.equity))

    # Import ROEAnalyzer for enhanced metrics
    from .risk_metrics import ROEAnalyzer

    # Calculate enhanced ROE metrics if we have detailed ROE data
    roe_data = with_insurance_results.get("roe_series", None)
    equity_data = with_insurance_results.get("equity_series", None)
    if roe_data is not None and len(roe_data) > 0:
        roe_analyzer = ROEAnalyzer(roe_data, equity_data)
        # Get all enhanced metrics
        volatility_metrics = roe_analyzer.volatility_metrics()
        performance_ratios = roe_analyzer.performance_ratios()
        rolling_stats_1yr = roe_analyzer.rolling_statistics(1) if len(roe_data) >= 1 else {}
        rolling_stats_3yr = roe_analyzer.rolling_statistics(3) if len(roe_data) >= 3 else {}
        rolling_stats_5yr = roe_analyzer.rolling_statistics(5) if len(roe_data) >= 5 else {}

        time_weighted_roe = roe_analyzer.time_weighted_average()
        roe_volatility = volatility_metrics.get("standard_deviation", 0.0)
        roe_sharpe = performance_ratios.get("sharpe_ratio", 0.0)
        roe_downside_dev = volatility_metrics.get("downside_deviation", 0.0)
        roe_1yr = np.nanmean(rolling_stats_1yr.get("mean", [0.0]))
        roe_3yr = np.nanmean(rolling_stats_3yr.get("mean", [0.0]))
        roe_5yr = np.nanmean(rolling_stats_5yr.get("mean", [0.0]))
    else:
        # Fallback to simple calculations over per-path mean ROE
        time_weighted_roe = np.mean(with_insurance_results["roe"])
        roe_volatility = np.std(with_insurance_results["roe"])
        roe_sharpe = (np.mean(with_insurance_results["roe"]) - DEFAULT_RISK_FREE_RATE) / max(
            roe_volatility, 0.001
        )
        roe_data_arr = np.array(with_insurance_results["roe"])
        below_mean = roe_data_arr[roe_data_arr < np.mean(roe_data_arr)]
        roe_downside_dev = float(np.std(below_mean)) if len(below_mean) > 0 else 0.0
        roe_1yr = np.mean(with_insurance_results["roe"])
        roe_3yr = np.mean(with_insurance_results["roe"])
        roe_5yr = np.mean(with_insurance_results["roe"])

    # Calculate metrics
    metrics = DecisionMetrics(
        ergodic_growth_rate=np.mean(with_insurance_results["growth_rates"]),
        bankruptcy_probability=np.mean(with_insurance_results["bankruptcies"]),
        expected_roe=np.mean(with_insurance_results["roe"]),
        roe_improvement=(
            np.mean(with_insurance_results["roe"]) - np.mean(without_insurance_results["roe"])
        ),
        premium_to_limit_ratio=(
            decision.total_premium / decision.total_coverage
            if decision.total_coverage > 0
            else 0
        ),
        coverage_adequacy=(
            min(decision.total_coverage / (self.loss_distribution.expected_value() * 10), 1.0)
            if decision.total_coverage > 0 and hasattr(self.loss_distribution, "expected_value")
            else 0.0
        ),
        capital_efficiency=(
            np.mean(with_insurance_results["value"])
            - np.mean(without_insurance_results["value"])
        )
        / max(decision.total_premium, 1),
        value_at_risk_95=(
            np.percentile(with_insurance_results["losses"], 95)
            if len(with_insurance_results["losses"]) > 0
            else 0.0
        ),
        conditional_value_at_risk=(
            RiskMetrics(with_insurance_results["losses"]).tvar(0.95)
            if len(with_insurance_results["losses"]) > 0
            else 0.0
        ),
        # Enhanced ROE metrics
        time_weighted_roe=time_weighted_roe,
        roe_volatility=roe_volatility,
        roe_sharpe_ratio=roe_sharpe,
        roe_downside_deviation=roe_downside_dev,
        roe_1yr_rolling=float(roe_1yr),
        roe_3yr_rolling=float(roe_3yr),
        roe_5yr_rolling=float(roe_5yr),
        # ROE component breakdown
        operating_roe=base_operating_roe,
        insurance_impact_roe=insurance_cost_impact,
        tax_effect_roe=-(
            self.manufacturer.config.tax_rate
            if hasattr(self.manufacturer, "config")
            and hasattr(self.manufacturer.config, "tax_rate")
            else 0.25
        )
        * np.mean(with_insurance_results["roe"]),
    )
    # Calculate overall score
    metrics.calculate_score()
    # Cache result
    self._metrics_cache[cache_key] = metrics
    return metrics
def _run_simulation(
    self,
    decision: InsuranceDecision,
    n_simulations: int,
    time_horizon: int,
    seed: Optional[int] = 42,
) -> Dict[str, np.ndarray]:
    """Run Monte Carlo simulation for given decision.

    Each path evolves company equity year by year: revenue is derived
    from assets via the asset turnover ratio, one aggregate annual loss
    is drawn from a lognormal fitted to the loss distribution's mean and
    CV, insurance is applied, and net income updates equity until the
    horizon ends or equity goes non-positive (bankruptcy).

    Args:
        decision: Insurance decision to simulate.
        n_simulations: Number of Monte Carlo paths.
        time_horizon: Number of years per path.
        seed: Random seed for reproducibility. Pass None for non-deterministic.

    Returns:
        Dict with per-path arrays (``growth_rates``, ``bankruptcies``,
        ``roe``, ``value``, ``losses``) plus ``roe_series`` and
        ``equity_series`` pooled across all paths for enhanced analysis.
    """
    # Leverage assumption used to scale assets from equity each year
    # (0.3 fallback when total assets are non-positive).
    equity_ratio = (
        float(self.manufacturer.equity) / float(self.manufacturer.total_assets)
        if float(self.manufacturer.total_assets) > 0
        else 0.3
    )
    rng = np.random.default_rng(seed)
    results = {
        "growth_rates": np.zeros(n_simulations),
        "bankruptcies": np.zeros(n_simulations),
        "roe": np.zeros(n_simulations),
        "value": np.zeros(n_simulations),
        "losses": np.zeros(n_simulations),
        "roe_series": [],  # Store full ROE time series
        "equity_series": [],  # Store equity time series
    }
    # Collect all ROE and equity series for enhanced analysis
    all_roe_series = []
    all_equity_series = []
    for i in range(n_simulations):
        # Initialize company state from manufacturer financials
        assets = float(self.manufacturer.total_assets)
        equity = float(self.manufacturer.equity)
        bankrupt = False
        annual_returns = []
        sim_roe_series = []
        sim_equity_series = []
        for _ in range(time_horizon):
            # Generate revenue
            revenue = assets * self.manufacturer.asset_turnover_ratio
            # Generate losses using loss distribution parameters
            if hasattr(self.loss_distribution, "expected_value"):
                expected = max(self.loss_distribution.expected_value(), 1.0)
                # Use distribution's CV if available, otherwise default 0.5
                if hasattr(self.loss_distribution, "cv"):
                    cv = self.loss_distribution.cv()
                else:
                    cv = 0.5
                # Lognormal parameterized to match the mean and CV.
                sigma_sq = np.log(1 + cv**2)
                mu = np.log(expected) - sigma_sq / 2
                annual_losses = rng.lognormal(mu, np.sqrt(sigma_sq))
            else:
                annual_losses = 0.0
            # Apply insurance: company keeps losses up to the retention,
            # layers absorb the excess in attachment order.
            retained_losses = min(annual_losses, decision.retained_limit)
            insured_losses = 0.0  # tracked but not used in net income below
            if decision.layers:
                remaining_loss = max(annual_losses - decision.retained_limit, 0)
                for layer in decision.layers:
                    layer_loss = min(remaining_loss, layer.limit)
                    insured_losses += layer_loss
                    remaining_loss -= layer_loss
                    if remaining_loss <= 0:
                        break
            # Calculate net income
            operating_income = revenue * self.manufacturer.base_operating_margin
            # NOTE(review): when decision.total_coverage is smaller than
            # decision.retained_limit (e.g. a no-insurance sentinel with
            # coverage 0 and infinite retention), this adds the full loss
            # twice — confirm the intended no-insurance convention.
            net_losses = retained_losses + max(annual_losses - decision.total_coverage, 0)
            net_income = operating_income - net_losses - decision.total_premium
            # Calculate ROE before updating equity
            if equity > 0:
                roe = net_income / equity
                annual_returns.append(roe)
                sim_roe_series.append(roe)
                sim_equity_series.append(equity)
            else:
                # Degenerate ROE when equity is non-positive; series not stored.
                annual_returns.append(net_income / max(equity, 1))
            # Update equity
            equity += net_income
            # Check bankruptcy
            if equity <= 0:
                bankrupt = True
                break
            # Update assets for next period using the fixed leverage ratio
            # (assumes equity_ratio > 0 — TODO confirm).
            assets = equity / equity_ratio
        # Store results
        results["growth_rates"][i] = np.mean(annual_returns) if annual_returns else 0  # type: ignore
        results["bankruptcies"][i] = 1 if bankrupt else 0  # type: ignore
        results["roe"][i] = np.mean(annual_returns) if annual_returns else 0  # type: ignore
        results["value"][i] = equity  # type: ignore
        # Only the final year's loss is recorded; locals() guard covers
        # time_horizon == 0 where annual_losses was never assigned.
        results["losses"][i] = annual_losses if "annual_losses" in locals() else 0  # type: ignore
        # Store series for enhanced analysis
        all_roe_series.extend(sim_roe_series)
        all_equity_series.extend(sim_equity_series)
    # Convert to numpy arrays for analysis
    results["roe_series"] = np.array(all_roe_series) if all_roe_series else np.array([])
    results["equity_series"] = (
        np.array(all_equity_series) if all_equity_series else np.array([])
    )
    return results  # type: ignore
[docs]
def run_sensitivity_analysis(
    self,
    base_decision: InsuranceDecision,
    parameters: Optional[List[str]] = None,
    variation_range: float = 0.2,
) -> SensitivityReport:
    """Analyze decision sensitivity to parameter changes.

    For each parameter the engine state is perturbed by ±``variation_range``,
    the decision is re-optimized, and metric shifts versus the base decision
    are recorded. Engine state (scenario, loss distribution, manufacturer)
    is deep-copied up front and restored afterwards so the analysis leaves
    the engine unchanged.

    Args:
        base_decision: Base decision to analyze
        parameters: Parameters to test (default: key parameters)
        variation_range: ±% to vary parameters (default: 20%)

    Returns:
        Comprehensive sensitivity report
    """
    # pylint: disable=too-many-locals
    if parameters is None:
        # NOTE(review): _modify_parameter only implements a subset of these
        # names; unsupported ones re-optimize against unchanged inputs —
        # confirm whether "loss_severity" / "growth_rate" should be wired up.
        parameters = [
            "base_premium_rate",
            "loss_frequency",
            "loss_severity",
            "growth_rate",
            "capital_base",
        ]
    logger.info(f"Running sensitivity analysis for {len(parameters)} parameters")
    # Calculate base metrics
    base_metrics = self.calculate_decision_metrics(base_decision)
    # Save original state so sensitivity analysis does not corrupt the engine
    original_scenario = copy.deepcopy(self.current_scenario)
    original_loss_dist = copy.deepcopy(self.loss_distribution)
    original_manufacturer = copy.deepcopy(self.manufacturer)
    # Initialize results
    parameter_sensitivities = {}
    stress_test_results = {}
    try:
        for param in parameters:
            param_results = {}
            # Test parameter variations (one downward, one upward)
            for variation in [-variation_range, variation_range]:
                # Modify parameter (works on copies via deepcopy restore below)
                self._modify_parameter(param, variation)
                # Clear caches so modified parameters take effect
                self._decision_cache.clear()
                self._metrics_cache.clear()
                # Re-optimize with modified parameter; budget is anchored to
                # the base decision's premium plus 10% headroom
                constraints = OptimizationConstraints(
                    max_premium_budget=base_decision.total_premium * 1.1
                )
                modified_decision = self.optimize_insurance_decision(constraints)
                # Calculate metrics
                modified_metrics = self.calculate_decision_metrics(modified_decision)
                # Calculate sensitivity
                label = "decrease" if variation < 0 else "increase"
                param_results[label] = {
                    "growth_change": (
                        modified_metrics.ergodic_growth_rate - base_metrics.ergodic_growth_rate
                    ),
                    "risk_change": (
                        modified_metrics.bankruptcy_probability
                        - base_metrics.bankruptcy_probability
                    ),
                    "roe_change": modified_metrics.expected_roe - base_metrics.expected_roe,
                }
                # Store stress test results
                stress_test_results[f"{param}_{label}"] = modified_metrics
                # Restore state after each variation to start clean for next one
                self.current_scenario = copy.deepcopy(original_scenario)
                self.loss_distribution = copy.deepcopy(original_loss_dist)
                self.manufacturer = copy.deepcopy(original_manufacturer)
            parameter_sensitivities[param] = param_results
    finally:
        # Always restore original state, even if analysis fails partway
        self.current_scenario = original_scenario
        self.loss_distribution = original_loss_dist
        self.manufacturer = original_manufacturer
        # Clear caches so restored state is used for subsequent calls
        self._decision_cache.clear()
        self._metrics_cache.clear()
    # Flatten parameter sensitivities for the report
    flattened_sensitivities = {}
    for param, results in parameter_sensitivities.items():
        # Calculate average impact across variations
        avg_growth_change = np.mean(
            [abs(results[v]["growth_change"]) for v in ["decrease", "increase"]]
        )
        avg_risk_change = np.mean(
            [abs(results[v]["risk_change"]) for v in ["decrease", "increase"]]
        )
        avg_roe_change = np.mean(
            [abs(results[v]["roe_change"]) for v in ["decrease", "increase"]]
        )
        flattened_sensitivities[param] = {
            "growth_sensitivity": float(avg_growth_change),
            "risk_sensitivity": float(avg_risk_change),
            "roe_sensitivity": float(avg_roe_change),
        }
    # Identify key drivers (parameters with highest impact); risk changes
    # are weighted 10x to keep them comparable with growth/ROE shifts
    impacts = []
    for param, metrics in flattened_sensitivities.items():
        total_impact = float(
            metrics["growth_sensitivity"]
            + metrics["risk_sensitivity"] * 10
            + metrics["roe_sensitivity"]
        )
        impacts.append((param, total_impact))
    key_drivers = [p for p, _ in sorted(impacts, key=lambda x: float(x[1]), reverse=True)][:3]
    # Determine robust ranges
    robust_range = {}
    for param in parameters:
        # Find range where decision remains stable
        # Simplified: use fixed range for now
        robust_range[param] = (-0.1, 0.1)  # ±10% is typically stable
    return SensitivityReport(
        base_decision=base_decision,
        base_metrics=base_metrics,
        parameter_sensitivities=flattened_sensitivities,
        key_drivers=key_drivers,
        robust_range=robust_range,
        stress_test_results=stress_test_results,
    )
def _modify_parameter(self, parameter: str, variation: float) -> Any:
"""Modify a parameter for sensitivity analysis."""
# Store original state
original_state = {}
if parameter == "base_premium_rate":
# Modify pricing scenario rates
for attr in ["primary_layer_rate", "first_excess_rate", "higher_excess_rate"]:
original = getattr(self.current_scenario, attr)
original_state[attr] = original
setattr(self.current_scenario, attr, original * (1 + variation))
elif parameter == "loss_frequency":
# Modify loss distribution frequency
if hasattr(self.loss_distribution, "frequency"):
original_state["frequency"] = self.loss_distribution.frequency
self.loss_distribution.frequency *= 1 + variation
elif parameter == "capital_base":
# Modify manufacturer capital
original_state["total_assets"] = self.manufacturer.total_assets
# Note: total_assets is Decimal, so we need to use Decimal arithmetic
from decimal import Decimal
self.manufacturer.total_assets = self.manufacturer.total_assets * Decimal(
str(1 + variation)
)
# Return original state for restoration
return original_state
[docs]
def generate_recommendations(
    self, analysis_results: List[Tuple[InsuranceDecision, DecisionMetrics]]
) -> Recommendations:
    """Generate executive-ready recommendations.

    Args:
        analysis_results: List of (decision, metrics) tuples to analyze

    Returns:
        Comprehensive recommendations

    Raises:
        ValueError: If ``analysis_results`` is empty.
    """
    if not analysis_results:
        raise ValueError("No analysis results provided")

    # Rank candidates best-first by composite decision score.
    ranked = sorted(analysis_results, key=lambda pair: pair[1].decision_score, reverse=True)
    best_decision, best_metrics = ranked[0]

    # Up to three runner-up options, each with a one-line rationale.
    alternatives = [
        (candidate, self._generate_brief_rationale(candidate, candidate_metrics))
        for candidate, candidate_metrics in ranked[1:4]
    ]

    return Recommendations(
        primary_recommendation=best_decision,
        primary_rationale=self._generate_rationale(best_decision, best_metrics),
        alternative_options=alternatives,
        implementation_timeline=self._generate_timeline(best_decision),
        risk_considerations=self._identify_risks(best_decision, best_metrics),
        expected_benefits={
            "ROE Improvement": best_metrics.roe_improvement,
            "Risk Reduction": 1 - best_metrics.bankruptcy_probability,
            "Growth Enhancement": best_metrics.ergodic_growth_rate,
            "Capital Efficiency": best_metrics.capital_efficiency,
        },
        confidence_level=self._calculate_confidence(best_metrics, ranked),
    )
def _generate_rationale(self, decision: InsuranceDecision, metrics: DecisionMetrics) -> str:
"""Generate detailed rationale for a decision."""
rationale = f"""
This insurance structure optimizes long-term value creation by:
1. **Growth Enhancement**: Achieves {metrics.ergodic_growth_rate:.1%} ergodic growth rate,
representing a {metrics.roe_improvement:.1%} improvement over self-insurance.
2. **Risk Management**: Reduces bankruptcy probability to {metrics.bankruptcy_probability:.2%},
providing robust downside protection while maintaining upside potential.
3. **Capital Efficiency**: Delivers ${metrics.capital_efficiency:.2f} of value per dollar
of premium spent, with coverage adequacy of {metrics.coverage_adequacy:.0%}.
4. **Structure**: {len(decision.layers)} layers totaling ${decision.total_coverage/1e6:.1f}M
in coverage for ${decision.total_premium/1e3:.0f}K annual premium.
The structure is optimized for {decision.pricing_scenario} market conditions using
{decision.optimization_method} optimization.
"""
return rationale.strip()
def _generate_brief_rationale(
self, decision: InsuranceDecision, metrics: DecisionMetrics
) -> str:
"""Generate brief rationale for alternative option."""
return (
f"{len(decision.layers)} layers, ${decision.total_premium/1e3:.0f}K premium, "
f"{metrics.ergodic_growth_rate:.1%} growth, {metrics.bankruptcy_probability:.2%} risk"
)
def _generate_timeline(self, decision: InsuranceDecision) -> List[str]:
"""Generate implementation timeline."""
return [
"Week 1-2: Finalize insurance specifications and requirements",
"Week 3-4: Solicit quotes from insurance markets",
"Week 5-6: Negotiate terms and pricing",
"Week 7: Execute insurance contracts",
"Week 8: Implement coverage and update risk management procedures",
"Ongoing: Monitor performance and adjust as needed",
]
def _identify_risks(self, decision: InsuranceDecision, metrics: DecisionMetrics) -> List[str]:
"""Identify key risks in the recommendation."""
risks = []
if metrics.bankruptcy_probability > 0.01:
risks.append(
f"Residual bankruptcy risk of {metrics.bankruptcy_probability:.2%} remains"
)
if decision.total_premium > 1_000_000:
risks.append(
f"Significant premium commitment of ${decision.total_premium/1e6:.1f}M annually"
)
if len(decision.layers) > 3:
risks.append(f"Complex structure with {len(decision.layers)} layers to manage")
if metrics.coverage_adequacy < 0.8:
risks.append(
f"Coverage may be insufficient for extreme events (adequacy: {metrics.coverage_adequacy:.0%})"
)
risks.append("Market conditions may change, affecting renewal pricing")
risks.append("Actual losses may differ from modeled distributions")
return risks
def _calculate_confidence(self, primary_metrics: DecisionMetrics, all_results: List) -> float:
"""Calculate confidence level in recommendation."""
confidence = 0.5 # Base confidence
# Higher score increases confidence
if primary_metrics.decision_score > 0.8:
confidence += 0.2
elif primary_metrics.decision_score > 0.6:
confidence += 0.1
# Low bankruptcy probability increases confidence
if primary_metrics.bankruptcy_probability < 0.005:
confidence += 0.15
elif primary_metrics.bankruptcy_probability < 0.01:
confidence += 0.1
# Clear separation from alternatives increases confidence
if len(all_results) > 1:
score_gap = primary_metrics.decision_score - all_results[1][1].decision_score
if score_gap > 0.1:
confidence += 0.1
# Positive ROE improvement increases confidence
if primary_metrics.roe_improvement > 0.02:
confidence += 0.05
return min(confidence, 0.95) # Cap at 95%