Source code for ergodic_insurance.strategy_backtester

"""Strategy backtesting framework for insurance decision strategies.

This module provides base classes and implementations for various insurance
strategies that can be tested and compared in walk-forward validation.

Example:
    >>> from strategy_backtester import ConservativeFixedStrategy, StrategyBacktester
    >>> from simulation import SimulationEngine

    >>> # Create and configure a strategy
    >>> strategy = ConservativeFixedStrategy(
    ...     primary_limit=5000000,
    ...     excess_limit=20000000,
    ...     deductible=100000
    ... )
    >>>
    >>> # Run backtest
    >>> backtester = StrategyBacktester(simulation_engine)
    >>> results = backtester.test_strategy(
    ...     strategy=strategy,
    ...     n_simulations=1000,
    ...     n_years=10
    ... )
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass
import hashlib
import logging
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

from .insurance import InsuranceLayer, InsurancePolicy
from .insurance_program import EnhancedInsuranceLayer, InsuranceProgram
from .loss_distributions import ManufacturingLossGenerator
from .manufacturer import WidgetManufacturer
from .monte_carlo import MonteCarloEngine, SimulationConfig
from .monte_carlo import SimulationResults as MCSimulationResults
from .optimization import PenaltyMethodOptimizer
from .simulation import Simulation, SimulationResults
from .validation_metrics import MetricCalculator, ValidationMetrics

logger = logging.getLogger(__name__)


[docs] class InsuranceStrategy(ABC): """Abstract base class for insurance strategies. Defines the interface that all insurance strategies must implement for use in backtesting and walk-forward validation. """ def __init__(self, name: str): """Initialize strategy. Args: name: Strategy name for identification. """ self.name = name self.metadata: Dict[str, Any] = {} self.adaptation_history: List[Dict[str, Any]] = []
[docs] @abstractmethod def get_insurance_program( self, manufacturer: WidgetManufacturer, historical_losses: Optional[np.ndarray] = None, current_year: int = 0, ) -> Optional[InsuranceProgram]: """Get insurance program for the current state. Args: manufacturer: Current manufacturer state historical_losses: Past loss data for adaptive strategies current_year: Current year in simulation Returns: InsuranceProgram or None for no insurance. """
[docs] def update(self, losses: np.ndarray, recoveries: np.ndarray, year: int): """Update strategy based on recent experience. Args: losses: Recent loss amounts recoveries: Recent recovery amounts year: Current year """
[docs] def reset(self): """Reset strategy to initial state.""" self.adaptation_history.clear()
[docs] def get_description(self) -> str: """Get strategy description. Returns: Human-readable strategy description. """ return f"{self.name} strategy"
[docs] class NoInsuranceStrategy(InsuranceStrategy): """Baseline strategy with no insurance.""" def __init__(self): """Initialize no insurance strategy.""" super().__init__("No Insurance")
[docs] def get_insurance_program( self, manufacturer: WidgetManufacturer, historical_losses: Optional[np.ndarray] = None, current_year: int = 0, ) -> Optional[InsuranceProgram]: """Return no insurance program. Returns: None to indicate no insurance. """ return None
[docs] class ConservativeFixedStrategy(InsuranceStrategy): """Conservative strategy with high limits and low deductible.""" def __init__( self, primary_limit: float = 5000000, excess_limit: float = 20000000, higher_limit: float = 25000000, deductible: float = 50000, ): """Initialize conservative strategy. Args: primary_limit: Primary layer limit excess_limit: Excess layer limit higher_limit: Higher excess layer limit deductible: Deductible amount """ super().__init__("Conservative Fixed") self.primary_limit = primary_limit self.excess_limit = excess_limit self.higher_limit = higher_limit self.deductible = deductible
[docs] def get_insurance_program( self, manufacturer: WidgetManufacturer, historical_losses: Optional[np.ndarray] = None, current_year: int = 0, ) -> Optional[InsuranceProgram]: """Get conservative insurance program. Returns: InsuranceProgram with high coverage. """ layers = [ EnhancedInsuranceLayer( attachment_point=self.deductible, limit=self.primary_limit - self.deductible, base_premium_rate=0.015, reinstatements=1, ), EnhancedInsuranceLayer( attachment_point=self.primary_limit, limit=self.excess_limit, base_premium_rate=0.008, reinstatements=1, ), EnhancedInsuranceLayer( attachment_point=self.primary_limit + self.excess_limit, limit=self.higher_limit, base_premium_rate=0.004, reinstatements=0, ), ] return InsuranceProgram(layers=layers, deductible=self.deductible)
[docs] class AggressiveFixedStrategy(InsuranceStrategy): """Aggressive strategy with low limits and high deductible.""" def __init__( self, primary_limit: float = 2000000, excess_limit: float = 5000000, deductible: float = 250000, ): """Initialize aggressive strategy. Args: primary_limit: Primary layer limit excess_limit: Excess layer limit deductible: Deductible amount """ super().__init__("Aggressive Fixed") self.primary_limit = primary_limit self.excess_limit = excess_limit self.deductible = deductible
[docs] def get_insurance_program( self, manufacturer: WidgetManufacturer, historical_losses: Optional[np.ndarray] = None, current_year: int = 0, ) -> Optional[InsuranceProgram]: """Get aggressive insurance program. Returns: InsuranceProgram with limited coverage. """ layers = [ EnhancedInsuranceLayer( attachment_point=self.deductible, limit=self.primary_limit - self.deductible, base_premium_rate=0.012, reinstatements=0, ), EnhancedInsuranceLayer( attachment_point=self.primary_limit, limit=self.excess_limit, base_premium_rate=0.006, reinstatements=0, ), ] return InsuranceProgram(layers=layers, deductible=self.deductible)
[docs] class OptimizedStaticStrategy(InsuranceStrategy): """Strategy using optimization to find best static limits.""" def __init__( self, optimizer: Optional[PenaltyMethodOptimizer] = None, target_roe: float = 0.15, max_ruin_prob: float = 0.01, ): """Initialize optimized strategy. Args: optimizer: Optimizer instance to use target_roe: Target ROE for optimization max_ruin_prob: Maximum acceptable ruin probability """ super().__init__("Optimized Static") self.optimizer = optimizer self.target_roe = target_roe self.max_ruin_prob = max_ruin_prob self.optimized_params: Optional[Dict[str, float]] = None
[docs] def optimize_limits(self, manufacturer: WidgetManufacturer, simulation_engine: Simulation): """Run optimization to find best limits. Args: manufacturer: Manufacturer instance simulation_engine: Simulation engine for evaluation """ # Define optimization problem def objective(x): # x = [deductible, primary_limit, excess_limit] deductible, primary, excess = x # Create trial insurance program layers = [ InsuranceLayer(deductible, primary - deductible, 0.015), InsuranceLayer(primary, excess, 0.008), ] policy = InsurancePolicy(layers, deductible) # Run short simulation to evaluate # Run short simulation using MonteCarloEngine from .monte_carlo import MonteCarloEngine from .monte_carlo import SimulationConfig as MCConfig mc_config = MCConfig(n_simulations=100, n_years=5) # Create loss generator loss_generator = ManufacturingLossGenerator(seed=42) # Create insurance program from policy from .insurance_program import EnhancedInsuranceLayer, InsuranceProgram program_layers = [ EnhancedInsuranceLayer( attachment_point=layer.attachment_point, limit=layer.limit, base_premium_rate=layer.rate, ) for layer in policy.layers ] program = InsuranceProgram(layers=program_layers) # Initialize Monte Carlo engine with required parameters mc_engine = MonteCarloEngine( loss_generator=loss_generator, insurance_program=program, manufacturer=manufacturer, config=mc_config, ) results = mc_engine.run() # Maximize ROE (minimize negative ROE) return -results.metrics.get("mean_roe", 0) # Define constraints def ruin_constraint(x): deductible, primary, excess = x layers = [ InsuranceLayer(deductible, primary - deductible, 0.015), InsuranceLayer(primary, excess, 0.008), ] policy = InsurancePolicy(layers, deductible) # Run short simulation using MonteCarloEngine from .monte_carlo import MonteCarloEngine from .monte_carlo import SimulationConfig as MCConfig mc_config = MCConfig(n_simulations=100, n_years=5) # Create loss generator loss_generator = ManufacturingLossGenerator(seed=42) # Create insurance program from policy from .insurance_program import EnhancedInsuranceLayer, InsuranceProgram program_layers = [ EnhancedInsuranceLayer( attachment_point=layer.attachment_point, limit=layer.limit, base_premium_rate=layer.rate, ) for layer in policy.layers ] program = InsuranceProgram(layers=program_layers) # Initialize Monte Carlo engine with required parameters mc_engine = MonteCarloEngine( loss_generator=loss_generator, insurance_program=program, manufacturer=manufacturer, config=mc_config, ) results = mc_engine.run() # Constraint: ruin_prob <= max_ruin_prob # Extract ruin probability for the final year ruin_prob_value = results.ruin_probability.get( str(mc_config.n_years), list(results.ruin_probability.values())[-1] if results.ruin_probability else 0.0, ) return self.max_ruin_prob - ruin_prob_value # Run optimization using scipy minimize from scipy.optimize import minimize bounds = [(50000, 500000), (1000000, 10000000), (5000000, 50000000)] constraints = [{"type": "ineq", "fun": ruin_constraint}] x0 = [100000, 5000000, 20000000] # Initial guess result = minimize( objective, x0, method="SLSQP", bounds=bounds, constraints=constraints, options={"maxiter": 100}, ) if result.success: self.optimized_params = { "deductible": result.x[0], "primary_limit": result.x[1], "excess_limit": result.x[2], } logger.info(f"Optimization successful: {self.optimized_params}") else: # Fall back to conservative defaults self.optimized_params = { "deductible": 100000, "primary_limit": 5000000, "excess_limit": 20000000, } logger.warning(f"Optimization failed, using defaults: {self.optimized_params}")
[docs] def get_insurance_program( self, manufacturer: WidgetManufacturer, historical_losses: Optional[np.ndarray] = None, current_year: int = 0, ) -> Optional[InsuranceProgram]: """Get optimized insurance program. Returns: InsuranceProgram with optimized parameters. """ if not self.optimized_params: # Use conservative defaults if not optimized self.optimized_params = { "deductible": 100000, "primary_limit": 5000000, "excess_limit": 20000000, } layers = [ EnhancedInsuranceLayer( attachment_point=self.optimized_params["deductible"], limit=self.optimized_params["primary_limit"] - self.optimized_params["deductible"], base_premium_rate=0.015, reinstatements=1, ), EnhancedInsuranceLayer( attachment_point=self.optimized_params["primary_limit"], limit=self.optimized_params["excess_limit"], base_premium_rate=0.008, reinstatements=0, ), ] return InsuranceProgram(layers=layers, deductible=self.optimized_params["deductible"])
[docs] class AdaptiveStrategy(InsuranceStrategy): """Strategy that adjusts based on recent loss experience.""" def __init__( self, base_deductible: float = 100000, base_primary: float = 3000000, base_excess: float = 10000000, adaptation_window: int = 3, adjustment_factor: float = 0.2, ): """Initialize adaptive strategy. Args: base_deductible: Base deductible amount base_primary: Base primary limit base_excess: Base excess limit adaptation_window: Years of history to consider adjustment_factor: How much to adjust limits (0-1) """ super().__init__("Adaptive") self.base_deductible = base_deductible self.base_primary = base_primary self.base_excess = base_excess self.adaptation_window = adaptation_window self.adjustment_factor = adjustment_factor # Current adjusted parameters self.current_deductible = base_deductible self.current_primary = base_primary self.current_excess = base_excess # Loss history for adaptation self.loss_history: List[float] = []
[docs] def update(self, losses: np.ndarray, recoveries: np.ndarray, year: int): """Update strategy based on recent losses. Args: losses: Recent loss amounts recoveries: Recent recovery amounts year: Current year """ # Add to history total_losses = float(np.sum(losses)) self.loss_history.append(total_losses) # Keep only recent history if len(self.loss_history) > self.adaptation_window: self.loss_history = self.loss_history[-self.adaptation_window :] # Adapt if we have enough history if len(self.loss_history) >= 2: avg_losses = np.mean(self.loss_history) recent_losses = self.loss_history[-1] # Calculate adjustment ratio if avg_losses > 0: ratio = float(recent_losses / avg_losses) else: ratio = 1.0 # Adjust limits based on recent experience if ratio > 1.5: # Recent losses much higher than average # Increase coverage adjustment = float(1 + self.adjustment_factor * (ratio - 1)) self.current_primary = float( min(self.base_primary * adjustment, self.base_primary * 2) ) self.current_excess = float( min(self.base_excess * adjustment, self.base_excess * 2) ) self.current_deductible = float( max(self.base_deductible / adjustment, self.base_deductible * 0.5) ) elif ratio < 0.5: # Recent losses much lower than average # Decrease coverage adjustment = float(1 - self.adjustment_factor * (1 - ratio)) self.current_primary = float( max(self.base_primary * adjustment, self.base_primary * 0.5) ) self.current_excess = float( max(self.base_excess * adjustment, self.base_excess * 0.5) ) self.current_deductible = float( min(self.base_deductible / adjustment, self.base_deductible * 2) ) else: # Gradually return to base levels self.current_primary = 0.9 * self.current_primary + 0.1 * self.base_primary self.current_excess = 0.9 * self.current_excess + 0.1 * self.base_excess self.current_deductible = 0.9 * self.current_deductible + 0.1 * self.base_deductible # Record adaptation self.adaptation_history.append( { "year": year, "avg_losses": avg_losses, "recent_losses": recent_losses, "ratio": ratio, "deductible": self.current_deductible, "primary": self.current_primary, "excess": self.current_excess, } )
[docs] def get_insurance_program( self, manufacturer: WidgetManufacturer, historical_losses: Optional[np.ndarray] = None, current_year: int = 0, ) -> Optional[InsuranceProgram]: """Get adaptive insurance program. Returns: InsuranceProgram with adapted parameters. """ layers = [ EnhancedInsuranceLayer( attachment_point=self.current_deductible, limit=self.current_primary - self.current_deductible, base_premium_rate=0.014, # Slightly lower due to adaptability reinstatements=1, ), EnhancedInsuranceLayer( attachment_point=self.current_primary, limit=self.current_excess, base_premium_rate=0.007, reinstatements=0, ), ] return InsuranceProgram(layers=layers, deductible=self.current_deductible)
[docs] def reset(self): """Reset strategy to initial state.""" super().reset() self.current_deductible = self.base_deductible self.current_primary = self.base_primary self.current_excess = self.base_excess self.loss_history.clear()
[docs] @dataclass class BacktestResult: """Results from strategy backtesting. Attributes: strategy_name: Name of tested strategy simulation_results: Raw simulation results (either Simulation or MC results) metrics: Calculated performance metrics execution_time: Time taken to run backtest config: Configuration used for backtest """ strategy_name: str simulation_results: Union[SimulationResults, MCSimulationResults] metrics: ValidationMetrics execution_time: float config: SimulationConfig
[docs] class StrategyBacktester: """Engine for backtesting insurance strategies.""" def __init__( self, simulation_engine: Optional[Simulation] = None, metric_calculator: Optional[MetricCalculator] = None, ): """Initialize backtester. Args: simulation_engine: Engine for running simulations metric_calculator: Calculator for performance metrics """ self.simulation_engine = simulation_engine self.metric_calculator = metric_calculator or MetricCalculator() self.results_cache: Dict[str, BacktestResult] = {}
[docs] def test_strategy( self, strategy: InsuranceStrategy, manufacturer: WidgetManufacturer, config: SimulationConfig, use_cache: bool = True, ) -> BacktestResult: """Test a single strategy. Args: strategy: Strategy to test manufacturer: Manufacturer instance config: Simulation configuration use_cache: Whether to use cached results Returns: BacktestResult with performance metrics. """ # Check cache cache_key = f"{strategy.name}_{hashlib.sha256(str(config).encode()).hexdigest()[:16]}" if use_cache and cache_key in self.results_cache: logger.info(f"Using cached results for {strategy.name}") return self.results_cache[cache_key] logger.info(f"Testing strategy: {strategy.name}") # Handle OptimizedStaticStrategy if isinstance(strategy, OptimizedStaticStrategy) and not strategy.optimized_params: logger.info(f"Running optimization for {strategy.name}") # Ensure we pass a valid simulation engine if self.simulation_engine is not None: strategy.optimize_limits(manufacturer, self.simulation_engine) else: logger.warning("No simulation engine available for optimization") # Get insurance program insurance_program = strategy.get_insurance_program(manufacturer) # Run simulation import time start_time = time.time() # Create loss generator loss_generator = ManufacturingLossGenerator(seed=config.seed) # Create a default insurance program if None is returned if insurance_program is None: from .insurance_program import InsuranceProgram insurance_program = InsuranceProgram(layers=[]) # No insurance # Initialize Monte Carlo engine with required parameters monte_carlo = MonteCarloEngine( loss_generator=loss_generator, insurance_program=insurance_program, manufacturer=manufacturer, config=config, ) simulation_results = monte_carlo.run() execution_time = time.time() - start_time # Calculate metrics - handle Monte Carlo results metrics = self._calculate_metrics_mc(simulation_results, config.n_years) # Create result result = BacktestResult( strategy_name=strategy.name, simulation_results=simulation_results, metrics=metrics, execution_time=execution_time, config=config, ) # Cache result if use_cache: self.results_cache[cache_key] = result return result
[docs] def test_multiple_strategies( self, strategies: List[InsuranceStrategy], manufacturer: WidgetManufacturer, config: SimulationConfig, ) -> pd.DataFrame: """Test multiple strategies and compare. Args: strategies: List of strategies to test manufacturer: Manufacturer instance config: Simulation configuration Returns: DataFrame comparing strategy performance. """ results = [] for strategy in strategies: result = self.test_strategy(strategy, manufacturer, config) # Create summary row row = { "strategy": strategy.name, "roe": result.metrics.roe, "ruin_probability": result.metrics.ruin_probability, "growth_rate": result.metrics.growth_rate, "volatility": result.metrics.volatility, "sharpe_ratio": result.metrics.sharpe_ratio, "max_drawdown": result.metrics.max_drawdown, "execution_time": result.execution_time, } results.append(row) return pd.DataFrame(results)
def _calculate_metrics_mc( self, simulation_results: MCSimulationResults, n_years: int ) -> ValidationMetrics: """Calculate metrics from Monte Carlo simulation results. Args: simulation_results: Monte Carlo simulation results n_years: Number of years simulated Returns: ValidationMetrics object. """ # Extract returns and final assets returns = simulation_results.growth_rates final_assets = simulation_results.final_assets # Calculate metrics metrics = self.metric_calculator.calculate_metrics( returns=returns, final_assets=final_assets, initial_assets=10000000, # Default initial assets n_years=n_years, ) # Add ruin probability from MC results # Extract ruin probability for the final year from the dict metrics.ruin_probability = simulation_results.ruin_probability.get( str(n_years), ( list(simulation_results.ruin_probability.values())[-1] if simulation_results.ruin_probability else 0.0 ), ) return metrics def _calculate_metrics( self, simulation_results: SimulationResults, n_years: int ) -> ValidationMetrics: """Calculate metrics from simulation results. Args: simulation_results: Raw simulation results n_years: Number of years simulated Returns: ValidationMetrics object. """ # Calculate growth rates from ROE returns = simulation_results.roe # Get final assets from the last asset value final_assets = np.array([simulation_results.assets[-1]]) # Calculate metrics metrics = self.metric_calculator.calculate_metrics( returns=returns, final_assets=final_assets, initial_assets=10000000, # Default initial assets n_years=n_years, ) # Calculate ruin probability from insolvency metrics.ruin_probability = 1.0 if simulation_results.insolvency_year is not None else 0.0 return metrics