"""Enhanced loss distributions for manufacturing risk modeling.
This module provides parametric loss distributions for realistic insurance claim
modeling, including attritional losses, large losses, and catastrophic events
with revenue-dependent frequency scaling.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import numpy as np
from scipy import stats

if TYPE_CHECKING:
    from .ergodic_analyzer import ErgodicData
    from .exposure_base import ExposureBase
    from .insurance_program import InsuranceProgram
class LossDistribution(ABC):
    """Abstract base class for loss severity distributions.

    Provides a common interface for generating loss amounts and calculating
    statistical properties of the distribution.
    """

    def __init__(self, seed: Optional[Union[int, np.random.SeedSequence]] = None):
        """Initialize the loss distribution.

        Args:
            seed: Random seed for reproducibility.
        """
        self.rng = np.random.default_rng(seed)

    @abstractmethod
    def generate_severity(self, n_samples: int) -> np.ndarray:
        """Generate loss severity samples.

        Args:
            n_samples: Number of samples to generate.

        Returns:
            Array of loss amounts.
        """

    @abstractmethod
    def expected_value(self) -> float:
        """Calculate the analytical expected value of the distribution.

        Returns:
            Expected value when analytically available, otherwise estimated.
        """

    def reset_seed(self, seed: Optional[Union[int, np.random.SeedSequence]]) -> None:
        """Reset the random seed for reproducibility.

        Args:
            seed: New random seed to use (int or SeedSequence).
        """
        self.rng = np.random.default_rng(seed)
class LognormalLoss(LossDistribution):
    """Lognormal loss severity distribution.

    Common for attritional and large losses in manufacturing.
    Parameters can be specified as either (mean, cv) or (mu, sigma).
    """

    def __init__(
        self,
        mean: Optional[float] = None,
        cv: Optional[float] = None,
        mu: Optional[float] = None,
        sigma: Optional[float] = None,
        seed: Optional[Union[int, np.random.SeedSequence]] = None,
    ):
        """Initialize lognormal distribution.

        Args:
            mean: Mean of the lognormal distribution.
            cv: Coefficient of variation (std/mean).
            mu: Log-space mean parameter (alternative to mean/cv).
            sigma: Log-space standard deviation (alternative to mean/cv).
            seed: Random seed for reproducibility.

        Raises:
            ValueError: If an invalid parameter combination is provided.
        """
        super().__init__(seed)
        if mean is not None and cv is not None:
            # Convert from the (mean, cv) parameterization
            if mean <= 0:
                raise ValueError(f"Mean must be positive, got {mean}")
            if cv < 0:
                raise ValueError(f"CV must be non-negative, got {cv}")
            self.mean = mean
            self.cv = cv
            variance = (mean * cv) ** 2
            # Moment-match the log-space parameters:
            # sigma^2 = ln(1 + var / mean^2), mu = ln(mean) - sigma^2 / 2
            self.sigma = np.sqrt(np.log(1 + variance / mean**2))
            self.mu = np.log(mean) - self.sigma**2 / 2
        elif mu is not None and sigma is not None:
            # Direct log-space parameters
            if sigma < 0:
                raise ValueError(f"Sigma must be non-negative, got {sigma}")
            self.mu = mu
            self.sigma = sigma
            self.mean = np.exp(mu + sigma**2 / 2)
            self.cv = np.sqrt(np.exp(sigma**2) - 1)
        else:
            raise ValueError("Must provide either (mean, cv) or (mu, sigma) parameters")

    def generate_severity(self, n_samples: int) -> np.ndarray:
        """Generate lognormal loss samples.

        Args:
            n_samples: Number of samples to generate.

        Returns:
            Array of loss amounts.
        """
        if n_samples <= 0:
            return np.array([])
        return self.rng.lognormal(self.mu, self.sigma, size=n_samples)

    def expected_value(self) -> float:
        """Calculate expected value of the lognormal distribution.

        Returns:
            Analytical expected value.
        """
        return self.mean
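
# Usage sketch (kept as a comment so imports stay side-effect free): the
# (mean, cv) form is moment-matched, so the sample mean of a large draw
# should approach expected_value(). Parameter values below are illustrative.
#
#     dist = LognormalLoss(mean=25_000, cv=1.5, seed=42)
#     samples = dist.generate_severity(200_000)
#     assert abs(samples.mean() / dist.expected_value() - 1) < 0.02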
class ParetoLoss(LossDistribution):
    """Pareto loss severity distribution for catastrophic events.

    Heavy-tailed distribution suitable for modeling extreme losses
    with potentially unbounded severity.
    """

    def __init__(
        self,
        alpha: float,
        xm: float,
        seed: Optional[Union[int, np.random.SeedSequence]] = None,
    ):
        """Initialize Pareto distribution.

        Args:
            alpha: Shape parameter (tail index). Lower = heavier tail.
            xm: Scale parameter (minimum value).
            seed: Random seed for reproducibility.

        Raises:
            ValueError: If parameters are invalid.
        """
        super().__init__(seed)
        if alpha <= 0:
            raise ValueError(f"Alpha must be positive, got {alpha}")
        if xm <= 0:
            raise ValueError(f"Minimum value xm must be positive, got {xm}")
        self.alpha = alpha
        self.xm = xm

    def generate_severity(self, n_samples: int) -> np.ndarray:
        """Generate Pareto loss samples.

        Args:
            n_samples: Number of samples to generate.

        Returns:
            Array of loss amounts.
        """
        if n_samples <= 0:
            return np.array([])
        # Inverse transform method: X = xm / U^(1/alpha) with U ~ Uniform(0, 1].
        # rng.uniform samples [0, 1), so map u -> 1 - u to exclude zero and
        # avoid division by zero.
        u = 1.0 - self.rng.uniform(0, 1, size=n_samples)
        return self.xm / (u ** (1 / self.alpha))

    def expected_value(self) -> float:
        """Calculate expected value of the Pareto distribution.

        Returns:
            Analytical expected value if it exists (alpha > 1), else inf.
        """
        if self.alpha <= 1:
            return np.inf
        return self.alpha * self.xm / (self.alpha - 1)
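
# Sketch of the inverse-transform identity used above: if U ~ Uniform(0, 1],
# then xm / U**(1/alpha) is Pareto(alpha, xm). With alpha = 2.5 and xm = 1e6
# the analytical mean is alpha * xm / (alpha - 1) ~= 1.667e6. Illustrative:
#
#     dist = ParetoLoss(alpha=2.5, xm=1_000_000, seed=7)
#     x = dist.generate_severity(500_000)
#     print(x.min() >= 1_000_000, x.mean(), dist.expected_value())
#
#     heavy = ParetoLoss(alpha=0.9, xm=1_000_000)
#     print(heavy.expected_value())  # inf: the mean does not exist for alpha <= 1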
class GeneralizedParetoLoss(LossDistribution):
    """Generalized Pareto distribution for modeling excesses over a threshold.

    Implements the GPD using scipy.stats.genpareto for Peaks Over Threshold (POT)
    extreme value modeling. By the Pickands-Balkema-de Haan theorem, excesses
    over a sufficiently high threshold asymptotically follow a GPD.

    The distribution models: (X - u) | X > u ~ GPD(ξ, β)

    Shape parameter interpretation:
        - ξ < 0: Bounded distribution (Type III - short-tailed)
        - ξ = 0: Exponential distribution (Type I - medium-tailed)
        - ξ > 0: Pareto-type distribution (Type II - heavy-tailed)
    """

    def __init__(
        self,
        severity_shape: float,
        severity_scale: float,
        seed: Optional[Union[int, np.random.SeedSequence]] = None,
    ):
        """Initialize Generalized Pareto distribution.

        Args:
            severity_shape: Shape parameter ξ (any real value).
                - Negative: bounded tail
                - Zero: exponential tail
                - Positive: Pareto-type heavy tail
            severity_scale: Scale parameter β (must be positive).
            seed: Random seed for reproducibility.

        Raises:
            ValueError: If severity_scale <= 0.
        """
        super().__init__(seed)
        if severity_scale <= 0:
            raise ValueError(f"Scale parameter must be positive, got {severity_scale}")
        self.severity_shape = severity_shape
        self.severity_scale = severity_scale

    def generate_severity(self, n_samples: int) -> np.ndarray:
        """Generate GPD samples (excesses above threshold).

        Args:
            n_samples: Number of samples to generate.

        Returns:
            Array of excess amounts above threshold.
        """
        if n_samples <= 0:
            return np.array([])
        # Use scipy.stats.genpareto with c=shape, scale=scale, loc=0
        result = stats.genpareto.rvs(
            c=self.severity_shape,
            scale=self.severity_scale,
            loc=0,
            size=n_samples,
            random_state=self.rng,
        )
        return np.asarray(result)

    def expected_value(self) -> float:
        """Calculate expected excess above threshold.

        E[X - u | X > u] = β / (1 - ξ) for ξ < 1.

        Returns:
            Analytical expected value if it exists (ξ < 1), else inf.
        """
        if self.severity_shape >= 1:
            return np.inf
        return self.severity_scale / (1 - self.severity_shape)
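
# POT sketch (illustrative parameters): GPD samples are *excesses* over a
# threshold u, so absolute losses are u + excess. With ξ = 0.3, β = 500_000
# the mean excess is β / (1 - ξ) ~= 714_286.
#
#     gpd = GeneralizedParetoLoss(severity_shape=0.3, severity_scale=500_000, seed=1)
#     excesses = gpd.generate_severity(100_000)
#     threshold = 5_000_000
#     absolute_losses = threshold + excesses
#     print(excesses.mean(), gpd.expected_value())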
@dataclass
class LossEvent:
    """Represents a single loss event with timing and amount."""

    amount: float  # Loss amount
    time: float = 0.0  # Time of occurrence (in years)
    loss_type: str = "operational"  # Type of loss (attritional, large, catastrophic)
    timestamp: Optional[float] = None  # Alternative name for time
    event_type: Optional[str] = None  # Alternative name for loss_type
    description: Optional[str] = None  # Optional description

    def __post_init__(self):
        """Handle alternative parameter names."""
        if self.timestamp is not None and self.time == 0.0:
            self.time = self.timestamp
        if self.event_type is not None and self.loss_type == "operational":
            self.loss_type = self.event_type

    def __le__(self, other):
        """Support ordering by amount."""
        if isinstance(other, (int, float)):
            return self.amount <= other
        if isinstance(other, LossEvent):
            return self.amount <= other.amount
        return NotImplemented

    def __lt__(self, other):
        """Support ordering by amount."""
        if isinstance(other, (int, float)):
            return self.amount < other
        if isinstance(other, LossEvent):
            return self.amount < other.amount
        return NotImplemented
@dataclass
class LossData:
    """Unified loss data structure for cross-module compatibility.

    This dataclass provides a standardized interface for loss data
    that can be used consistently across all modules in the framework.
    """

    timestamps: np.ndarray = field(default_factory=lambda: np.array([]))
    loss_amounts: np.ndarray = field(default_factory=lambda: np.array([]))
    loss_types: List[str] = field(default_factory=list)
    claim_ids: List[str] = field(default_factory=list)
    development_factors: Optional[np.ndarray] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def validate(self) -> bool:
        """Validate data consistency.

        Returns:
            True if data is valid and consistent, False otherwise.
        """
        # Check array lengths match
        if len(self.timestamps) != len(self.loss_amounts):
            return False
        # Check optional fields match array length if provided
        n_losses = len(self.timestamps)
        # Check each optional field separately for better readability
        if self.loss_types and len(self.loss_types) != n_losses:
            return False
        if self.claim_ids and len(self.claim_ids) != n_losses:
            return False
        if self.development_factors is not None and len(self.development_factors) != n_losses:
            return False
        # Check amounts and timestamps are non-negative
        if np.any(self.loss_amounts < 0) or (
            len(self.timestamps) > 0 and np.any(self.timestamps < 0)
        ):
            return False
        return True
    def apply_insurance(self, program: "InsuranceProgram") -> "LossData":
        """Apply insurance recoveries to losses.

        Args:
            program: Insurance program to apply.

        Returns:
            New LossData with insurance recoveries applied.
        """
        # Import here to avoid circular dependency
        from .insurance_program import InsuranceProgram

        # Create copy of data
        recovered_amounts = self.loss_amounts.copy()
        recovery_metadata = self.metadata.copy()
        # Apply insurance to each loss
        total_recoveries = 0.0
        total_premiums = 0.0
        for i, amount in enumerate(self.loss_amounts):
            # Process claim through insurance program
            result = program.process_claim(amount)
            recovery = result["insurance_recovery"]
            recovered_amounts[i] = amount - recovery
            total_recoveries += recovery
        # Calculate total premiums
        if hasattr(program, "calculate_annual_premium"):
            total_premiums = program.calculate_annual_premium()
        # Update metadata
        recovery_metadata.update(
            {
                "insurance_applied": True,
                "total_recoveries": total_recoveries,
                "total_premiums": total_premiums,
                "net_benefit": total_recoveries - total_premiums,
            }
        )
        return LossData(
            timestamps=self.timestamps.copy(),
            loss_amounts=recovered_amounts,
            loss_types=self.loss_types.copy() if self.loss_types else [],
            claim_ids=self.claim_ids.copy() if self.claim_ids else [],
            development_factors=(
                self.development_factors.copy() if self.development_factors is not None else None
            ),
            metadata=recovery_metadata,
        )
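
    # Contract sketch for apply_insurance() (hedged: the real InsuranceProgram
    # lives in .insurance_program; the hypothetical stub below mimics only the
    # two members used here, namely process_claim() returning an
    # "insurance_recovery" key and the optional calculate_annual_premium()):
    #
    #     class _StubProgram:
    #         def process_claim(self, amount):
    #             # e.g. a flat $100K deductible with no limit
    #             return {"insurance_recovery": max(0.0, amount - 100_000)}
    #
    #         def calculate_annual_premium(self):
    #             return 250_000.0
    #
    #     net = loss_data.apply_insurance(_StubProgram())  # loss_data: LossData
    #     print(net.metadata["total_recoveries"], net.metadata["net_benefit"])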
    @classmethod
    def from_loss_events(cls, events: List[LossEvent]) -> "LossData":
        """Create LossData from a list of LossEvent objects.

        Args:
            events: List of LossEvent objects.

        Returns:
            LossData instance with consolidated event data.
        """
        if not events:
            return cls()
        timestamps = np.array([e.time for e in events])
        amounts = np.array([e.amount for e in events])
        types = [e.loss_type for e in events]
        # Sort by time
        sort_idx = np.argsort(timestamps)
        return cls(
            timestamps=timestamps[sort_idx],
            loss_amounts=amounts[sort_idx],
            loss_types=[types[i] for i in sort_idx],
            claim_ids=[f"claim_{i}" for i in range(len(events))],
            metadata={"source": "loss_events", "n_events": len(events)},
        )

    def to_loss_events(self) -> List[LossEvent]:
        """Convert LossData back to a LossEvent list.

        Returns:
            List of LossEvent objects.
        """
        events = []
        for i, timestamp in enumerate(self.timestamps):
            loss_type = self.loss_types[i] if i < len(self.loss_types) else "unknown"
            events.append(
                LossEvent(time=timestamp, amount=self.loss_amounts[i], loss_type=loss_type)
            )
        return events

    def get_annual_aggregates(self, years: int) -> Dict[int, float]:
        """Aggregate losses by year.

        Args:
            years: Number of years to aggregate over.

        Returns:
            Dictionary mapping year to total loss amount.
        """
        annual_losses = {year: 0.0 for year in range(years)}
        for time, amount in zip(self.timestamps, self.loss_amounts):
            year = int(time)
            if 0 <= year < years:
                annual_losses[year] += amount
        return annual_losses

    def calculate_statistics(self) -> Dict[str, float]:
        """Calculate comprehensive statistics for the loss data.

        Returns:
            Dictionary of statistical metrics.
        """
        if len(self.loss_amounts) == 0:
            return {
                "count": 0,
                "total": 0.0,
                "mean": 0.0,
                "std": 0.0,
                "min": 0.0,
                "max": 0.0,
                "p50": 0.0,
                "p95": 0.0,
                "p99": 0.0,
            }
        # Boundary: cast NumPy scalars to float at the API boundary
        return {
            "count": len(self.loss_amounts),
            "total": float(np.sum(self.loss_amounts)),
            "mean": float(np.mean(self.loss_amounts)),
            "std": float(np.std(self.loss_amounts)),
            "min": float(np.min(self.loss_amounts)),
            "max": float(np.max(self.loss_amounts)),
            "p50": float(np.percentile(self.loss_amounts, 50)),
            "p95": float(np.percentile(self.loss_amounts, 95)),
            "p99": float(np.percentile(self.loss_amounts, 99)),
        }
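
# Round-trip sketch with illustrative events:
#
#     events = [
#         LossEvent(amount=50_000, time=0.4, loss_type="attritional"),
#         LossEvent(amount=2_500_000, time=1.7, loss_type="large"),
#     ]
#     data = LossData.from_loss_events(events)
#     assert data.validate()
#     print(data.get_annual_aggregates(years=3))  # {0: 50000.0, 1: 2500000.0, 2: 0.0}
#     print(data.calculate_statistics()["p95"])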
class FrequencyGenerator:
    """Base class for generating loss event frequencies.

    Supports revenue-dependent scaling of claim frequencies.
    """

    def __init__(
        self,
        base_frequency: float,
        revenue_scaling_exponent: float = 0.0,
        reference_revenue: float = 10_000_000,
        seed: Optional[Union[int, np.random.SeedSequence]] = None,
    ):
        """Initialize frequency generator.

        Args:
            base_frequency: Base expected events per year (at reference revenue).
            revenue_scaling_exponent: Exponent for revenue scaling (0 = no scaling).
            reference_revenue: Reference revenue level for base frequency.
            seed: Random seed for reproducibility.

        Raises:
            ValueError: If parameters are invalid.
        """
        if base_frequency < 0:
            raise ValueError(f"Base frequency must be non-negative, got {base_frequency}")
        if reference_revenue <= 0:
            raise ValueError(f"Reference revenue must be positive, got {reference_revenue}")
        self.base_frequency = base_frequency
        self.revenue_scaling_exponent = revenue_scaling_exponent
        self.reference_revenue = reference_revenue
        self.rng = np.random.default_rng(seed)

    def reseed(self, seed: Optional[Union[int, np.random.SeedSequence]]) -> None:
        """Re-seed the random state.

        Args:
            seed: New random seed (int or SeedSequence).
        """
        self.rng = np.random.default_rng(seed)

    def get_scaled_frequency(self, revenue: float) -> float:
        """Calculate revenue-scaled frequency.

        Args:
            revenue: Current revenue level (can be float or Decimal).

        Returns:
            Scaled frequency parameter.
        """
        if revenue <= 0:
            return 0.0
        # Boundary: the power operation requires float (revenue may be Decimal)
        revenue_float = float(revenue)
        scaling_factor = (revenue_float / self.reference_revenue) ** self.revenue_scaling_exponent
        return float(self.base_frequency * scaling_factor)

    def generate_event_times(self, duration: float, revenue: float) -> np.ndarray:
        """Generate event times using a Poisson process.

        Args:
            duration: Time period in years.
            revenue: Revenue level for frequency scaling.

        Returns:
            Array of event times.
        """
        if duration <= 0 or revenue <= 0:
            return np.array([])
        frequency = self.get_scaled_frequency(revenue)
        if frequency <= 0:
            return np.array([])
        # Draw the event count, then place events uniformly over the period
        # (order statistics of a homogeneous Poisson process)
        n_events = self.rng.poisson(frequency * duration)
        if n_events == 0:
            return np.array([])
        event_times = self.rng.uniform(0, duration, size=n_events)
        return np.sort(event_times)
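
# Scaling sketch: frequency = base * (revenue / reference) ** exponent. With
# sqrt scaling (exponent 0.5), quadrupling revenue doubles expected counts:
#
#     freq_gen = FrequencyGenerator(
#         base_frequency=5.0,
#         revenue_scaling_exponent=0.5,
#         reference_revenue=10_000_000,
#         seed=3,
#     )
#     print(freq_gen.get_scaled_frequency(40_000_000))  # 5.0 * 2.0 = 10.0
#     times = freq_gen.generate_event_times(duration=1.0, revenue=40_000_000)
#     print(len(times))  # Poisson draw with mean ~10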
class AttritionalLossGenerator:
    """Generator for high-frequency, low-severity attritional losses.

    Typical for widget manufacturing: worker injuries, quality defects,
    minor property damage.
    """

    def __init__(
        self,
        base_frequency: float = 5.0,
        severity_mean: float = 25_000,
        severity_cv: float = 1.5,
        revenue_scaling_exponent: float = 0.5,
        reference_revenue: float = 10_000_000,
        exposure: Optional["ExposureBase"] = None,
        seed: Optional[Union[int, np.random.SeedSequence]] = None,
    ):
        """Initialize attritional loss generator.

        Args:
            base_frequency: Base events per year (3-8 typical).
            severity_mean: Mean loss amount ($3K-$100K typical).
            severity_cv: Coefficient of variation (0.6-1.0 typical).
            revenue_scaling_exponent: Revenue scaling power (0.5 = sqrt scaling).
            reference_revenue: Reference revenue for base frequency.
            exposure: Optional exposure object for dynamic frequency scaling.
            seed: Random seed for reproducibility.
        """
        self.exposure = exposure
        # Spawn independent child seeds for frequency and severity (mirroring
        # reseed) so the two streams are not driven by the same bit stream
        ss = seed if isinstance(seed, np.random.SeedSequence) else np.random.SeedSequence(seed)
        freq_seed, severity_seed = ss.spawn(2)
        self.frequency_generator = FrequencyGenerator(
            base_frequency=base_frequency,
            revenue_scaling_exponent=revenue_scaling_exponent,
            reference_revenue=reference_revenue,
            seed=freq_seed,
        )
        self.severity_distribution = LognormalLoss(
            mean=severity_mean, cv=severity_cv, seed=severity_seed
        )
        self.loss_type = "attritional"

    def reseed(self, seed: Optional[Union[int, np.random.SeedSequence]]) -> None:
        """Re-seed all internal random states.

        Args:
            seed: New random seed (int or SeedSequence). A SeedSequence is
                used internally to derive independent child seeds for
                frequency and severity.
        """
        ss = seed if isinstance(seed, np.random.SeedSequence) else np.random.SeedSequence(seed)
        child_seeds = ss.spawn(2)
        self.frequency_generator.reseed(child_seeds[0])
        self.severity_distribution.reset_seed(child_seeds[1])

    def generate_losses(self, duration: float, revenue: float) -> List[LossEvent]:
        """Generate attritional loss events.

        Args:
            duration: Simulation period in years.
            revenue: Current revenue level.

        Returns:
            List of loss events.
        """
        event_times = self.frequency_generator.generate_event_times(duration, revenue)
        if len(event_times) == 0:
            return []
        severities = self.severity_distribution.generate_severity(len(event_times))
        return [
            LossEvent(time=t, amount=s, loss_type=self.loss_type)
            for t, s in zip(event_times, severities)
        ]
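
# Sketch: one simulated year of attritional losses at $10M revenue
# (default parameters; counts vary by seed):
#
#     gen = AttritionalLossGenerator(seed=11)
#     losses = gen.generate_losses(duration=1.0, revenue=10_000_000)
#     print(len(losses), sum(loss.amount for loss in losses))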
class LargeLossGenerator:
    """Generator for medium-frequency, medium-severity large losses.

    Typical for manufacturing: product recalls, major equipment failures,
    litigation settlements.
    """

    def __init__(
        self,
        base_frequency: float = 0.3,
        severity_mean: float = 2_000_000,
        severity_cv: float = 2.0,
        revenue_scaling_exponent: float = 0.7,
        reference_revenue: float = 10_000_000,
        exposure: Optional["ExposureBase"] = None,
        seed: Optional[Union[int, np.random.SeedSequence]] = None,
    ):
        """Initialize large loss generator.

        Args:
            base_frequency: Base events per year (0.1-0.5 typical).
            severity_mean: Mean loss amount ($500K-$50M typical).
            severity_cv: Coefficient of variation (1.5-2.0 typical).
            revenue_scaling_exponent: Revenue scaling power (0.7 typical).
            reference_revenue: Reference revenue for base frequency.
            exposure: Optional exposure object for dynamic frequency scaling.
            seed: Random seed for reproducibility.
        """
        self.exposure = exposure
        # Spawn independent child seeds for frequency and severity (mirroring
        # reseed) so the two streams are not driven by the same bit stream
        ss = seed if isinstance(seed, np.random.SeedSequence) else np.random.SeedSequence(seed)
        freq_seed, severity_seed = ss.spawn(2)
        self.frequency_generator = FrequencyGenerator(
            base_frequency=base_frequency,
            revenue_scaling_exponent=revenue_scaling_exponent,
            reference_revenue=reference_revenue,
            seed=freq_seed,
        )
        self.severity_distribution = LognormalLoss(
            mean=severity_mean, cv=severity_cv, seed=severity_seed
        )
        self.loss_type = "large"

    def reseed(self, seed: Optional[Union[int, np.random.SeedSequence]]) -> None:
        """Re-seed all internal random states.

        Args:
            seed: New random seed (int or SeedSequence). A SeedSequence is
                used internally to derive independent child seeds for
                frequency and severity.
        """
        ss = seed if isinstance(seed, np.random.SeedSequence) else np.random.SeedSequence(seed)
        child_seeds = ss.spawn(2)
        self.frequency_generator.reseed(child_seeds[0])
        self.severity_distribution.reset_seed(child_seeds[1])

    def generate_losses(self, duration: float, revenue: float) -> List[LossEvent]:
        """Generate large loss events.

        Args:
            duration: Simulation period in years.
            revenue: Current revenue level.

        Returns:
            List of loss events.
        """
        event_times = self.frequency_generator.generate_event_times(duration, revenue)
        if len(event_times) == 0:
            return []
        severities = self.severity_distribution.generate_severity(len(event_times))
        return [
            LossEvent(time=t, amount=s, loss_type=self.loss_type)
            for t, s in zip(event_times, severities)
        ]
class CatastrophicLossGenerator:
    """Generator for low-frequency, high-severity catastrophic losses.

    Uses a Pareto distribution for heavy-tailed severity modeling.
    Examples: major equipment failure, facility damage, environmental disasters.
    """

    def __init__(
        self,
        base_frequency: float = 0.03,
        severity_alpha: float = 2.5,
        severity_xm: float = 1_000_000,
        revenue_scaling_exponent: float = 0.0,
        reference_revenue: float = 10_000_000,
        exposure: Optional["ExposureBase"] = None,
        seed: Optional[Union[int, np.random.SeedSequence]] = None,
    ):
        """Initialize catastrophic loss generator.

        Args:
            base_frequency: Base events per year (0.01-0.05 typical).
            severity_alpha: Pareto shape parameter (2.5 typical).
            severity_xm: Pareto minimum value ($1M+ typical).
            revenue_scaling_exponent: Revenue scaling power (0.0 = no scaling).
            reference_revenue: Reference revenue for base frequency.
            exposure: Optional exposure object for dynamic frequency scaling.
            seed: Random seed for reproducibility.
        """
        # Store exposure for dynamic scaling
        self.exposure = exposure
        # Spawn independent child seeds for frequency and severity (mirroring
        # reseed) so the two streams are not driven by the same bit stream
        ss = seed if isinstance(seed, np.random.SeedSequence) else np.random.SeedSequence(seed)
        freq_seed, severity_seed = ss.spawn(2)
        # Catastrophic events typically don't scale with revenue
        self.frequency_generator = FrequencyGenerator(
            base_frequency=base_frequency,
            revenue_scaling_exponent=revenue_scaling_exponent,
            reference_revenue=reference_revenue,
            seed=freq_seed,
        )
        self.severity_distribution = ParetoLoss(
            alpha=severity_alpha, xm=severity_xm, seed=severity_seed
        )
        self.loss_type = "catastrophic"

    def reseed(self, seed: Optional[Union[int, np.random.SeedSequence]]) -> None:
        """Re-seed all internal random states.

        Args:
            seed: New random seed (int or SeedSequence). A SeedSequence is
                used internally to derive independent child seeds for
                frequency and severity.
        """
        ss = seed if isinstance(seed, np.random.SeedSequence) else np.random.SeedSequence(seed)
        child_seeds = ss.spawn(2)
        self.frequency_generator.reseed(child_seeds[0])
        self.severity_distribution.reset_seed(child_seeds[1])

    def generate_losses(self, duration: float, revenue: float = 10_000_000) -> List[LossEvent]:
        """Generate catastrophic loss events.

        Args:
            duration: Simulation period in years.
            revenue: Current revenue level (not used for scaling by default).

        Returns:
            List of loss events.
        """
        event_times = self.frequency_generator.generate_event_times(duration, revenue)
        if len(event_times) == 0:
            return []
        severities = self.severity_distribution.generate_severity(len(event_times))
        return [
            LossEvent(time=t, amount=s, loss_type=self.loss_type)
            for t, s in zip(event_times, severities)
        ]
class ManufacturingLossGenerator:
    """Composite loss generator for widget manufacturing risks.

    Combines attritional, large, and catastrophic loss generators
    to provide comprehensive risk modeling.
    """

    def __init__(
        self,
        attritional_params: Optional[dict] = None,
        large_params: Optional[dict] = None,
        catastrophic_params: Optional[dict] = None,
        extreme_params: Optional[dict] = None,
        exposure: Optional["ExposureBase"] = None,
        seed: Optional[int] = None,
    ):
        """Initialize manufacturing loss generator.

        Args:
            attritional_params: Parameters for attritional losses.
            large_params: Parameters for large losses.
            catastrophic_params: Parameters for catastrophic losses.
            extreme_params: Optional parameters for extreme value modeling.
                - threshold_value (float): Threshold for GPD application
                  (required if extreme_params is provided)
                - severity_shape (float): GPD shape parameter ξ (required)
                - severity_scale (float): GPD scale parameter β > 0 (required)
            exposure: Optional exposure object for dynamic frequency scaling.
            seed: Random seed for reproducibility.
        """
        # Use provided parameters or defaults
        attritional_params = attritional_params or {}
        large_params = large_params or {}
        catastrophic_params = catastrophic_params or {}
        # Store exposure object
        self.exposure = exposure
        # Pass exposure to each generator if provided
        if exposure is not None:
            attritional_params["exposure"] = exposure
            large_params["exposure"] = exposure
            catastrophic_params["exposure"] = exposure
        # Seed all generators via SeedSequence for statistical independence
        if seed is not None:
            # Spawn statistically independent child sequences, one per stream
            ss = np.random.SeedSequence(seed)
            child_seeds = ss.spawn(4)  # attritional, large, catastrophic, gpd
            attritional_params["seed"] = child_seeds[0]
            large_params["seed"] = child_seeds[1]
            catastrophic_params["seed"] = child_seeds[2]
            gpd_child_seed = child_seeds[3]
        else:
            gpd_child_seed = None
        self.attritional = AttritionalLossGenerator(**attritional_params)
        self.large = LargeLossGenerator(**large_params)
        self.catastrophic = CatastrophicLossGenerator(**catastrophic_params)
        # Initialize extreme value modeling if parameters provided
        self.extreme_params = extreme_params
        self.threshold_value = None
        self.gpd_generator = None
        if extreme_params is not None:
            self.threshold_value = extreme_params.get("threshold_value")
            if self.threshold_value is not None:
                # gpd_child_seed is already None when no seed was given
                self.gpd_generator = GeneralizedParetoLoss(
                    severity_shape=extreme_params["severity_shape"],
                    severity_scale=extreme_params["severity_scale"],
                    seed=gpd_child_seed,
                )
    def reseed(self, seed: int) -> None:
        """Re-seed all internal random states using SeedSequence.

        Derives independent child seeds for each sub-generator so that
        parallel workers produce statistically distinct loss sequences.

        Args:
            seed: New random seed.
        """
        ss = np.random.SeedSequence(seed)
        n_children = 4 if self.gpd_generator is not None else 3
        child_seeds = ss.spawn(n_children)
        self.attritional.reseed(child_seeds[0])
        self.large.reseed(child_seeds[1])
        self.catastrophic.reseed(child_seeds[2])
        if self.gpd_generator is not None:
            self.gpd_generator.reset_seed(child_seeds[3])
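
    # Parallel-worker sketch (hedged, illustrative): derive one integer seed
    # per worker so each worker's internal SeedSequence spawns produce
    # statistically independent streams:
    #
    #     base = np.random.SeedSequence(2024)
    #     worker_seeds = base.generate_state(4)  # four 32-bit worker seeds
    #     for worker_id, worker_seed in enumerate(worker_seeds):
    #         generator.reseed(int(worker_seed))  # generator: ManufacturingLossGenerator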
    @classmethod
    def create_simple(
        cls,
        frequency: float = 0.1,
        severity_mean: float = 5_000_000,
        severity_std: float = 2_000_000,
        seed: Optional[int] = None,
    ) -> "ManufacturingLossGenerator":
        """Create a simple loss generator (migration helper from ClaimGenerator).

        This factory method provides a simplified interface similar to
        ClaimGenerator, making migration easier. It creates a generator with
        mostly attritional losses and minimal catastrophic risk.

        Args:
            frequency: Annual frequency of losses (Poisson lambda).
            severity_mean: Mean loss amount in dollars.
            severity_std: Standard deviation of loss amount.
            seed: Random seed for reproducibility.

        Returns:
            ManufacturingLossGenerator configured for the simple use case.

        Raises:
            ValueError: If severity_mean is not positive or severity_std is
                negative.

        Examples:
            Simple usage (equivalent to ClaimGenerator)::

                generator = ManufacturingLossGenerator.create_simple(
                    frequency=0.1,
                    severity_mean=5_000_000,
                    severity_std=2_000_000,
                    seed=42,
                )
                losses, stats = generator.generate_losses(duration=10, revenue=10_000_000)

            Accessing loss amounts::

                total_loss = sum(loss.amount for loss in losses)
                print(f"Total losses: ${total_loss:,.0f}")
                print(f"Number of events: {stats['total_losses']}")

        Note:
            For advanced features (multiple loss types, extreme value modeling),
            use the standard __init__ method with explicit parameters.

        See Also:
            Migration guide: docs/migration_guides/claim_generator_migration.md
        """
        if severity_mean <= 0:
            raise ValueError(f"severity_mean must be positive, got {severity_mean}")
        if severity_std < 0:
            raise ValueError(f"severity_std must be non-negative, got {severity_std}")
        # Derive the coefficient of variation from the mean/std inputs
        cv = severity_std / severity_mean
        # Configure as primarily attritional with a small large-loss component.
        # Attritional losses: 90% of events, using the (mean, cv) parameterization
        attritional_params = {
            "base_frequency": frequency * 0.9,  # 90% of frequency
            "severity_mean": severity_mean * 0.5,  # Smaller attritional losses
            "severity_cv": cv,
        }
        # Large losses: 10% of events, using the (mean, cv) parameterization
        large_params = {
            "base_frequency": frequency * 0.1,  # 10% of frequency
            "severity_mean": severity_mean * 2,  # Larger losses
            "severity_cv": cv * 1.5,  # More variable
        }
        # Catastrophic losses: very rare, using the Pareto distribution
        catastrophic_params = {
            "base_frequency": 0.001,  # Very rare
            "severity_alpha": 2.5,  # Pareto shape
            "severity_xm": severity_mean * 5,  # Much larger severities
        }
        return cls(
            attritional_params=attritional_params,
            large_params=large_params,
            catastrophic_params=catastrophic_params,
            seed=seed,
        )
    def generate_losses(
        self, duration: float, revenue: float, include_catastrophic: bool = True, time: float = 0.0
    ) -> Tuple[List[LossEvent], Dict[str, Any]]:
        """Generate all types of losses for manufacturing operations.

        Args:
            duration: Simulation period in years.
            revenue: Current revenue level.
            include_catastrophic: Whether to include catastrophic events.
            time: Current time for exposure calculation (default 0.0).

        Returns:
            Tuple of (all_losses, statistics_dict).
        """
        # Prefer the exposure object (if any) for the effective revenue
        if self.exposure is not None:
            actual_revenue = self.exposure.get_exposure(time)
        else:
            actual_revenue = revenue
        # Generate each type of loss
        attritional_losses = self.attritional.generate_losses(duration, actual_revenue)
        large_losses = self.large.generate_losses(duration, actual_revenue)
        if include_catastrophic:
            catastrophic_losses = self.catastrophic.generate_losses(duration, actual_revenue)
        else:
            catastrophic_losses = []
        # Combine all losses and sort by time
        all_losses = attritional_losses + large_losses + catastrophic_losses
        all_losses.sort(key=lambda x: x.time)
        # Apply extreme value transformation if configured
        extreme_losses = []
        if self.extreme_params is not None and self.threshold_value is not None:
            assert self.gpd_generator is not None  # For type checking
            # Re-draw threshold exceedances from the GPD tail
            for loss in all_losses:
                if loss.amount > self.threshold_value:
                    # Generate GPD excess above the threshold
                    gpd_excess = self.gpd_generator.generate_severity(1)[0]
                    # Create new extreme loss with transformed amount
                    extreme_loss = LossEvent(
                        amount=self.threshold_value + gpd_excess,
                        time=loss.time,
                        loss_type="extreme",
                    )
                    extreme_losses.append(extreme_loss)
                    # Remove the original loss from its category
                    if loss in attritional_losses:
                        attritional_losses.remove(loss)
                    elif loss in large_losses:
                        large_losses.remove(loss)
                    elif loss in catastrophic_losses:
                        catastrophic_losses.remove(loss)
            # Rebuild the combined list and re-sort by time
            all_losses = attritional_losses + large_losses + catastrophic_losses + extreme_losses
            all_losses.sort(key=lambda x: x.time)
        # Calculate statistics
        statistics = {
            "total_losses": len(all_losses),
            "attritional_count": len(attritional_losses),
            "large_count": len(large_losses),
            "catastrophic_count": len(catastrophic_losses),
            "extreme_count": len(extreme_losses),
            "total_amount": sum(loss.amount for loss in all_losses),
            "attritional_amount": sum(loss.amount for loss in attritional_losses),
            "large_amount": sum(loss.amount for loss in large_losses),
            "catastrophic_amount": sum(loss.amount for loss in catastrophic_losses),
            "extreme_amount": sum(loss.amount for loss in extreme_losses),
            "average_loss": (
                sum(loss.amount for loss in all_losses) / len(all_losses) if all_losses else 0
            ),
            "max_loss": max((loss.amount for loss in all_losses), default=0),
            "annual_frequency": len(all_losses) / duration if duration > 0 else 0,
            "annual_expected_loss": (
                sum(loss.amount for loss in all_losses) / duration if duration > 0 else 0
            ),
        }
        return all_losses, statistics
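
    # End-to-end sketch (illustrative parameters): combine the default
    # sub-generators with a $5M POT threshold so exceedances are re-drawn
    # from a GPD tail:
    #
    #     generator = ManufacturingLossGenerator(
    #         extreme_params={
    #             "threshold_value": 5_000_000,
    #             "severity_shape": 0.25,
    #             "severity_scale": 2_000_000,
    #         },
    #         seed=42,
    #     )
    #     losses, summary = generator.generate_losses(duration=10, revenue=15_000_000)
    #     print(summary["total_losses"], summary["annual_expected_loss"])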
    def validate_distributions(
        self, n_simulations: int = 10000, duration: float = 1.0, revenue: float = 10_000_000
    ) -> Dict[str, Dict[str, float]]:
        """Validate distribution properties through simulation.

        Args:
            n_simulations: Number of simulations to run.
            duration: Duration of each simulation.
            revenue: Revenue level for testing.

        Returns:
            Dictionary of validation statistics.
        """
        results: Dict[str, List[float]] = {
            "attritional": [],
            "large": [],
            "catastrophic": [],
            "extreme": [],
            "total": [],
        }
        # "sim_stats" avoids shadowing the module-level scipy "stats" import
        for _ in range(n_simulations):
            _losses, sim_stats = self.generate_losses(duration, revenue)
            results["attritional"].append(sim_stats["attritional_amount"])
            results["large"].append(sim_stats["large_amount"])
            results["catastrophic"].append(sim_stats["catastrophic_amount"])
            results["extreme"].append(sim_stats.get("extreme_amount", 0))
            results["total"].append(sim_stats["total_amount"])
        # Calculate validation statistics (cast NumPy scalars to float to
        # match the declared Dict[str, Dict[str, float]] return type)
        validation = {}
        for loss_type, amounts in results.items():
            amounts_array = np.array(amounts)
            mean = float(np.mean(amounts_array))
            std = float(np.std(amounts_array))
            validation[loss_type] = {
                "mean": mean,
                "std": std,
                "cv": std / mean if mean > 0 else 0.0,
                "min": float(np.min(amounts_array)),
                "p25": float(np.percentile(amounts_array, 25)),
                "median": float(np.median(amounts_array)),
                "p75": float(np.percentile(amounts_array, 75)),
                "p95": float(np.percentile(amounts_array, 95)),
                "p99": float(np.percentile(amounts_array, 99)),
                "max": float(np.max(amounts_array)),
            }
        return validation
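
# Validation sketch: simulated annual means should sit near the analytical
# frequency x severity products (e.g. attritional with defaults: ~5 events
# x $25K = $125K per year at the reference revenue, where scaling is 1):
#
#     report = generator.validate_distributions(
#         n_simulations=5_000, duration=1.0, revenue=10_000_000
#     )
#     print(report["attritional"]["mean"], report["total"]["p99"])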