"""Multi-layer insurance program with reinstatements and advanced features.
This module provides comprehensive insurance program management including
multi-layer structures, reinstatements, attachment points, and accurate
loss allocation for manufacturing risk transfer optimization.
"""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import numpy as np
import yaml
from ._warnings import ErgodicInsuranceDeprecationWarning
from .ergodic_types import ClaimResult, LayerPayment
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from .config.insurance import InsuranceConfig
from .exposure_base import ExposureBase
from .insurance_pricing import InsurancePricer, MarketCycle
from .loss_distributions import LossEvent, ManufacturingLossGenerator
from .manufacturer import WidgetManufacturer
[docs]
class ReinstatementType(Enum):
"""Types of reinstatement provisions."""
NONE = "none"
PRO_RATA = "pro_rata" # Premium based on time remaining
FULL = "full" # Full premium regardless of timing
FREE = "free" # No additional premium
[docs]
@dataclass
class ProgramOptimizationConstraints:
"""Constraints for insurance program optimization."""
max_total_premium: Optional[float] = None # Budget constraint
min_total_coverage: Optional[float] = None # Minimum coverage requirement
max_layers: int = 5 # Maximum number of layers
min_layers: int = 3 # Minimum number of layers
max_attachment_gap: float = 0.0 # Maximum gap between layers (0 = no gaps)
min_roe_improvement: float = 0.15 # Minimum ROE improvement target
max_iterations: int = 1000 # Maximum optimization iterations
convergence_tolerance: float = 1e-6 # Convergence tolerance
[docs]
@dataclass
class OptimalStructure:
"""Result of insurance structure optimization."""
layers: List[EnhancedInsuranceLayer]
deductible: float
total_premium: float
total_coverage: float
ergodic_benefit: float
roe_improvement: float
optimization_metrics: Dict[str, Any]
convergence_achieved: bool
iterations_used: int
[docs]
@dataclass
class EnhancedInsuranceLayer:
"""Insurance layer with reinstatement support and advanced features.
Extends basic layer functionality with reinstatements, tracking,
and more sophisticated premium calculations.
"""
attachment_point: float # Where coverage starts
limit: float # Maximum coverage amount (interpretation depends on limit_type)
base_premium_rate: float # % of limit as base premium
reinstatements: int = 0 # Number of reinstatements available
reinstatement_premium: float = 1.0 # % of original premium per reinstatement
reinstatement_type: ReinstatementType = ReinstatementType.PRO_RATA
aggregate_limit: Optional[float] = None # Annual aggregate limit (for aggregate/hybrid types)
participation_rate: float = 1.0 # % of loss covered by this layer (default 100%)
limit_type: str = "per-occurrence" # Type of limit: "per-occurrence", "aggregate", or "hybrid"
per_occurrence_limit: Optional[float] = None # Per-occurrence limit (for hybrid type)
premium_rate_exposure: Optional["ExposureBase"] = (
None # Exposure object for dynamic premium scaling
)
exhausted: float = field(default=0.0, init=False)
[docs]
def __post_init__(self):
"""Validate layer parameters."""
if self.attachment_point < 0:
raise ValueError(f"Attachment point must be non-negative, got {self.attachment_point}")
if self.limit <= 0:
raise ValueError(f"Limit must be positive, got {self.limit}")
if self.base_premium_rate < 0:
raise ValueError(
f"Base premium rate must be non-negative, got {self.base_premium_rate}"
)
if self.reinstatements < 0:
raise ValueError(f"Reinstatements must be non-negative, got {self.reinstatements}")
if self.reinstatement_premium < 0:
raise ValueError(
f"Reinstatement premium must be non-negative, got {self.reinstatement_premium}"
)
# Validate limit type
valid_limit_types = ["per-occurrence", "aggregate", "hybrid"]
if self.limit_type not in valid_limit_types:
raise ValueError(
f"Invalid limit_type: {self.limit_type}. Must be one of {valid_limit_types}"
)
# Handle different limit type configurations
if self.limit_type == "per-occurrence":
# For per-occurrence, the limit field is the per-occurrence limit
# No aggregate limit should be set (will be ignored if set)
if self.reinstatements > 0:
logger.warning(
"Reinstatements parameter (%s) is not used for per-occurrence limits.",
self.reinstatements,
)
elif self.limit_type == "aggregate":
# For aggregate, the limit field is the aggregate limit
# Set aggregate_limit to match limit if not already set
if self.aggregate_limit is None:
self.aggregate_limit = self.limit
elif self.limit_type == "hybrid":
# For hybrid, need both per-occurrence and aggregate limits
if self.per_occurrence_limit is None:
# If not specified, use limit as per-occurrence limit
self.per_occurrence_limit = self.limit
if self.aggregate_limit is None:
# If aggregate not specified, error
raise ValueError(
"Hybrid limit type requires both per_occurrence_limit and aggregate_limit"
)
if self.per_occurrence_limit <= 0:
raise ValueError(
f"Per-occurrence limit must be positive, got {self.per_occurrence_limit}"
)
if self.aggregate_limit <= 0:
raise ValueError(f"Aggregate limit must be positive, got {self.aggregate_limit}")
# Validate aggregate limit if specified
if self.aggregate_limit is not None and self.aggregate_limit <= 0:
raise ValueError(f"Aggregate limit must be positive if set, got {self.aggregate_limit}")
[docs]
def calculate_base_premium(self, time: float = 0.0) -> float:
"""Calculate base premium for this layer.
Args:
time: Time in years for exposure calculation (default 0.0).
Returns:
Base premium amount (rate × limit × exposure_multiplier).
"""
base_premium = self.limit * self.base_premium_rate
# Apply exposure scaling if available
if self.premium_rate_exposure is not None:
multiplier = self.premium_rate_exposure.get_frequency_multiplier(time)
return base_premium * multiplier
return base_premium
[docs]
def calculate_reinstatement_premium(self, timing_factor: float = 1.0) -> float:
"""Calculate premium for a single reinstatement.
Args:
timing_factor: Pro-rata factor based on policy period remaining (0-1).
Returns:
Reinstatement premium amount.
"""
base_premium = self.calculate_base_premium()
if self.reinstatement_type == ReinstatementType.FREE:
return 0.0
if self.reinstatement_type == ReinstatementType.FULL:
return base_premium * self.reinstatement_premium
if self.reinstatement_type == ReinstatementType.PRO_RATA:
return base_premium * self.reinstatement_premium * timing_factor
return 0.0
[docs]
def can_respond(self, loss_amount: float) -> bool:
"""Check if this layer can respond to a loss.
Args:
loss_amount: Total loss amount.
Returns:
True if loss exceeds attachment point.
"""
return loss_amount > self.attachment_point
[docs]
def calculate_layer_loss(self, total_loss: float) -> float:
"""Calculate the portion of loss hitting this layer.
Args:
total_loss: Total loss amount.
Returns:
Amount of loss allocated to this layer (before applying limits).
"""
if total_loss <= self.attachment_point:
return 0.0
excess_loss = total_loss - self.attachment_point
# Apply the appropriate limit based on limit_type
if self.limit_type == "per-occurrence":
# For per-occurrence, limit each individual loss
return min(excess_loss, self.limit)
if self.limit_type == "aggregate":
# For aggregate, limit is handled in process_claim via aggregate tracking
# Here we just return the excess without limiting
return min(excess_loss, self.limit)
if self.limit_type == "hybrid":
# For hybrid, apply per-occurrence limit first
# Aggregate limit is handled in process_claim
return min(excess_loss, self.per_occurrence_limit or self.limit)
return min(excess_loss, self.limit)
[docs]
@dataclass
class LayerState:
"""Tracks the current state of an insurance layer during simulation.
Maintains utilization, reinstatement count, and exhaustion status
for accurate multi-claim processing.
"""
layer: EnhancedInsuranceLayer
current_limit: float = field(init=False)
used_limit: float = 0.0
reinstatements_used: int = 0
total_claims_paid: float = 0.0
reinstatement_premiums_paid: float = 0.0
is_exhausted: bool = False
aggregate_used: float = 0.0 # For annual aggregate tracking
[docs]
def __post_init__(self):
"""Initialize current limit to layer's base limit."""
self.current_limit = self.layer.limit
[docs]
def process_claim(self, claim_amount: float, timing_factor: float = 1.0) -> Tuple[float, float]:
"""Process a claim against this layer.
Args:
claim_amount: Amount of loss allocated to this layer.
timing_factor: Pro-rata factor for reinstatement premium.
Returns:
Tuple of (amount_paid, reinstatement_premium).
"""
if claim_amount <= 0:
return 0.0, 0.0
# Handle different limit types
if self.layer.limit_type == "per-occurrence":
# Per-occurrence: each claim limited but no exhaustion
payment = min(claim_amount, self.layer.limit)
self.total_claims_paid += payment
return payment, 0.0 # No reinstatement premiums for per-occurrence
if self.layer.limit_type == "aggregate":
# Aggregate: track total usage and exhaust when limit reached
if self.is_exhausted:
return 0.0, 0.0
total_payment = 0.0
total_reinstatement_premium = 0.0
remaining_claim = claim_amount
# Process claim with current limit (may trigger reinstatements)
while remaining_claim > 0 and not self.is_exhausted:
# Calculate available limit
available_limit = self.current_limit
# Check aggregate limit if it exists
if self.layer.aggregate_limit is not None:
remaining_aggregate = self.layer.aggregate_limit - self.aggregate_used
if remaining_aggregate <= 0:
self.is_exhausted = True
break
available_limit = min(available_limit, remaining_aggregate)
# Calculate payment from available limit
payment = min(remaining_claim, available_limit)
if payment > 0:
self.used_limit += payment
self.current_limit -= payment
self.total_claims_paid += payment
self.aggregate_used += payment
total_payment += payment
remaining_claim -= payment
# Check if aggregate exhausted
if (
self.layer.aggregate_limit is not None
and self.aggregate_used >= self.layer.aggregate_limit
):
# Check if this is a single aggregate that can be reinstated
# (aggregate_limit == limit means single aggregate with reinstatements)
# (aggregate_limit > limit means overall cap across reinstatements)
if (
self.layer.aggregate_limit == self.layer.limit
and self.reinstatements_used < self.layer.reinstatements
):
# Single aggregate with reinstatements - can reinstate
self.reinstatements_used += 1
self.current_limit = self.layer.limit
self.aggregate_used = 0 # Reset aggregate tracking
reinstatement_premium = self.layer.calculate_reinstatement_premium(
timing_factor
)
self.reinstatement_premiums_paid += reinstatement_premium
total_reinstatement_premium += reinstatement_premium
# Continue processing if there's remaining claim
if remaining_claim > 0:
continue
else:
# Overall cap reached - cannot be reinstated
self.is_exhausted = True
self.current_limit = 0
break
# Check if limit exhausted and can reinstate
if self.current_limit == 0 and not self.is_exhausted:
if self.reinstatements_used < self.layer.reinstatements:
# Trigger reinstatement
self.reinstatements_used += 1
self.current_limit = self.layer.limit
reinstatement_premium = self.layer.calculate_reinstatement_premium(
timing_factor
)
self.reinstatement_premiums_paid += reinstatement_premium
total_reinstatement_premium += reinstatement_premium
# Continue processing if there's remaining claim
if remaining_claim == 0:
break
else:
# No more reinstatements available
self.is_exhausted = True
break
elif remaining_claim == 0:
# Claim fully processed - check if we need reinstatement
if (
self.current_limit == 0
and self.reinstatements_used < self.layer.reinstatements
and not self.is_exhausted
):
# Trigger reinstatement for future use
self.reinstatements_used += 1
self.current_limit = self.layer.limit
reinstatement_premium = self.layer.calculate_reinstatement_premium(
timing_factor
)
self.reinstatement_premiums_paid += reinstatement_premium
total_reinstatement_premium += reinstatement_premium
break
return total_payment, total_reinstatement_premium
if self.layer.limit_type == "hybrid":
# Hybrid: apply both per-occurrence and aggregate constraints
if self.is_exhausted:
return 0.0, 0.0
# First apply per-occurrence limit
per_occurrence_limit = self.layer.per_occurrence_limit or self.layer.limit
max_payment = min(claim_amount, per_occurrence_limit)
# Then check against remaining aggregate
if self.layer.aggregate_limit is not None:
remaining_aggregate = self.layer.aggregate_limit - self.aggregate_used
if remaining_aggregate <= 0:
self.is_exhausted = True
return 0.0, 0.0
max_payment = min(max_payment, remaining_aggregate)
# Make the payment
if max_payment > 0:
self.total_claims_paid += max_payment
self.aggregate_used += max_payment
# Check if aggregate is now exhausted
if (
self.layer.aggregate_limit is not None
and self.aggregate_used >= self.layer.aggregate_limit
):
self.is_exhausted = True
# Calculate reinstatement premium if aggregate portion exhausted
reinstatement_premium = 0.0
if self.is_exhausted and self.reinstatements_used < self.layer.reinstatements:
self.reinstatements_used += 1
self.aggregate_used = 0 # Reset aggregate tracking
self.is_exhausted = False
reinstatement_premium = self.layer.calculate_reinstatement_premium(timing_factor)
self.reinstatement_premiums_paid += reinstatement_premium
return max_payment, reinstatement_premium
# Fallback to aggregate behavior
return self._process_claim_aggregate(claim_amount, timing_factor)
def _process_claim_aggregate(
self, claim_amount: float, timing_factor: float = 1.0
) -> Tuple[float, float]:
"""Original aggregate claim processing logic (for backward compatibility)."""
if self.is_exhausted or claim_amount <= 0:
return 0.0, 0.0
total_payment = 0.0
total_reinstatement_premium = 0.0
remaining_claim = claim_amount
# Process claim with current limit (may trigger reinstatements)
while remaining_claim > 0 and not self.is_exhausted:
# Check aggregate limit
available_limit = self.current_limit
if self.layer.aggregate_limit is not None:
remaining_aggregate = self.layer.aggregate_limit - self.aggregate_used
if remaining_aggregate <= 0:
self.is_exhausted = True
break
available_limit = min(available_limit, remaining_aggregate)
# Calculate payment from available limit
payment = min(remaining_claim, available_limit)
if payment > 0:
self.used_limit += payment
self.current_limit -= payment
self.total_claims_paid += payment
total_payment += payment
remaining_claim -= payment
# Update aggregate if applicable
if self.layer.aggregate_limit is not None:
self.aggregate_used += payment
if self.aggregate_used >= self.layer.aggregate_limit:
self.is_exhausted = True
self.current_limit = 0.0
break
# Check if limit exhausted and can reinstate
if self.current_limit == 0 and not self.is_exhausted:
if self.reinstatements_used < self.layer.reinstatements:
# Trigger reinstatement
self.reinstatements_used += 1
self.current_limit = self.layer.limit
reinstatement_premium = self.layer.calculate_reinstatement_premium(
timing_factor
)
self.reinstatement_premiums_paid += reinstatement_premium
total_reinstatement_premium += reinstatement_premium
else:
# No more reinstatements available
self.is_exhausted = True
break
else:
# Can't process more of this claim
break
return total_payment, total_reinstatement_premium
[docs]
def reset(self):
"""Reset layer state for new policy period."""
self.current_limit = self.layer.limit
self.used_limit = 0.0
self.reinstatements_used = 0
self.total_claims_paid = 0.0
self.reinstatement_premiums_paid = 0.0
self.is_exhausted = False
self.aggregate_used = 0.0
[docs]
def get_available_limit(self) -> float:
"""Get currently available limit.
Returns:
Available limit for claims.
"""
return self.current_limit if not self.is_exhausted else 0.0
[docs]
def get_utilization_rate(self) -> float:
"""Calculate layer utilization rate.
Returns:
Utilization as percentage of total available limit.
"""
total_available = self.layer.limit * (1 + self.layer.reinstatements)
if total_available == 0:
return 0.0
return self.total_claims_paid / total_available
[docs]
class InsuranceProgram:
"""Comprehensive multi-layer insurance program manager.
Handles complex insurance structures with multiple layers,
reinstatements, and sophisticated claim allocation.
"""
def __init__(
self,
layers: List[EnhancedInsuranceLayer],
deductible: float = 0.0,
name: str = "Manufacturing Insurance Program",
pricing_enabled: bool = False,
pricer: Optional["InsurancePricer"] = None,
max_history_years: int = 50,
):
"""Initialize insurance program.
Args:
layers: List of insurance layers (will be sorted by attachment).
deductible: Self-insured retention before insurance.
name: Program identifier.
pricing_enabled: Whether to use dynamic pricing.
pricer: Optional InsurancePricer for dynamic pricing.
max_history_years: Maximum years of detailed history to retain in
:class:`ProgramState` deques. Older entries are automatically
discarded. Passed through by :meth:`create_fresh`.
"""
self.layers = sorted(layers, key=lambda x: x.attachment_point)
self.deductible = deductible
self.name = name
self.layer_states = [LayerState(layer) for layer in self.layers]
self.total_premiums_paid = 0.0
self.total_claims = 0
self.pricing_enabled = pricing_enabled
self.pricer = pricer
self.pricing_results: List[Any] = []
self.max_history_years = max_history_years
[docs]
@classmethod
def create_fresh(cls, source: "InsuranceProgram") -> "InsuranceProgram":
"""Create a fresh program from an existing program's configuration.
Factory method that avoids ``copy.deepcopy`` by constructing a new
instance directly from immutable layer definitions. Use this in hot
loops (e.g. Monte Carlo workers) where each simulation needs clean
initial state.
The new instance shares the same :class:`EnhancedInsuranceLayer`
objects (which are immutable after construction) but builds fresh
:class:`LayerState` wrappers with zeroed counters.
Args:
source: An existing program whose configuration is reused.
Returns:
A new ``InsuranceProgram`` with pristine mutable state.
"""
return cls(
layers=source.layers,
deductible=source.deductible,
name=source.name,
pricing_enabled=source.pricing_enabled,
pricer=source.pricer,
max_history_years=source.max_history_years,
)
[docs]
@classmethod
def from_config(
cls,
insurance_config: "InsuranceConfig",
name: str = "Insurance Program",
**kwargs,
) -> "InsuranceProgram":
"""Create an insurance program from an :class:`InsuranceConfig` object.
Bridges the config system (Pydantic models loaded from YAML/profiles)
to the runtime simulation by converting each
:class:`InsuranceLayerConfig` into an :class:`EnhancedInsuranceLayer`.
Args:
insurance_config: Validated insurance configuration, typically
obtained via ``Config.from_yaml(...).insurance``.
name: Program identifier.
**kwargs: Additional keyword arguments forwarded to the
``InsuranceProgram`` constructor (e.g. ``pricing_enabled``).
Returns:
Configured InsuranceProgram ready for simulation.
Raises:
TypeError: If *insurance_config* is not an ``InsuranceConfig``.
Examples:
From a YAML file::
config = Config.from_yaml(Path("my_config.yaml"))
program = InsuranceProgram.from_config(config.insurance)
Inline config::
from ergodic_insurance.config.insurance import (
InsuranceConfig, InsuranceLayerConfig,
)
ic = InsuranceConfig(
deductible=500_000,
layers=[
InsuranceLayerConfig(
name="Primary",
attachment=500_000,
limit=5_000_000,
base_premium_rate=0.015,
),
],
)
program = InsuranceProgram.from_config(ic)
"""
from .config.insurance import InsuranceConfig
if not isinstance(insurance_config, InsuranceConfig):
raise TypeError(f"Expected InsuranceConfig, got {type(insurance_config).__name__}")
layers: List[EnhancedInsuranceLayer] = []
for lc in insurance_config.layers:
layer = EnhancedInsuranceLayer(
attachment_point=lc.attachment,
limit=lc.limit,
base_premium_rate=lc.base_premium_rate,
reinstatements=lc.reinstatements,
aggregate_limit=lc.aggregate_limit,
limit_type=lc.limit_type,
per_occurrence_limit=lc.per_occurrence_limit,
)
layers.append(layer)
return cls(
layers=layers,
deductible=insurance_config.deductible,
name=name,
**kwargs,
)
[docs]
@classmethod
def simple(
cls,
deductible: float,
limit: float,
rate: float,
name: str = "Simple Insurance Program",
**kwargs,
) -> "InsuranceProgram":
"""Create a single-layer insurance program from basic parameters.
Convenience factory for the most common use case: a single primary
layer where the attachment point equals the deductible.
Args:
deductible: Self-insured retention in dollars.
limit: Maximum coverage amount in dollars above the deductible.
rate: Annual premium as a fraction of the limit (e.g. 0.025 for 2.5%).
name: Program identifier.
**kwargs: Additional keyword arguments (e.g. pricing_enabled, pricer).
Returns:
InsuranceProgram with a single layer.
Examples:
Quick single-layer program::
program = InsuranceProgram.simple(
deductible=500_000,
limit=10_000_000,
rate=0.025,
)
"""
layer = EnhancedInsuranceLayer(
attachment_point=deductible,
limit=limit,
base_premium_rate=rate,
reinstatements=0,
)
return cls(layers=[layer], deductible=deductible, name=name, **kwargs)
[docs]
def calculate_premium(self, time: float = 0.0) -> float:
"""Calculate total annual premium for the program.
Args:
time: Time in years for exposure calculation (default 0.0).
Returns:
Total base premium across all layers.
"""
return sum(layer.calculate_base_premium(time) for layer in self.layers)
[docs]
def calculate_annual_premium(self, time: float = 0.0) -> float:
"""Calculate total annual premium for the program.
.. deprecated::
Use :meth:`calculate_premium` instead. This method will be
removed in a future release.
Args:
time: Time in years for exposure calculation (default 0.0).
Returns:
Total base premium across all layers.
"""
import warnings
warnings.warn(
"calculate_annual_premium() is deprecated, use calculate_premium() instead",
ErgodicInsuranceDeprecationWarning,
stacklevel=2,
)
return self.calculate_premium(time)
[docs]
def process_claim(self, claim_amount: float, timing_factor: float = 1.0) -> ClaimResult:
"""Process a single claim through the insurance structure.
Args:
claim_amount: Total claim amount.
timing_factor: Pro-rata factor for reinstatement premiums.
Returns:
Typed :class:`ClaimResult` with claim allocation details.
"""
if claim_amount <= 0:
return ClaimResult(
total_claim=0.0,
deductible_paid=0.0,
insurance_recovery=0.0,
uncovered_loss=0.0,
reinstatement_premiums=0.0,
layers_triggered=[],
)
self.total_claims += 1
deductible_paid = min(claim_amount, self.deductible)
insurance_recovery = 0.0
reinstatement_premiums = 0.0
layers_triggered: list[LayerPayment] = []
# Maximum recoverable is claim minus deductible
max_recoverable = claim_amount - deductible_paid
# Process through each layer
for i, state in enumerate(self.layer_states):
if not state.layer.can_respond(claim_amount):
continue
# Calculate loss to this layer
layer_loss = state.layer.calculate_layer_loss(claim_amount)
# Process the claim
payment, reinst_premium = state.process_claim(layer_loss, timing_factor)
if payment > 0:
insurance_recovery += payment
reinstatement_premiums += reinst_premium
layers_triggered.append(
LayerPayment(
layer_index=i,
attachment=state.layer.attachment_point,
payment=payment,
reinstatement_premium=reinst_premium,
exhausted=state.is_exhausted,
)
)
# Guard: total insurance recovery cannot exceed (claim - deductible)
if insurance_recovery > max_recoverable:
insurance_recovery = max_recoverable
# Calculate uncovered loss
uncovered_loss = 0.0
total_covered = deductible_paid + insurance_recovery
if total_covered < claim_amount:
uncovered_loss = claim_amount - total_covered
# Company pays uncovered portion
deductible_paid += uncovered_loss
self.total_premiums_paid += reinstatement_premiums
return ClaimResult(
total_claim=claim_amount,
deductible_paid=deductible_paid,
insurance_recovery=insurance_recovery,
uncovered_loss=uncovered_loss,
reinstatement_premiums=reinstatement_premiums,
layers_triggered=layers_triggered,
)
[docs]
def process_annual_claims(
self, claims: List[float], claim_times: Optional[List[float]] = None
) -> Dict[str, Any]:
"""Process all claims for a policy year.
Args:
claims: List of claim amounts.
claim_times: Optional list of claim times (0-1 for year fraction).
Returns:
Dictionary with annual summary statistics.
"""
if claim_times is None:
# Assume uniform distribution through year
claim_times = list(np.linspace(0, 1, len(claims))) if claims else []
results: Dict[str, Any] = {
"total_claims": len(claims),
"total_losses": sum(claims),
"total_deductible": 0.0,
"total_recovery": 0.0,
"total_uncovered": 0.0,
"total_reinstatement_premiums": 0.0,
"base_premium": self.calculate_premium(0.0),
"claim_details": [],
"layer_summaries": [],
}
# Process each claim
for claim, time in zip(claims, claim_times):
timing_factor = 1.0 - time # Remaining portion of year
claim_result = self.process_claim(claim, timing_factor)
results["total_deductible"] += claim_result.deductible_paid
results["total_recovery"] += claim_result.insurance_recovery
results["total_uncovered"] += claim_result.uncovered_loss
results["total_reinstatement_premiums"] += claim_result.reinstatement_premiums
results["claim_details"].append(claim_result)
# Compile layer summaries
for i, state in enumerate(self.layer_states):
results["layer_summaries"].append(
{
"layer_index": i,
"attachment_point": state.layer.attachment_point,
"limit": state.layer.limit,
"claims_paid": state.total_claims_paid,
"reinstatements_used": state.reinstatements_used,
"reinstatement_premiums": state.reinstatement_premiums_paid,
"utilization_rate": state.get_utilization_rate(),
"is_exhausted": state.is_exhausted,
}
)
results["total_premium_paid"] = (
results["base_premium"] + results["total_reinstatement_premiums"]
)
results["net_benefit"] = results["total_recovery"] - results["total_premium_paid"]
return results
[docs]
def reset_annual(self):
"""Reset program state for new policy year."""
for state in self.layer_states:
state.reset()
self.total_claims = 0
[docs]
def get_program_summary(self) -> Dict[str, Any]:
"""Get current program state summary.
Returns:
Dictionary with program statistics.
"""
return {
"program_name": self.name,
"deductible": self.deductible,
"num_layers": len(self.layers),
"total_coverage": self.get_total_coverage(),
"annual_base_premium": self.calculate_premium(0.0),
"total_claims_processed": self.total_claims,
"total_premiums_paid": self.total_premiums_paid,
"layers": [
{
"attachment": layer.attachment_point,
"limit": layer.limit,
"exhaustion_point": layer.attachment_point + layer.limit,
"reinstatements": layer.reinstatements,
"base_premium": layer.calculate_base_premium(0.0),
}
for layer in self.layers
],
}
[docs]
def get_total_coverage(self) -> float:
"""Calculate maximum possible coverage.
Returns:
Maximum claim amount that can be covered.
"""
if not self.layers:
return 0.0
# Find highest exhaustion point
max_coverage = max(layer.attachment_point + layer.limit for layer in self.layers)
return max(0.0, max_coverage - self.deductible)
def _get_default_manufacturer_profile(self) -> Dict[str, Any]:
"""Get default manufacturer profile."""
return {
"initial_assets": 10_000_000,
"annual_revenue": 15_000_000,
"base_operating_margin": 0.08,
"growth_rate": 0.05,
}
def _calculate_insurance_metrics(self, loss_history: List[List[float]]) -> tuple:
"""Calculate metrics with and without insurance.
Returns:
Tuple of (metrics_with, metrics_without) as numpy arrays.
"""
metrics_with_insurance = []
metrics_without_insurance = []
for annual_losses in loss_history:
# Without insurance: company bears all losses
total_loss_without = sum(annual_losses)
net_impact_without = -total_loss_without
# With insurance: apply structure
result = self.process_annual_claims(annual_losses)
total_loss_with = result["total_deductible"]
total_premium_paid = result["total_premium_paid"]
net_impact_with = -total_loss_with - total_premium_paid
metrics_without_insurance.append(net_impact_without)
metrics_with_insurance.append(net_impact_with)
# Reset for next year
self.reset_annual()
return np.array(metrics_with_insurance), np.array(metrics_without_insurance)
def _calculate_time_average_growth(self, metrics: np.ndarray, initial_assets: float) -> tuple:
"""Calculate time-average growth rate.
Returns:
Tuple of (time_avg_growth, final_assets).
"""
assets = initial_assets + np.cumsum(metrics)
assets = np.maximum(assets, 1.0) # Ensure positive for log
if len(assets) > 1:
time_avg = np.log(assets[-1] / initial_assets) / len(assets)
else:
time_avg = 0.0
return time_avg, assets[-1]
[docs]
def calculate_ergodic_benefit(
self,
loss_history: List[List[float]],
manufacturer_profile: Optional[Dict[str, Any]] = None,
time_horizon: int = 50,
) -> Dict[str, float]:
"""Calculate ergodic benefit of insurance structure.
Quantifies time-average growth improvement from insurance coverage
versus ensemble-average cost.
Args:
loss_history: Historical loss data (list of annual loss lists).
manufacturer_profile: Company profile with assets, revenue, etc.
time_horizon: Time horizon for ergodic calculation (default 50,
matching ``SimulationConfig.time_horizon_years``).
Returns:
Dictionary with ergodic metrics.
"""
if not loss_history:
return {
"time_average_benefit": 0.0,
"ensemble_average_cost": 0.0,
"ergodic_ratio": 0.0,
"volatility_reduction": 0.0,
}
# Default manufacturer profile
if manufacturer_profile is None:
manufacturer_profile = self._get_default_manufacturer_profile()
# Calculate metrics with and without insurance
metrics_with, metrics_without = self._calculate_insurance_metrics(loss_history)
# Time-average growth rates
initial_assets = manufacturer_profile["initial_assets"]
time_avg_with, _ = self._calculate_time_average_growth(metrics_with, initial_assets)
time_avg_without, _ = self._calculate_time_average_growth(metrics_without, initial_assets)
# Ensemble averages
ensemble_avg_without = np.mean(metrics_without)
ensemble_avg_with = np.mean(metrics_with)
# Volatility metrics
volatility_without = np.std(metrics_without)
volatility_with = np.std(metrics_with)
volatility_reduction = (
(volatility_without - volatility_with) / volatility_without
if volatility_without > 0
else 0.0
)
return {
"time_average_benefit": time_avg_with - time_avg_without,
"ensemble_average_cost": ensemble_avg_with - ensemble_avg_without,
"ergodic_ratio": (
time_avg_with / time_avg_without if time_avg_without != 0 else float("inf")
),
"volatility_reduction": volatility_reduction,
"time_avg_growth_with": time_avg_with,
"time_avg_growth_without": time_avg_without,
"ensemble_avg_with": ensemble_avg_with,
"ensemble_avg_without": ensemble_avg_without,
}
[docs]
def find_optimal_attachment_points(
self, loss_data: List[float], num_layers: int = 4, percentiles: Optional[List[float]] = None
) -> List[float]:
"""Find optimal attachment points based on loss frequency/severity.
Uses data-driven approach to minimize gaps while optimizing cost.
Args:
loss_data: Historical loss amounts.
num_layers: Number of layers to optimize.
percentiles: Optional percentiles for attachment points.
Returns:
List of optimal attachment points.
"""
if not loss_data or num_layers <= 0:
return []
loss_array = np.array(loss_data)
loss_array = loss_array[loss_array > 0] # Filter positive losses
if len(loss_array) == 0:
return []
# Default percentiles based on typical layer structure
if percentiles is None:
if num_layers == 3:
percentiles = [50, 90, 99] # Working, excess, cat
elif num_layers == 4:
percentiles = [40, 80, 95, 99.5]
elif num_layers == 5:
percentiles = [30, 60, 85, 95, 99.5]
else:
# Even distribution
percentiles = np.linspace(100 / (num_layers + 1), 99, num_layers).tolist()
# Calculate attachment points from percentiles
attachment_points = []
for p in percentiles:
attachment = float(np.percentile(loss_array, p))
attachment_points.append(attachment)
# Round to reasonable values
attachment_points = [self._round_attachment_point(ap) for ap in attachment_points]
# Ensure strictly increasing
for i in range(1, len(attachment_points)):
if attachment_points[i] <= attachment_points[i - 1]:
attachment_points[i] = attachment_points[i - 1] * 1.5
return attachment_points
def _round_attachment_point(self, value: float) -> float:
"""Round attachment point to reasonable market value."""
if value < 100_000:
return float(round(value / 10_000) * 10_000)
if value < 1_000_000:
return float(round(value / 50_000) * 50_000)
if value < 10_000_000:
return float(round(value / 250_000) * 250_000)
return float(round(value / 1_000_000) * 1_000_000)
def _get_layer_capacity(self, attachment_point: float) -> float:
"""Get default capacity for a layer based on attachment point."""
capacity_thresholds = [
(1_000_000, 5_000_000),
(10_000_000, 25_000_000),
(50_000_000, 50_000_000),
(float("inf"), 100_000_000),
]
for threshold, capacity in capacity_thresholds:
if attachment_point < threshold:
return capacity
return 100_000_000 # Default fallback
[docs]
def optimize_layer_widths(
self,
attachment_points: List[float],
total_budget: float,
capacity_constraints: Optional[Dict[str, float]] = None,
loss_data: Optional[List[float]] = None,
) -> List[float]:
"""Optimize layer widths given attachment points and constraints.
Args:
attachment_points: Fixed attachment points for layers.
total_budget: Total premium budget.
capacity_constraints: Optional max capacity per layer.
loss_data: Optional loss data for severity analysis.
Returns:
List of optimal layer widths.
"""
if not attachment_points:
return []
num_layers = len(attachment_points)
# Default capacity constraints
if capacity_constraints is None:
capacity_constraints = {
f"layer_{i}": self._get_layer_capacity(ap) for i, ap in enumerate(attachment_points)
}
# Analyze loss severity at each attachment point
severity_weights: List[float] = []
if loss_data:
for ap in attachment_points:
excess_losses = [max(0, loss - ap) for loss in loss_data if loss > ap]
avg_excess = float(np.mean(excess_losses)) if excess_losses else float(ap)
severity_weights.append(avg_excess)
else:
# Default weights based on attachment points
severity_weights = [1.0 / (i + 1) for i in range(num_layers)]
# Normalize weights
total_weight = sum(severity_weights)
if total_weight > 0:
severity_weights = [w / total_weight for w in severity_weights]
else:
severity_weights = [1.0 / num_layers] * num_layers
# Calculate layer widths based on budget and weights
layer_widths = []
for i, (ap, weight) in enumerate(zip(attachment_points, severity_weights)):
# Estimate premium rate (decreasing with attachment)
if ap < 1_000_000:
rate = 0.015
elif ap < 5_000_000:
rate = 0.010
elif ap < 25_000_000:
rate = 0.006
else:
rate = 0.003
# Calculate width from budget allocation
allocated_budget = total_budget * weight
width = allocated_budget / rate
# Apply capacity constraint
max_capacity = capacity_constraints.get(f"layer_{i}", float("inf"))
width = min(float(width), float(max_capacity))
# Ensure minimum width
min_width = ap * 0.5 if i == 0 else attachment_points[i - 1] * 0.3
width = max(float(width), float(min_width))
layer_widths.append(self._round_attachment_point(float(width)))
return layer_widths
def _get_base_premium_rate(self, attachment_point: float) -> float:
"""Get base premium rate based on attachment point."""
rate_thresholds = [
(1_000_000, 0.015),
(5_000_000, 0.010),
(25_000_000, 0.006),
(float("inf"), 0.003),
]
for threshold, rate in rate_thresholds:
if attachment_point < threshold:
return rate
return 0.003 # Default fallback
def _calculate_reinstatements(self, layer_index: int, num_layers: int) -> int:
"""Calculate reinstatements for a layer."""
if layer_index == 0:
return 0 # Primary layer
if layer_index == num_layers - 1:
return 999 # Top layer - unlimited
return max(0, 2 - layer_index // 2) # Decreasing with height
def _create_layer_structure(
self, attachment_points: List[float], layer_widths: List[float]
) -> List[EnhancedInsuranceLayer]:
"""Create insurance layers from attachment points and widths."""
layers = []
num_layers = len(attachment_points)
for i, (ap, width) in enumerate(zip(attachment_points, layer_widths)):
layer = EnhancedInsuranceLayer(
attachment_point=ap,
limit=width,
base_premium_rate=self._get_base_premium_rate(ap),
reinstatements=self._calculate_reinstatements(i, num_layers),
reinstatement_premium=1.0,
reinstatement_type=ReinstatementType.PRO_RATA,
)
layers.append(layer)
return layers
def _calculate_roe_improvement(
self, ergodic_metrics: Dict[str, float], company_profile: Optional[Dict[str, Any]]
) -> float:
"""Calculate ROE improvement from ergodic metrics."""
if company_profile and "initial_assets" in company_profile:
initial_roe = 0.08 # Baseline assumption
improved_roe = initial_roe + ergodic_metrics["time_average_benefit"]
return improved_roe / initial_roe - 1.0
return ergodic_metrics["time_average_benefit"] / 0.08
[docs]
def optimize_layer_structure(
self,
loss_data: List[List[float]],
company_profile: Optional[Dict[str, Any]] = None,
constraints: Optional[ProgramOptimizationConstraints] = None,
) -> OptimalStructure:
"""Optimize complete insurance layer structure.
Main optimization method that orchestrates layer count, attachment points,
and widths to maximize ergodic benefit.
Args:
loss_data: Historical loss data (list of annual loss lists).
company_profile: Company financial profile.
constraints: Optimization constraints.
Returns:
Optimal insurance structure.
"""
if constraints is None:
constraints = ProgramOptimizationConstraints()
# Flatten loss data for analysis
all_losses = [loss for annual_losses in loss_data for loss in annual_losses]
# Determine optimal number of layers
best_structure = None
best_ergodic_benefit = -float("inf")
for num_layers in range(constraints.min_layers, constraints.max_layers + 1):
# Find attachment points
attachment_points = self.find_optimal_attachment_points(all_losses, num_layers)
if not attachment_points:
continue
# Set deductible and budget
deductible = attachment_points[0] * 0.5
budget = constraints.max_total_premium or (
float(np.mean([sum(annual) for annual in loss_data])) * 0.15
)
# Optimize layer widths
layer_widths = self.optimize_layer_widths(
attachment_points, budget, loss_data=all_losses
)
# Create and test layer structure
layers = self._create_layer_structure(attachment_points, layer_widths)
test_program = InsuranceProgram(layers=layers, deductible=deductible)
ergodic_metrics = test_program.calculate_ergodic_benefit(loss_data, company_profile)
# Check if this is better
if ergodic_metrics["time_average_benefit"] > best_ergodic_benefit:
best_ergodic_benefit = ergodic_metrics["time_average_benefit"]
roe_improvement = self._calculate_roe_improvement(ergodic_metrics, company_profile)
best_structure = OptimalStructure(
layers=layers,
deductible=deductible,
total_premium=test_program.calculate_premium(0.0),
total_coverage=test_program.get_total_coverage(),
ergodic_benefit=best_ergodic_benefit,
roe_improvement=roe_improvement,
optimization_metrics=ergodic_metrics,
convergence_achieved=True,
iterations_used=num_layers - constraints.min_layers + 1,
)
# If no structure found, return a basic one
if best_structure is None:
basic_layers = [
EnhancedInsuranceLayer(
attachment_point=250_000,
limit=4_750_000,
base_premium_rate=0.015,
reinstatements=0,
)
]
basic_program = InsuranceProgram(layers=basic_layers, deductible=250_000)
best_structure = OptimalStructure(
layers=basic_layers,
deductible=250_000,
total_premium=basic_program.calculate_premium(0.0),
total_coverage=5_000_000,
ergodic_benefit=0.0,
roe_improvement=0.0,
optimization_metrics={},
convergence_achieved=False,
iterations_used=0,
)
return best_structure
[docs]
@classmethod
def from_yaml(cls, config_path: str) -> "InsuranceProgram":
"""Load insurance program from YAML configuration.
Args:
config_path: Path to YAML configuration file.
Returns:
Configured InsuranceProgram instance.
"""
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
# Extract program parameters
deductible = config.get("deductible", 0.0)
name = config.get("program_name", "Insurance Program")
# Create layers
layers = []
for layer_config in config.get("layers", []):
# Parse reinstatement type
reinstatement_type_str = layer_config.get("reinstatement_type", "pro_rata")
reinstatement_type = ReinstatementType(reinstatement_type_str)
layer = EnhancedInsuranceLayer(
attachment_point=layer_config["attachment_point"],
limit=layer_config["limit"],
base_premium_rate=layer_config.get(
"base_premium_rate",
layer_config.get("premium_rate", layer_config.get("rate", 0.01)),
),
reinstatements=layer_config.get("reinstatements", 0),
reinstatement_premium=layer_config.get("reinstatement_premium", 1.0),
reinstatement_type=reinstatement_type,
aggregate_limit=layer_config.get("aggregate_limit"),
)
layers.append(layer)
return cls(layers=layers, deductible=deductible, name=name)
[docs]
@classmethod
def create_standard_manufacturing_program(
cls, deductible: float = 250_000
) -> "InsuranceProgram":
"""Create standard manufacturing insurance program.
Builds a tower with up to 4 layers above the deductible:
Primary ($deductible-$5M), 1st Excess ($5M-$25M),
2nd Excess ($25M-$50M), CAT ($50M-$100M).
Layers whose limit would be zero or negative (because the
deductible exceeds their attachment boundary) are skipped.
Args:
deductible: Self-insured retention amount.
Returns:
Standard manufacturing insurance program.
"""
layer_defs = [
# (attachment, ceiling, rate, reinstatements, reinst_prem, reinst_type)
(deductible, 5_000_000, 0.015, 0, 1.0, ReinstatementType.PRO_RATA),
(5_000_000, 25_000_000, 0.008, 1, 1.0, ReinstatementType.FULL),
(25_000_000, 50_000_000, 0.004, 2, 1.0, ReinstatementType.PRO_RATA),
(50_000_000, 100_000_000, 0.002, 999, 1.0, ReinstatementType.PRO_RATA),
]
layers = []
for attach, ceiling, rate, reinst, reinst_prem, reinst_type in layer_defs:
effective_attach = max(attach, deductible)
limit = ceiling - effective_attach
if limit <= 0:
continue
layers.append(
EnhancedInsuranceLayer(
attachment_point=effective_attach,
limit=limit,
base_premium_rate=rate,
reinstatements=reinst,
reinstatement_premium=reinst_prem,
reinstatement_type=reinst_type,
),
)
if not layers:
raise ValueError(
f"Deductible ${deductible:,.0f} exceeds all layer ceilings; "
f"no insurance coverage possible"
)
return cls(layers=layers, deductible=deductible, name="Standard Manufacturing Program")
[docs]
def apply_pricing(
self,
expected_revenue: float,
market_cycle: Optional["MarketCycle"] = None,
loss_generator: Optional["ManufacturingLossGenerator"] = None,
) -> None:
"""Apply dynamic pricing to all layers in the program.
Updates layer premium rates based on frequency/severity calculations.
Args:
expected_revenue: Expected annual revenue for scaling
market_cycle: Optional market cycle state
loss_generator: Optional loss generator (uses pricer's if not provided)
Raises:
ValueError: If pricing not enabled or pricer not configured
"""
if not self.pricing_enabled:
raise ValueError("Pricing not enabled for this program")
if self.pricer is None:
if loss_generator is None:
raise ValueError("Either pricer or loss_generator must be provided")
# Create a default pricer
from .insurance_pricing import InsurancePricer, MarketCycle
self.pricer = InsurancePricer(
loss_generator=loss_generator,
market_cycle=market_cycle or MarketCycle.NORMAL,
)
# Apply pricing to the program
self.pricer.price_insurance_program(
program=self,
expected_revenue=expected_revenue,
market_cycle=market_cycle,
update_program=True,
)
[docs]
def get_pricing_summary(self) -> Dict[str, Any]:
"""Get summary of current pricing.
Returns:
Dictionary with pricing details for each layer
"""
summary: Dict[str, Any] = {
"program_name": self.name,
"pricing_enabled": self.pricing_enabled,
"total_premium": self.calculate_premium(0.0),
"layers": [],
}
if self.pricing_results:
for i, (layer, pricing) in enumerate(zip(self.layers, self.pricing_results)):
summary["layers"].append(
{
"index": i,
"attachment_point": layer.attachment_point,
"limit": layer.limit,
"base_premium_rate": layer.base_premium_rate,
"market_premium": (
pricing.market_premium
if pricing
else layer.limit * layer.base_premium_rate
),
"pure_premium": pricing.pure_premium if pricing else None,
"expected_frequency": pricing.expected_frequency if pricing else None,
"expected_severity": pricing.expected_severity if pricing else None,
}
)
else:
for i, layer in enumerate(self.layers):
summary["layers"].append(
{
"index": i,
"attachment_point": layer.attachment_point,
"limit": layer.limit,
"base_premium_rate": layer.base_premium_rate,
"premium": layer.calculate_base_premium(0.0),
}
)
return summary
[docs]
@classmethod
def create_with_pricing(
cls,
layers: List[EnhancedInsuranceLayer],
loss_generator: "ManufacturingLossGenerator",
expected_revenue: float,
market_cycle: Optional["MarketCycle"] = None,
deductible: float = 0.0,
name: str = "Priced Insurance Program",
) -> "InsuranceProgram":
"""Create insurance program with dynamic pricing.
Factory method that creates a program with pricing already applied.
Args:
layers: Initial layer structure
loss_generator: Loss generator for pricing
expected_revenue: Expected annual revenue
market_cycle: Market cycle state
deductible: Self-insured retention
name: Program name
Returns:
InsuranceProgram with pricing applied
"""
from .insurance_pricing import InsurancePricer, MarketCycle
# Create pricer
pricer = InsurancePricer(
loss_generator=loss_generator,
market_cycle=market_cycle or MarketCycle.NORMAL,
)
# Create program with pricing enabled
program = cls(
layers=layers,
deductible=deductible,
name=name,
pricing_enabled=True,
pricer=pricer,
)
# Apply pricing
program.apply_pricing(expected_revenue, market_cycle)
return program
[docs]
@dataclass
class ProgramState:
"""Tracks multi-year insurance program state for simulations.
Maintains historical data and statistics across multiple
policy periods for long-term analysis.
History lists use bounded :class:`collections.deque` instances to
prevent unbounded memory growth during long simulations. Running
totals maintain accurate lifetime statistics regardless of the
history window size.
"""
program: InsuranceProgram
max_history_years: Optional[int] = None
years_simulated: int = 0
total_claims: deque = field(default_factory=deque)
total_recoveries: deque = field(default_factory=deque)
total_premiums: deque = field(default_factory=deque)
annual_results: deque = field(default_factory=deque)
_cumulative_claims: float = field(default=0.0, init=False, repr=False)
_cumulative_recoveries: float = field(default=0.0, init=False, repr=False)
_cumulative_premiums: float = field(default=0.0, init=False, repr=False)
[docs]
def __post_init__(self):
"""Re-initialise history deques with the configured *maxlen*."""
maxlen = self.max_history_years
if maxlen is None:
maxlen = getattr(self.program, "max_history_years", 50)
self.total_claims = deque(self.total_claims, maxlen=maxlen)
self.total_recoveries = deque(self.total_recoveries, maxlen=maxlen)
self.total_premiums = deque(self.total_premiums, maxlen=maxlen)
self.annual_results = deque(self.annual_results, maxlen=maxlen)
[docs]
def simulate_year(
self, annual_claims: List[float], claim_times: Optional[List[float]] = None
) -> Dict[str, Any]:
"""Simulate one year of the insurance program.
Args:
annual_claims: List of claims for the year.
claim_times: Optional timing of claims.
Returns:
Annual results dictionary.
"""
# Reset for new year
self.program.reset_annual()
# Process claims
results = self.program.process_annual_claims(annual_claims, claim_times)
# Track statistics
self.years_simulated += 1
annual_claim_total = sum(annual_claims)
annual_recovery = results["total_recovery"]
annual_premium = results["total_premium_paid"]
self.total_claims.append(annual_claim_total)
self.total_recoveries.append(annual_recovery)
self.total_premiums.append(annual_premium)
self.annual_results.append(results)
# Maintain running totals for accurate lifetime statistics
self._cumulative_claims += annual_claim_total
self._cumulative_recoveries += annual_recovery
self._cumulative_premiums += annual_premium
return results
[docs]
def get_summary_statistics(self) -> Dict[str, Any]:
"""Calculate summary statistics across all simulated years.
Uses running totals so that results remain accurate even after
older entries have been evicted from the bounded history deques.
Returns:
Dictionary with multi-year statistics.
"""
if self.years_simulated == 0:
return {}
return {
"years_simulated": self.years_simulated,
"average_annual_claims": self._cumulative_claims / self.years_simulated,
"average_annual_recovery": self._cumulative_recoveries / self.years_simulated,
"average_annual_premium": self._cumulative_premiums / self.years_simulated,
"total_claims": self._cumulative_claims,
"total_recoveries": self._cumulative_recoveries,
"total_premiums": self._cumulative_premiums,
"net_benefit": self._cumulative_recoveries - self._cumulative_premiums,
"recovery_ratio": (
self._cumulative_recoveries / self._cumulative_claims
if self._cumulative_claims > 0
else 0
),
"loss_ratio": (
self._cumulative_recoveries / self._cumulative_premiums
if self._cumulative_premiums > 0
else 0
),
}