"""Multi-layer insurance program with reinstatements and advanced features.
This module provides comprehensive insurance program management including
multi-layer structures, reinstatements, attachment points, and accurate
loss allocation for manufacturing risk transfer optimization.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import numpy as np
import yaml
if TYPE_CHECKING:
from .exposure_base import ExposureBase
from .insurance_pricing import InsurancePricer, MarketCycle
from .loss_distributions import LossEvent, ManufacturingLossGenerator
from .manufacturer import WidgetManufacturer
[docs]
class ReinstatementType(Enum):
"""Types of reinstatement provisions."""
NONE = "none"
PRO_RATA = "pro_rata" # Premium based on time remaining
FULL = "full" # Full premium regardless of timing
FREE = "free" # No additional premium
[docs]
@dataclass
class OptimizationConstraints:
"""Constraints for insurance program optimization."""
max_total_premium: Optional[float] = None # Budget constraint
min_total_coverage: Optional[float] = None # Minimum coverage requirement
max_layers: int = 5 # Maximum number of layers
min_layers: int = 3 # Minimum number of layers
max_attachment_gap: float = 0.0 # Maximum gap between layers (0 = no gaps)
min_roe_improvement: float = 0.15 # Minimum ROE improvement target
max_iterations: int = 1000 # Maximum optimization iterations
convergence_tolerance: float = 1e-6 # Convergence tolerance
[docs]
@dataclass
class OptimalStructure:
"""Result of insurance structure optimization."""
layers: List[EnhancedInsuranceLayer]
deductible: float
total_premium: float
total_coverage: float
ergodic_benefit: float
roe_improvement: float
optimization_metrics: Dict[str, Any]
convergence_achieved: bool
iterations_used: int
[docs]
@dataclass
class EnhancedInsuranceLayer:
"""Insurance layer with reinstatement support and advanced features.
Extends basic layer functionality with reinstatements, tracking,
and more sophisticated premium calculations.
"""
attachment_point: float # Where coverage starts
limit: float # Maximum coverage amount (interpretation depends on limit_type)
base_premium_rate: float # % of limit as base premium
reinstatements: int = 0 # Number of reinstatements available
reinstatement_premium: float = 1.0 # % of original premium per reinstatement
reinstatement_type: ReinstatementType = ReinstatementType.PRO_RATA
aggregate_limit: Optional[float] = None # Annual aggregate limit (for aggregate/hybrid types)
participation_rate: float = 1.0 # % of loss covered by this layer (default 100%)
limit_type: str = "per-occurrence" # Type of limit: "per-occurrence", "aggregate", or "hybrid"
per_occurrence_limit: Optional[float] = None # Per-occurrence limit (for hybrid type)
premium_rate_exposure: Optional["ExposureBase"] = (
None # Exposure object for dynamic premium scaling
)
[docs]
def __post_init__(self):
"""Validate layer parameters."""
if self.attachment_point < 0:
raise ValueError(f"Attachment point must be non-negative, got {self.attachment_point}")
if self.limit <= 0:
raise ValueError(f"Limit must be positive, got {self.limit}")
# Initialize exhausted tracking
self.exhausted = 0.0
if self.base_premium_rate < 0:
raise ValueError(
f"Base premium rate must be non-negative, got {self.base_premium_rate}"
)
if self.reinstatements < 0:
raise ValueError(f"Reinstatements must be non-negative, got {self.reinstatements}")
if self.reinstatement_premium < 0:
raise ValueError(
f"Reinstatement premium must be non-negative, got {self.reinstatement_premium}"
)
# Validate limit type
valid_limit_types = ["per-occurrence", "aggregate", "hybrid"]
if self.limit_type not in valid_limit_types:
raise ValueError(
f"Invalid limit_type: {self.limit_type}. Must be one of {valid_limit_types}"
)
# Handle different limit type configurations
if self.limit_type == "per-occurrence":
# For per-occurrence, the limit field is the per-occurrence limit
# No aggregate limit should be set (will be ignored if set)
if self.reinstatements > 0:
import warnings
warnings.warn(
f"Reinstatements parameter ({self.reinstatements}) is not used for per-occurrence limits.",
UserWarning,
)
elif self.limit_type == "aggregate":
# For aggregate, the limit field is the aggregate limit
# Set aggregate_limit to match limit if not already set
if self.aggregate_limit is None:
self.aggregate_limit = self.limit
elif self.limit_type == "hybrid":
# For hybrid, need both per-occurrence and aggregate limits
if self.per_occurrence_limit is None:
# If not specified, use limit as per-occurrence limit
self.per_occurrence_limit = self.limit
if self.aggregate_limit is None:
# If aggregate not specified, error
raise ValueError(
"Hybrid limit type requires both per_occurrence_limit and aggregate_limit"
)
if self.per_occurrence_limit <= 0:
raise ValueError(
f"Per-occurrence limit must be positive, got {self.per_occurrence_limit}"
)
if self.aggregate_limit <= 0:
raise ValueError(f"Aggregate limit must be positive, got {self.aggregate_limit}")
# Validate aggregate limit if specified
if self.aggregate_limit is not None and self.aggregate_limit <= 0:
raise ValueError(f"Aggregate limit must be positive if set, got {self.aggregate_limit}")
[docs]
def calculate_base_premium(self, time: float = 0.0) -> float:
"""Calculate base premium for this layer.
Args:
time: Time in years for exposure calculation (default 0.0).
Returns:
Base premium amount (rate × limit × exposure_multiplier).
"""
base_premium = self.limit * self.base_premium_rate
# Apply exposure scaling if available
if self.premium_rate_exposure is not None:
multiplier = self.premium_rate_exposure.get_frequency_multiplier(time)
return base_premium * multiplier
return base_premium
[docs]
def calculate_reinstatement_premium(self, timing_factor: float = 1.0) -> float:
"""Calculate premium for a single reinstatement.
Args:
timing_factor: Pro-rata factor based on policy period remaining (0-1).
Returns:
Reinstatement premium amount.
"""
base_premium = self.calculate_base_premium()
if self.reinstatement_type == ReinstatementType.FREE:
return 0.0
if self.reinstatement_type == ReinstatementType.FULL:
return base_premium * self.reinstatement_premium
if self.reinstatement_type == ReinstatementType.PRO_RATA:
return base_premium * self.reinstatement_premium * timing_factor
return 0.0
[docs]
def can_respond(self, loss_amount: float) -> bool:
"""Check if this layer can respond to a loss.
Args:
loss_amount: Total loss amount.
Returns:
True if loss exceeds attachment point.
"""
return loss_amount > self.attachment_point
[docs]
def calculate_layer_loss(self, total_loss: float) -> float:
"""Calculate the portion of loss hitting this layer.
Args:
total_loss: Total loss amount.
Returns:
Amount of loss allocated to this layer (before applying limits).
"""
if total_loss <= self.attachment_point:
return 0.0
excess_loss = total_loss - self.attachment_point
# Apply the appropriate limit based on limit_type
if self.limit_type == "per-occurrence":
# For per-occurrence, limit each individual loss
return min(excess_loss, self.limit)
if self.limit_type == "aggregate":
# For aggregate, limit is handled in process_claim via aggregate tracking
# Here we just return the excess without limiting
return min(excess_loss, self.limit)
if self.limit_type == "hybrid":
# For hybrid, apply per-occurrence limit first
# Aggregate limit is handled in process_claim
return min(excess_loss, self.per_occurrence_limit or self.limit)
return min(excess_loss, self.limit)
[docs]
@dataclass
class LayerState:
"""Tracks the current state of an insurance layer during simulation.
Maintains utilization, reinstatement count, and exhaustion status
for accurate multi-claim processing.
"""
layer: EnhancedInsuranceLayer
current_limit: float = field(init=False)
used_limit: float = 0.0
reinstatements_used: int = 0
total_claims_paid: float = 0.0
reinstatement_premiums_paid: float = 0.0
is_exhausted: bool = False
aggregate_used: float = 0.0 # For annual aggregate tracking
[docs]
def __post_init__(self):
"""Initialize current limit to layer's base limit."""
self.current_limit = self.layer.limit
[docs]
def process_claim(self, claim_amount: float, timing_factor: float = 1.0) -> Tuple[float, float]:
"""Process a claim against this layer.
Args:
claim_amount: Amount of loss allocated to this layer.
timing_factor: Pro-rata factor for reinstatement premium.
Returns:
Tuple of (amount_paid, reinstatement_premium).
"""
if claim_amount <= 0:
return 0.0, 0.0
# Handle different limit types
if self.layer.limit_type == "per-occurrence":
# Per-occurrence: each claim limited but no exhaustion
payment = min(claim_amount, self.layer.limit)
self.total_claims_paid += payment
return payment, 0.0 # No reinstatement premiums for per-occurrence
if self.layer.limit_type == "aggregate":
# Aggregate: track total usage and exhaust when limit reached
if self.is_exhausted:
return 0.0, 0.0
total_payment = 0.0
total_reinstatement_premium = 0.0
remaining_claim = claim_amount
# Process claim with current limit (may trigger reinstatements)
while remaining_claim > 0 and not self.is_exhausted:
# Calculate available limit
available_limit = self.current_limit
# Check aggregate limit if it exists
if self.layer.aggregate_limit is not None:
remaining_aggregate = self.layer.aggregate_limit - self.aggregate_used
if remaining_aggregate <= 0:
self.is_exhausted = True
break
available_limit = min(available_limit, remaining_aggregate)
# Calculate payment from available limit
payment = min(remaining_claim, available_limit)
if payment > 0:
self.used_limit += payment
self.current_limit -= payment
self.total_claims_paid += payment
self.aggregate_used += payment
total_payment += payment
remaining_claim -= payment
# Check if aggregate exhausted
if (
self.layer.aggregate_limit is not None
and self.aggregate_used >= self.layer.aggregate_limit
):
# Check if this is a single aggregate that can be reinstated
# (aggregate_limit == limit means single aggregate with reinstatements)
# (aggregate_limit > limit means overall cap across reinstatements)
if (
self.layer.aggregate_limit == self.layer.limit
and self.reinstatements_used < self.layer.reinstatements
):
# Single aggregate with reinstatements - can reinstate
self.reinstatements_used += 1
self.current_limit = self.layer.limit
self.aggregate_used = 0 # Reset aggregate tracking
reinstatement_premium = self.layer.calculate_reinstatement_premium(
timing_factor
)
self.reinstatement_premiums_paid += reinstatement_premium
total_reinstatement_premium += reinstatement_premium
# Continue processing if there's remaining claim
if remaining_claim > 0:
continue
else:
# Overall cap reached - cannot be reinstated
self.is_exhausted = True
self.current_limit = 0
break
# Check if limit exhausted and can reinstate
if self.current_limit == 0 and not self.is_exhausted:
if self.reinstatements_used < self.layer.reinstatements:
# Trigger reinstatement
self.reinstatements_used += 1
self.current_limit = self.layer.limit
reinstatement_premium = self.layer.calculate_reinstatement_premium(
timing_factor
)
self.reinstatement_premiums_paid += reinstatement_premium
total_reinstatement_premium += reinstatement_premium
# Continue processing if there's remaining claim
if remaining_claim == 0:
break
else:
# No more reinstatements available
self.is_exhausted = True
break
elif remaining_claim == 0:
# Claim fully processed - check if we need reinstatement
if (
self.current_limit == 0
and self.reinstatements_used < self.layer.reinstatements
and not self.is_exhausted
):
# Trigger reinstatement for future use
self.reinstatements_used += 1
self.current_limit = self.layer.limit
reinstatement_premium = self.layer.calculate_reinstatement_premium(
timing_factor
)
self.reinstatement_premiums_paid += reinstatement_premium
total_reinstatement_premium += reinstatement_premium
break
return total_payment, total_reinstatement_premium
if self.layer.limit_type == "hybrid":
# Hybrid: apply both per-occurrence and aggregate constraints
if self.is_exhausted:
return 0.0, 0.0
# First apply per-occurrence limit
per_occurrence_limit = self.layer.per_occurrence_limit or self.layer.limit
max_payment = min(claim_amount, per_occurrence_limit)
# Then check against remaining aggregate
if self.layer.aggregate_limit is not None:
remaining_aggregate = self.layer.aggregate_limit - self.aggregate_used
if remaining_aggregate <= 0:
self.is_exhausted = True
return 0.0, 0.0
max_payment = min(max_payment, remaining_aggregate)
# Make the payment
if max_payment > 0:
self.total_claims_paid += max_payment
self.aggregate_used += max_payment
# Check if aggregate is now exhausted
if (
self.layer.aggregate_limit is not None
and self.aggregate_used >= self.layer.aggregate_limit
):
self.is_exhausted = True
# Calculate reinstatement premium if aggregate portion exhausted
reinstatement_premium = 0.0
if self.is_exhausted and self.reinstatements_used < self.layer.reinstatements:
self.reinstatements_used += 1
self.aggregate_used = 0 # Reset aggregate tracking
self.is_exhausted = False
reinstatement_premium = self.layer.calculate_reinstatement_premium(timing_factor)
self.reinstatement_premiums_paid += reinstatement_premium
return max_payment, reinstatement_premium
# Fallback to aggregate behavior
return self._process_claim_aggregate(claim_amount, timing_factor)
def _process_claim_aggregate(
self, claim_amount: float, timing_factor: float = 1.0
) -> Tuple[float, float]:
"""Original aggregate claim processing logic (for backward compatibility)."""
if self.is_exhausted or claim_amount <= 0:
return 0.0, 0.0
total_payment = 0.0
total_reinstatement_premium = 0.0
remaining_claim = claim_amount
# Process claim with current limit (may trigger reinstatements)
while remaining_claim > 0 and not self.is_exhausted:
# Check aggregate limit
available_limit = self.current_limit
if self.layer.aggregate_limit is not None:
remaining_aggregate = self.layer.aggregate_limit - self.aggregate_used
if remaining_aggregate <= 0:
self.is_exhausted = True
break
available_limit = min(available_limit, remaining_aggregate)
# Calculate payment from available limit
payment = min(remaining_claim, available_limit)
if payment > 0:
self.used_limit += payment
self.current_limit -= payment
self.total_claims_paid += payment
total_payment += payment
remaining_claim -= payment
# Update aggregate if applicable
if self.layer.aggregate_limit is not None:
self.aggregate_used += payment
if self.aggregate_used >= self.layer.aggregate_limit:
self.is_exhausted = True
self.current_limit = 0.0
break
# Check if limit exhausted and can reinstate
if self.current_limit == 0 and not self.is_exhausted:
if self.reinstatements_used < self.layer.reinstatements:
# Trigger reinstatement
self.reinstatements_used += 1
self.current_limit = self.layer.limit
reinstatement_premium = self.layer.calculate_reinstatement_premium(
timing_factor
)
self.reinstatement_premiums_paid += reinstatement_premium
total_reinstatement_premium += reinstatement_premium
else:
# No more reinstatements available
self.is_exhausted = True
break
else:
# Can't process more of this claim
break
return total_payment, total_reinstatement_premium
[docs]
def reset(self):
"""Reset layer state for new policy period."""
self.current_limit = self.layer.limit
self.used_limit = 0.0
self.reinstatements_used = 0
self.total_claims_paid = 0.0
self.reinstatement_premiums_paid = 0.0
self.is_exhausted = False
self.aggregate_used = 0.0
[docs]
def get_available_limit(self) -> float:
"""Get currently available limit.
Returns:
Available limit for claims.
"""
return self.current_limit if not self.is_exhausted else 0.0
[docs]
def get_utilization_rate(self) -> float:
"""Calculate layer utilization rate.
Returns:
Utilization as percentage of total available limit.
"""
total_available = self.layer.limit * (1 + self.layer.reinstatements)
if total_available == 0:
return 0.0
return self.total_claims_paid / total_available
[docs]
class InsuranceProgram:
"""Comprehensive multi-layer insurance program manager.
Handles complex insurance structures with multiple layers,
reinstatements, and sophisticated claim allocation.
"""
def __init__(
self,
layers: List[EnhancedInsuranceLayer],
deductible: float = 0.0,
name: str = "Manufacturing Insurance Program",
pricing_enabled: bool = False,
pricer: Optional["InsurancePricer"] = None,
):
"""Initialize insurance program.
Args:
layers: List of insurance layers (will be sorted by attachment).
deductible: Self-insured retention before insurance.
name: Program identifier.
pricing_enabled: Whether to use dynamic pricing.
pricer: Optional InsurancePricer for dynamic pricing.
"""
self.layers = sorted(layers, key=lambda x: x.attachment_point)
self.deductible = deductible
self.name = name
self.layer_states = [LayerState(layer) for layer in self.layers]
self.total_premiums_paid = 0.0
self.total_claims = 0
self.pricing_enabled = pricing_enabled
self.pricer = pricer
self.pricing_results: List[Any] = []
[docs]
def calculate_annual_premium(self, time: float = 0.0) -> float:
"""Calculate total annual premium for the program.
Args:
time: Time in years for exposure calculation (default 0.0).
Returns:
Total base premium across all layers.
"""
return sum(layer.calculate_base_premium(time) for layer in self.layers)
[docs]
def process_claim(self, claim_amount: float, timing_factor: float = 1.0) -> Dict[str, Any]:
"""Process a single claim through the insurance structure.
Args:
claim_amount: Total claim amount.
timing_factor: Pro-rata factor for reinstatement premiums.
Returns:
Dictionary with claim allocation details.
"""
if claim_amount <= 0:
return {
"total_claim": 0.0,
"deductible_paid": 0.0,
"insurance_recovery": 0.0,
"uncovered_loss": 0.0,
"reinstatement_premiums": 0.0,
"layers_triggered": [],
}
self.total_claims += 1
result: Dict[str, Any] = {
"total_claim": claim_amount,
"deductible_paid": min(claim_amount, self.deductible),
"insurance_recovery": 0.0,
"uncovered_loss": 0.0,
"reinstatement_premiums": 0.0,
"layers_triggered": [],
}
# Maximum recoverable is claim minus deductible
max_recoverable = claim_amount - result["deductible_paid"]
# Process through each layer
for i, state in enumerate(self.layer_states):
if not state.layer.can_respond(claim_amount):
continue
# Calculate loss to this layer
layer_loss = state.layer.calculate_layer_loss(claim_amount)
# Process the claim
payment, reinstatement_premium = state.process_claim(layer_loss, timing_factor)
if payment > 0:
result["insurance_recovery"] += payment
result["reinstatement_premiums"] += reinstatement_premium
result["layers_triggered"].append(
{
"layer_index": i,
"attachment": state.layer.attachment_point,
"payment": payment,
"reinstatement_premium": reinstatement_premium,
"exhausted": state.is_exhausted,
}
)
# Guard: total insurance recovery cannot exceed (claim - deductible)
if result["insurance_recovery"] > max_recoverable:
result["insurance_recovery"] = max_recoverable
# Calculate uncovered loss
total_covered = result["deductible_paid"] + result["insurance_recovery"]
if total_covered < claim_amount:
result["uncovered_loss"] = claim_amount - total_covered
# Company pays uncovered portion
result["deductible_paid"] += result["uncovered_loss"]
self.total_premiums_paid += result["reinstatement_premiums"]
return result
[docs]
def process_annual_claims(
self, claims: List[float], claim_times: Optional[List[float]] = None
) -> Dict[str, Any]:
"""Process all claims for a policy year.
Args:
claims: List of claim amounts.
claim_times: Optional list of claim times (0-1 for year fraction).
Returns:
Dictionary with annual summary statistics.
"""
if claim_times is None:
# Assume uniform distribution through year
claim_times = list(np.linspace(0, 1, len(claims))) if claims else []
results: Dict[str, Any] = {
"total_claims": len(claims),
"total_losses": sum(claims),
"total_deductible": 0.0,
"total_recovery": 0.0,
"total_uncovered": 0.0,
"total_reinstatement_premiums": 0.0,
"base_premium": self.calculate_annual_premium(0.0),
"claim_details": [],
"layer_summaries": [],
}
# Process each claim
for claim, time in zip(claims, claim_times):
timing_factor = 1.0 - time # Remaining portion of year
claim_result = self.process_claim(claim, timing_factor)
results["total_deductible"] += claim_result["deductible_paid"]
results["total_recovery"] += claim_result["insurance_recovery"]
results["total_uncovered"] += claim_result["uncovered_loss"]
results["total_reinstatement_premiums"] += claim_result["reinstatement_premiums"]
results["claim_details"].append(claim_result)
# Compile layer summaries
for i, state in enumerate(self.layer_states):
results["layer_summaries"].append(
{
"layer_index": i,
"attachment_point": state.layer.attachment_point,
"limit": state.layer.limit,
"claims_paid": state.total_claims_paid,
"reinstatements_used": state.reinstatements_used,
"reinstatement_premiums": state.reinstatement_premiums_paid,
"utilization_rate": state.get_utilization_rate(),
"is_exhausted": state.is_exhausted,
}
)
results["total_premium_paid"] = (
results["base_premium"] + results["total_reinstatement_premiums"]
)
results["net_benefit"] = results["total_recovery"] - results["total_premium_paid"]
return results
[docs]
def reset_annual(self):
"""Reset program state for new policy year."""
for state in self.layer_states:
state.reset()
self.total_claims = 0
[docs]
def get_program_summary(self) -> Dict[str, Any]:
"""Get current program state summary.
Returns:
Dictionary with program statistics.
"""
return {
"program_name": self.name,
"deductible": self.deductible,
"num_layers": len(self.layers),
"total_coverage": self.get_total_coverage(),
"annual_base_premium": self.calculate_annual_premium(0.0),
"total_claims_processed": self.total_claims,
"total_premiums_paid": self.total_premiums_paid,
"layers": [
{
"attachment": layer.attachment_point,
"limit": layer.limit,
"exhaustion_point": layer.attachment_point + layer.limit,
"reinstatements": layer.reinstatements,
"base_premium": layer.calculate_base_premium(0.0),
}
for layer in self.layers
],
}
[docs]
def get_total_coverage(self) -> float:
"""Calculate maximum possible coverage.
Returns:
Maximum claim amount that can be covered.
"""
if not self.layers:
return 0.0
# Find highest exhaustion point
max_coverage = max(layer.attachment_point + layer.limit for layer in self.layers)
return max_coverage
def _get_default_manufacturer_profile(self) -> Dict[str, Any]:
"""Get default manufacturer profile."""
return {
"initial_assets": 10_000_000,
"annual_revenue": 15_000_000,
"base_operating_margin": 0.08,
"growth_rate": 0.05,
}
def _calculate_insurance_metrics(self, loss_history: List[List[float]]) -> tuple:
"""Calculate metrics with and without insurance.
Returns:
Tuple of (metrics_with, metrics_without) as numpy arrays.
"""
metrics_with_insurance = []
metrics_without_insurance = []
for annual_losses in loss_history:
# Without insurance: company bears all losses
total_loss_without = sum(annual_losses)
net_impact_without = -total_loss_without
# With insurance: apply structure
result = self.process_annual_claims(annual_losses)
total_loss_with = result["total_deductible"]
total_premium_paid = result["total_premium_paid"]
net_impact_with = -total_loss_with - total_premium_paid
metrics_without_insurance.append(net_impact_without)
metrics_with_insurance.append(net_impact_with)
# Reset for next year
self.reset_annual()
return np.array(metrics_with_insurance), np.array(metrics_without_insurance)
def _calculate_time_average_growth(self, metrics: np.ndarray, initial_assets: float) -> tuple:
"""Calculate time-average growth rate.
Returns:
Tuple of (time_avg_growth, final_assets).
"""
assets = initial_assets + np.cumsum(metrics)
assets = np.maximum(assets, 1.0) # Ensure positive for log
if len(assets) > 1:
time_avg = np.log(assets[-1] / initial_assets) / len(assets)
else:
time_avg = 0.0
return time_avg, assets[-1]
[docs]
def calculate_ergodic_benefit(
self,
loss_history: List[List[float]],
manufacturer_profile: Optional[Dict[str, Any]] = None,
time_horizon: int = 100,
) -> Dict[str, float]:
"""Calculate ergodic benefit of insurance structure.
Quantifies time-average growth improvement from insurance coverage
versus ensemble-average cost.
Args:
loss_history: Historical loss data (list of annual loss lists).
manufacturer_profile: Company profile with assets, revenue, etc.
time_horizon: Time horizon for ergodic calculation.
Returns:
Dictionary with ergodic metrics.
"""
if not loss_history:
return {
"time_average_benefit": 0.0,
"ensemble_average_cost": 0.0,
"ergodic_ratio": 0.0,
"volatility_reduction": 0.0,
}
# Default manufacturer profile
if manufacturer_profile is None:
manufacturer_profile = self._get_default_manufacturer_profile()
# Calculate metrics with and without insurance
metrics_with, metrics_without = self._calculate_insurance_metrics(loss_history)
# Time-average growth rates
initial_assets = manufacturer_profile["initial_assets"]
time_avg_with, _ = self._calculate_time_average_growth(metrics_with, initial_assets)
time_avg_without, _ = self._calculate_time_average_growth(metrics_without, initial_assets)
# Ensemble averages
ensemble_avg_without = np.mean(metrics_without)
ensemble_avg_with = np.mean(metrics_with)
# Volatility metrics
volatility_without = np.std(metrics_without)
volatility_with = np.std(metrics_with)
volatility_reduction = (
(volatility_without - volatility_with) / volatility_without
if volatility_without > 0
else 0.0
)
return {
"time_average_benefit": time_avg_with - time_avg_without,
"ensemble_average_cost": ensemble_avg_with - ensemble_avg_without,
"ergodic_ratio": (
time_avg_with / time_avg_without if time_avg_without != 0 else float("inf")
),
"volatility_reduction": volatility_reduction,
"time_avg_growth_with": time_avg_with,
"time_avg_growth_without": time_avg_without,
"ensemble_avg_with": ensemble_avg_with,
"ensemble_avg_without": ensemble_avg_without,
}
[docs]
def find_optimal_attachment_points(
self, loss_data: List[float], num_layers: int = 4, percentiles: Optional[List[float]] = None
) -> List[float]:
"""Find optimal attachment points based on loss frequency/severity.
Uses data-driven approach to minimize gaps while optimizing cost.
Args:
loss_data: Historical loss amounts.
num_layers: Number of layers to optimize.
percentiles: Optional percentiles for attachment points.
Returns:
List of optimal attachment points.
"""
if not loss_data or num_layers <= 0:
return []
loss_array = np.array(loss_data)
loss_array = loss_array[loss_array > 0] # Filter positive losses
if len(loss_array) == 0:
return []
# Default percentiles based on typical layer structure
if percentiles is None:
if num_layers == 3:
percentiles = [50, 90, 99] # Working, excess, cat
elif num_layers == 4:
percentiles = [40, 80, 95, 99.5]
elif num_layers == 5:
percentiles = [30, 60, 85, 95, 99.5]
else:
# Even distribution
percentiles = np.linspace(100 / (num_layers + 1), 99, num_layers).tolist()
# Calculate attachment points from percentiles
attachment_points = []
for p in percentiles:
attachment = float(np.percentile(loss_array, p))
attachment_points.append(attachment)
# Round to reasonable values
attachment_points = [self._round_attachment_point(ap) for ap in attachment_points]
# Ensure strictly increasing
for i in range(1, len(attachment_points)):
if attachment_points[i] <= attachment_points[i - 1]:
attachment_points[i] = attachment_points[i - 1] * 1.5
return attachment_points
def _round_attachment_point(self, value: float) -> float:
"""Round attachment point to reasonable market value."""
if value < 100_000:
return float(round(value / 10_000) * 10_000)
if value < 1_000_000:
return float(round(value / 50_000) * 50_000)
if value < 10_000_000:
return float(round(value / 250_000) * 250_000)
return float(round(value / 1_000_000) * 1_000_000)
def _get_layer_capacity(self, attachment_point: float) -> float:
"""Get default capacity for a layer based on attachment point."""
capacity_thresholds = [
(1_000_000, 5_000_000),
(10_000_000, 25_000_000),
(50_000_000, 50_000_000),
(float("inf"), 100_000_000),
]
for threshold, capacity in capacity_thresholds:
if attachment_point < threshold:
return capacity
return 100_000_000 # Default fallback
[docs]
def optimize_layer_widths(
self,
attachment_points: List[float],
total_budget: float,
capacity_constraints: Optional[Dict[str, float]] = None,
loss_data: Optional[List[float]] = None,
) -> List[float]:
"""Optimize layer widths given attachment points and constraints.
Args:
attachment_points: Fixed attachment points for layers.
total_budget: Total premium budget.
capacity_constraints: Optional max capacity per layer.
loss_data: Optional loss data for severity analysis.
Returns:
List of optimal layer widths.
"""
if not attachment_points:
return []
num_layers = len(attachment_points)
# Default capacity constraints
if capacity_constraints is None:
capacity_constraints = {
f"layer_{i}": self._get_layer_capacity(ap) for i, ap in enumerate(attachment_points)
}
# Analyze loss severity at each attachment point
severity_weights: List[float] = []
if loss_data:
for ap in attachment_points:
excess_losses = [max(0, loss - ap) for loss in loss_data if loss > ap]
avg_excess = float(np.mean(excess_losses)) if excess_losses else float(ap)
severity_weights.append(avg_excess)
else:
# Default weights based on attachment points
severity_weights = [1.0 / (i + 1) for i in range(num_layers)]
# Normalize weights
total_weight = sum(severity_weights)
if total_weight > 0:
severity_weights = [w / total_weight for w in severity_weights]
else:
severity_weights = [1.0 / num_layers] * num_layers
# Calculate layer widths based on budget and weights
layer_widths = []
for i, (ap, weight) in enumerate(zip(attachment_points, severity_weights)):
# Estimate premium rate (decreasing with attachment)
if ap < 1_000_000:
rate = 0.015
elif ap < 5_000_000:
rate = 0.010
elif ap < 25_000_000:
rate = 0.006
else:
rate = 0.003
# Calculate width from budget allocation
allocated_budget = total_budget * weight
width = allocated_budget / rate
# Apply capacity constraint
max_capacity = capacity_constraints.get(f"layer_{i}", float("inf"))
width = min(float(width), float(max_capacity))
# Ensure minimum width
min_width = ap * 0.5 if i == 0 else attachment_points[i - 1] * 0.3
width = max(float(width), float(min_width))
layer_widths.append(self._round_attachment_point(float(width)))
return layer_widths
def _get_base_premium_rate(self, attachment_point: float) -> float:
"""Get base premium rate based on attachment point."""
rate_thresholds = [
(1_000_000, 0.015),
(5_000_000, 0.010),
(25_000_000, 0.006),
(float("inf"), 0.003),
]
for threshold, rate in rate_thresholds:
if attachment_point < threshold:
return rate
return 0.003 # Default fallback
def _calculate_reinstatements(self, layer_index: int, num_layers: int) -> int:
"""Calculate reinstatements for a layer."""
if layer_index == 0:
return 0 # Primary layer
if layer_index == num_layers - 1:
return 999 # Top layer - unlimited
return max(0, 2 - layer_index // 2) # Decreasing with height
def _create_layer_structure(
self, attachment_points: List[float], layer_widths: List[float]
) -> List[EnhancedInsuranceLayer]:
"""Create insurance layers from attachment points and widths."""
layers = []
num_layers = len(attachment_points)
for i, (ap, width) in enumerate(zip(attachment_points, layer_widths)):
layer = EnhancedInsuranceLayer(
attachment_point=ap,
limit=width,
base_premium_rate=self._get_base_premium_rate(ap),
reinstatements=self._calculate_reinstatements(i, num_layers),
reinstatement_premium=1.0,
reinstatement_type=ReinstatementType.PRO_RATA,
)
layers.append(layer)
return layers
def _calculate_roe_improvement(
self, ergodic_metrics: Dict[str, float], company_profile: Optional[Dict[str, Any]]
) -> float:
"""Calculate ROE improvement from ergodic metrics."""
if company_profile and "initial_assets" in company_profile:
initial_roe = 0.08 # Baseline assumption
improved_roe = initial_roe + ergodic_metrics["time_average_benefit"]
return improved_roe / initial_roe - 1.0
return ergodic_metrics["time_average_benefit"] / 0.08
[docs]
def optimize_layer_structure(
self,
loss_data: List[List[float]],
company_profile: Optional[Dict[str, Any]] = None,
constraints: Optional[OptimizationConstraints] = None,
) -> OptimalStructure:
"""Optimize complete insurance layer structure.
Main optimization method that orchestrates layer count, attachment points,
and widths to maximize ergodic benefit.
Args:
loss_data: Historical loss data (list of annual loss lists).
company_profile: Company financial profile.
constraints: Optimization constraints.
Returns:
Optimal insurance structure.
"""
if constraints is None:
constraints = OptimizationConstraints()
# Flatten loss data for analysis
all_losses = [loss for annual_losses in loss_data for loss in annual_losses]
# Determine optimal number of layers
best_structure = None
best_ergodic_benefit = -float("inf")
for num_layers in range(constraints.min_layers, constraints.max_layers + 1):
# Find attachment points
attachment_points = self.find_optimal_attachment_points(all_losses, num_layers)
if not attachment_points:
continue
# Set deductible and budget
deductible = attachment_points[0] * 0.5
budget = constraints.max_total_premium or (
float(np.mean([sum(annual) for annual in loss_data])) * 0.15
)
# Optimize layer widths
layer_widths = self.optimize_layer_widths(
attachment_points, budget, loss_data=all_losses
)
# Create and test layer structure
layers = self._create_layer_structure(attachment_points, layer_widths)
test_program = InsuranceProgram(layers=layers, deductible=deductible)
ergodic_metrics = test_program.calculate_ergodic_benefit(loss_data, company_profile)
# Check if this is better
if ergodic_metrics["time_average_benefit"] > best_ergodic_benefit:
best_ergodic_benefit = ergodic_metrics["time_average_benefit"]
roe_improvement = self._calculate_roe_improvement(ergodic_metrics, company_profile)
best_structure = OptimalStructure(
layers=layers,
deductible=deductible,
total_premium=test_program.calculate_annual_premium(0.0),
total_coverage=test_program.get_total_coverage(),
ergodic_benefit=best_ergodic_benefit,
roe_improvement=roe_improvement,
optimization_metrics=ergodic_metrics,
convergence_achieved=True,
iterations_used=num_layers - constraints.min_layers + 1,
)
# If no structure found, return a basic one
if best_structure is None:
basic_layers = [
EnhancedInsuranceLayer(
attachment_point=250_000,
limit=4_750_000,
base_premium_rate=0.015,
reinstatements=0,
)
]
basic_program = InsuranceProgram(layers=basic_layers, deductible=250_000)
best_structure = OptimalStructure(
layers=basic_layers,
deductible=250_000,
total_premium=basic_program.calculate_annual_premium(0.0),
total_coverage=5_000_000,
ergodic_benefit=0.0,
roe_improvement=0.0,
optimization_metrics={},
convergence_achieved=False,
iterations_used=0,
)
return best_structure
[docs]
@classmethod
def from_yaml(cls, config_path: str) -> "InsuranceProgram":
"""Load insurance program from YAML configuration.
Args:
config_path: Path to YAML configuration file.
Returns:
Configured InsuranceProgram instance.
"""
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
# Extract program parameters
deductible = config.get("deductible", 0.0)
name = config.get("program_name", "Insurance Program")
# Create layers
layers = []
for layer_config in config.get("layers", []):
# Parse reinstatement type
reinstatement_type_str = layer_config.get("reinstatement_type", "pro_rata")
reinstatement_type = ReinstatementType(reinstatement_type_str)
layer = EnhancedInsuranceLayer(
attachment_point=layer_config["attachment_point"],
limit=layer_config["limit"],
base_premium_rate=layer_config.get(
"base_premium_rate",
layer_config.get("premium_rate", layer_config.get("rate", 0.01)),
),
reinstatements=layer_config.get("reinstatements", 0),
reinstatement_premium=layer_config.get("reinstatement_premium", 1.0),
reinstatement_type=reinstatement_type,
aggregate_limit=layer_config.get("aggregate_limit"),
)
layers.append(layer)
return cls(layers=layers, deductible=deductible, name=name)
[docs]
@classmethod
def create_standard_manufacturing_program(
cls, deductible: float = 250_000
) -> "InsuranceProgram":
"""Create standard manufacturing insurance program.
Args:
deductible: Self-insured retention amount.
Returns:
Standard manufacturing insurance program.
"""
layers = [
# Primary Layer
EnhancedInsuranceLayer(
attachment_point=deductible,
limit=5_000_000 - deductible,
base_premium_rate=0.015, # 1.5% rate
reinstatements=0,
),
# First Excess
EnhancedInsuranceLayer(
attachment_point=5_000_000,
limit=20_000_000,
base_premium_rate=0.008, # 0.8% rate
reinstatements=1,
reinstatement_premium=1.0,
reinstatement_type=ReinstatementType.FULL,
),
# Second Excess
EnhancedInsuranceLayer(
attachment_point=25_000_000,
limit=25_000_000,
base_premium_rate=0.004, # 0.4% rate
reinstatements=2,
reinstatement_premium=1.0,
reinstatement_type=ReinstatementType.PRO_RATA,
),
# Third Excess
EnhancedInsuranceLayer(
attachment_point=50_000_000,
limit=50_000_000,
base_premium_rate=0.002, # 0.2% rate
reinstatements=999, # Effectively unlimited
reinstatement_premium=1.0,
reinstatement_type=ReinstatementType.PRO_RATA,
),
]
return cls(layers=layers, deductible=deductible, name="Standard Manufacturing Program")
[docs]
def apply_pricing(
self,
expected_revenue: float,
market_cycle: Optional["MarketCycle"] = None,
loss_generator: Optional["ManufacturingLossGenerator"] = None,
) -> None:
"""Apply dynamic pricing to all layers in the program.
Updates layer premium rates based on frequency/severity calculations.
Args:
expected_revenue: Expected annual revenue for scaling
market_cycle: Optional market cycle state
loss_generator: Optional loss generator (uses pricer's if not provided)
Raises:
ValueError: If pricing not enabled or pricer not configured
"""
if not self.pricing_enabled:
raise ValueError("Pricing not enabled for this program")
if self.pricer is None:
if loss_generator is None:
raise ValueError("Either pricer or loss_generator must be provided")
# Create a default pricer
from .insurance_pricing import InsurancePricer, MarketCycle
self.pricer = InsurancePricer(
loss_generator=loss_generator,
market_cycle=market_cycle or MarketCycle.NORMAL,
)
# Apply pricing to the program
self.pricer.price_insurance_program(
program=self,
expected_revenue=expected_revenue,
market_cycle=market_cycle,
update_program=True,
)
[docs]
def get_pricing_summary(self) -> Dict[str, Any]:
"""Get summary of current pricing.
Returns:
Dictionary with pricing details for each layer
"""
summary: Dict[str, Any] = {
"program_name": self.name,
"pricing_enabled": self.pricing_enabled,
"total_premium": self.calculate_annual_premium(0.0),
"layers": [],
}
if self.pricing_results:
for i, (layer, pricing) in enumerate(zip(self.layers, self.pricing_results)):
summary["layers"].append(
{
"index": i,
"attachment_point": layer.attachment_point,
"limit": layer.limit,
"base_premium_rate": layer.base_premium_rate,
"market_premium": (
pricing.market_premium
if pricing
else layer.limit * layer.base_premium_rate
),
"pure_premium": pricing.pure_premium if pricing else None,
"expected_frequency": pricing.expected_frequency if pricing else None,
"expected_severity": pricing.expected_severity if pricing else None,
}
)
else:
for i, layer in enumerate(self.layers):
summary["layers"].append(
{
"index": i,
"attachment_point": layer.attachment_point,
"limit": layer.limit,
"base_premium_rate": layer.base_premium_rate,
"premium": layer.calculate_base_premium(0.0),
}
)
return summary
[docs]
@classmethod
def create_with_pricing(
cls,
layers: List[EnhancedInsuranceLayer],
loss_generator: "ManufacturingLossGenerator",
expected_revenue: float,
market_cycle: Optional["MarketCycle"] = None,
deductible: float = 0.0,
name: str = "Priced Insurance Program",
) -> "InsuranceProgram":
"""Create insurance program with dynamic pricing.
Factory method that creates a program with pricing already applied.
Args:
layers: Initial layer structure
loss_generator: Loss generator for pricing
expected_revenue: Expected annual revenue
market_cycle: Market cycle state
deductible: Self-insured retention
name: Program name
Returns:
InsuranceProgram with pricing applied
"""
from .insurance_pricing import InsurancePricer, MarketCycle
# Create pricer
pricer = InsurancePricer(
loss_generator=loss_generator,
market_cycle=market_cycle or MarketCycle.NORMAL,
)
# Create program with pricing enabled
program = cls(
layers=layers,
deductible=deductible,
name=name,
pricing_enabled=True,
pricer=pricer,
)
# Apply pricing
program.apply_pricing(expected_revenue, market_cycle)
return program
[docs]
@dataclass
class ProgramState:
"""Tracks multi-year insurance program state for simulations.
Maintains historical data and statistics across multiple
policy periods for long-term analysis.
"""
program: InsuranceProgram
years_simulated: int = 0
total_claims: List[float] = field(default_factory=list)
total_recoveries: List[float] = field(default_factory=list)
total_premiums: List[float] = field(default_factory=list)
annual_results: List[Dict] = field(default_factory=list)
[docs]
def simulate_year(
self, annual_claims: List[float], claim_times: Optional[List[float]] = None
) -> Dict[str, Any]:
"""Simulate one year of the insurance program.
Args:
annual_claims: List of claims for the year.
claim_times: Optional timing of claims.
Returns:
Annual results dictionary.
"""
# Reset for new year
self.program.reset_annual()
# Process claims
results = self.program.process_annual_claims(annual_claims, claim_times)
# Track statistics
self.years_simulated += 1
self.total_claims.append(sum(annual_claims))
self.total_recoveries.append(results["total_recovery"])
self.total_premiums.append(results["total_premium_paid"])
self.annual_results.append(results)
return results
[docs]
def get_summary_statistics(self) -> Dict[str, Any]:
"""Calculate summary statistics across all simulated years.
Returns:
Dictionary with multi-year statistics.
"""
if self.years_simulated == 0:
return {}
return {
"years_simulated": self.years_simulated,
"average_annual_claims": float(np.mean(self.total_claims)),
"average_annual_recovery": float(np.mean(self.total_recoveries)),
"average_annual_premium": float(np.mean(self.total_premiums)),
"total_claims": sum(self.total_claims),
"total_recoveries": sum(self.total_recoveries),
"total_premiums": sum(self.total_premiums),
"net_benefit": sum(self.total_recoveries) - sum(self.total_premiums),
"recovery_ratio": (
sum(self.total_recoveries) / sum(self.total_claims)
if sum(self.total_claims) > 0
else 0
),
"loss_ratio": (
sum(self.total_recoveries) / sum(self.total_premiums)
if sum(self.total_premiums) > 0
else 0
),
}