Source code for ergodic_insurance.business_optimizer

"""Business outcome optimization algorithms for insurance decisions.

This module implements sophisticated optimization algorithms focused on real business
outcomes (ROE, growth rate, survival probability) rather than technical metrics.
These algorithms maximize long-term company value through optimal insurance decisions.

Author: Alex Filiakov
Date: 2025-01-25
"""

# pylint: disable=too-many-lines

from dataclasses import dataclass, field
from enum import Enum
import logging
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
from scipy import optimize

from .config import BusinessOptimizerConfig
from .decision_engine import InsuranceDecisionEngine
from .ergodic_analyzer import ErgodicAnalyzer
from .loss_distributions import LossDistribution
from .manufacturer import WidgetManufacturer

logger = logging.getLogger(__name__)


[docs] class OptimizationDirection(Enum): """Direction of optimization for objectives.""" MAXIMIZE = "maximize" MINIMIZE = "minimize"
[docs] @dataclass class BusinessObjective: """Business optimization objective definition. Attributes: name: Name of the objective (e.g., 'ROE', 'bankruptcy_risk') weight: Weight in multi-objective optimization (0-1) target_value: Optional target value for the objective optimization_direction: Whether to maximize or minimize constraint_type: Optional constraint type ('>=', '<=', '==') constraint_value: Optional constraint value """ name: str weight: float = 1.0 target_value: Optional[float] = None optimization_direction: OptimizationDirection = OptimizationDirection.MAXIMIZE constraint_type: Optional[str] = None constraint_value: Optional[float] = None
[docs] def __post_init__(self): """Validate objective configuration.""" if not 0 <= self.weight <= 1: raise ValueError(f"Weight must be between 0 and 1, got {self.weight}") if self.constraint_type and self.constraint_type not in [">=", "<=", "=="]: raise ValueError(f"Invalid constraint type: {self.constraint_type}")
[docs] @dataclass class BusinessConstraints: """Business optimization constraints. Attributes: max_risk_tolerance: Maximum acceptable probability of bankruptcy min_roe_threshold: Minimum required return on equity max_leverage_ratio: Maximum debt-to-equity ratio min_liquidity_ratio: Minimum liquidity requirements max_premium_budget: Maximum insurance premium as % of revenue min_coverage_ratio: Minimum coverage as % of assets regulatory_requirements: Additional regulatory constraints """ max_risk_tolerance: float = 0.01 # 1% bankruptcy risk min_roe_threshold: float = 0.10 # 10% minimum ROE max_leverage_ratio: float = 2.0 # 2:1 debt-to-equity min_liquidity_ratio: float = 1.2 # 1.2x current ratio max_premium_budget: float = 0.02 # 2% of revenue min_coverage_ratio: float = 0.5 # 50% of assets regulatory_requirements: Dict[str, float] = field(default_factory=dict)
[docs] def __post_init__(self): """Validate constraint values.""" if self.max_risk_tolerance < 0 or self.max_risk_tolerance > 1: raise ValueError("Risk tolerance must be between 0 and 1") if self.min_roe_threshold < 0: raise ValueError("ROE threshold must be non-negative") if self.max_leverage_ratio < 0: raise ValueError("Leverage ratio must be non-negative") if self.min_liquidity_ratio < 0: raise ValueError("Liquidity ratio must be non-negative")
[docs] @dataclass class OptimalStrategy: """Optimal insurance strategy result. Attributes: coverage_limit: Optimal coverage limit amount deductible: Optimal deductible amount premium_rate: Optimal premium rate expected_roe: Expected ROE with this strategy bankruptcy_risk: Probability of bankruptcy growth_rate: Expected growth rate capital_efficiency: Capital efficiency ratio recommendations: List of actionable recommendations """ coverage_limit: float deductible: float premium_rate: float expected_roe: float bankruptcy_risk: float growth_rate: float capital_efficiency: float recommendations: List[str] = field(default_factory=list)
[docs] def to_dict(self) -> Dict[str, Union[float, List[str]]]: """Convert to dictionary for serialization.""" return { "coverage_limit": self.coverage_limit, "deductible": self.deductible, "premium_rate": self.premium_rate, "expected_roe": self.expected_roe, "bankruptcy_risk": self.bankruptcy_risk, "growth_rate": self.growth_rate, "capital_efficiency": self.capital_efficiency, "recommendations": self.recommendations, }
[docs] @dataclass class BusinessOptimizationResult: """Result of business outcome optimization. Attributes: optimal_strategy: The optimal insurance strategy objective_values: Values achieved for each objective constraint_satisfaction: Status of constraint satisfaction convergence_info: Optimization convergence information sensitivity_analysis: Sensitivity to parameter changes """ optimal_strategy: OptimalStrategy objective_values: Dict[str, float] constraint_satisfaction: Dict[str, bool] convergence_info: Dict[str, Union[bool, int, float]] sensitivity_analysis: Optional[Dict[str, float]] = None
[docs] def is_feasible(self) -> bool: """Check if all constraints are satisfied.""" return all(self.constraint_satisfaction.values())
[docs] class BusinessOptimizer: """Optimize business outcomes through insurance decisions. This class implements sophisticated optimization algorithms focused on real business metrics like ROE, growth rate, and survival probability. """ def __init__( self, manufacturer: WidgetManufacturer, decision_engine: Optional[InsuranceDecisionEngine] = None, ergodic_analyzer: Optional[ErgodicAnalyzer] = None, loss_distribution: Optional[LossDistribution] = None, optimizer_config: Optional[BusinessOptimizerConfig] = None, ): """Initialize business optimizer. Args: manufacturer: Widget manufacturer model decision_engine: Insurance decision engine (optional) ergodic_analyzer: Ergodic analysis tools (optional) loss_distribution: Loss distribution model (optional) optimizer_config: Configuration for optimizer heuristic parameters (optional). If None, uses default BusinessOptimizerConfig values. """ self.manufacturer = manufacturer self.optimizer_config = optimizer_config or BusinessOptimizerConfig() # Create default loss distribution if not provided if loss_distribution is None: from .loss_distributions import LognormalLoss loss_distribution = LognormalLoss(mean=100000, cv=1.5) self.loss_distribution = loss_distribution self.decision_engine = decision_engine or InsuranceDecisionEngine( manufacturer, loss_distribution ) self.ergodic_analyzer = ergodic_analyzer self.logger = logging.getLogger(self.__class__.__name__)
[docs] def maximize_roe_with_insurance( self, constraints: BusinessConstraints, time_horizon: int = 10, n_simulations: int = 1000 ) -> OptimalStrategy: """Maximize ROE subject to business constraints. Objective: max(ROE_with_insurance - ROE_baseline) Args: constraints: Business constraints to satisfy time_horizon: Planning horizon in years n_simulations: Number of Monte Carlo simulations Returns: Optimal insurance strategy maximizing ROE """ self.logger.info( f"Maximizing ROE over {time_horizon} years with {n_simulations} simulations" ) # Boundary: float for scipy.optimize total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) # Define optimization bounds bounds = [ (1e6, min(total_assets * 2, 100e6)), # Coverage limit (0, 1e6), # Deductible (0.001, 0.10), # Premium rate (0.1% to 10%) ] # Define objective function def objective(x): coverage_limit, deductible, premium_rate = x # Simulate with insurance roe_with_insurance = self._simulate_roe( coverage_limit=coverage_limit, deductible=deductible, premium_rate=premium_rate, time_horizon=time_horizon, n_simulations=n_simulations, ) # Return negative ROE for minimization return -roe_with_insurance # Define constraints constraint_list = [] # Premium budget constraint def premium_constraint(x): _, _, premium_rate = x coverage_limit = x[0] annual_premium = coverage_limit * premium_rate max_premium = revenue * constraints.max_premium_budget return max_premium - annual_premium constraint_list.append({"type": "ineq", "fun": premium_constraint}) # Bankruptcy risk constraint def risk_constraint(x): bankruptcy_risk = self._estimate_bankruptcy_risk( coverage_limit=x[0], deductible=x[1], premium_rate=x[2], time_horizon=time_horizon ) return constraints.max_risk_tolerance - bankruptcy_risk constraint_list.append({"type": "ineq", "fun": risk_constraint}) # Coverage ratio constraint def coverage_constraint(x): coverage_limit = x[0] min_coverage = total_assets * constraints.min_coverage_ratio return coverage_limit - min_coverage constraint_list.append({"type": "ineq", "fun": coverage_constraint}) # Initial guess x0 = [ total_assets * 0.8, # 80% of assets 100000, # $100k deductible 0.02, # 2% premium rate ] # Run optimization result = optimize.minimize( objective, x0, method="SLSQP", bounds=bounds, constraints=constraint_list, options={"maxiter": 100, "ftol": 1e-6}, ) if not result.success: self.logger.warning(f"Optimization did not converge: {result.message}") # Extract optimal values optimal_coverage, optimal_deductible, optimal_premium = result.x optimal_roe = -result.fun # Calculate additional metrics bankruptcy_risk = self._estimate_bankruptcy_risk( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) growth_rate = self._estimate_growth_rate( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) capital_efficiency = self._calculate_capital_efficiency( optimal_coverage, optimal_deductible, optimal_premium ) # Generate recommendations recommendations = self._generate_roe_recommendations( optimal_coverage, optimal_deductible, optimal_premium, optimal_roe ) return OptimalStrategy( coverage_limit=optimal_coverage, deductible=optimal_deductible, premium_rate=optimal_premium, expected_roe=optimal_roe, bankruptcy_risk=bankruptcy_risk, growth_rate=growth_rate, capital_efficiency=capital_efficiency, recommendations=recommendations, )
[docs] def minimize_bankruptcy_risk( self, growth_targets: Dict[str, float], budget_constraint: float, time_horizon: int = 10 ) -> OptimalStrategy: # pylint: disable=too-many-locals """Minimize bankruptcy risk while achieving growth targets. Objective: min(P(bankruptcy)) Args: growth_targets: Target growth rates (e.g., {'revenue': 0.15, 'assets': 0.10}) budget_constraint: Maximum premium budget time_horizon: Planning horizon in years Returns: Risk-minimizing insurance strategy """ self.logger.info(f"Minimizing bankruptcy risk over {time_horizon} years") # Define optimization bounds total_assets = float(self.manufacturer.total_assets) bounds = [ (1e6, min(total_assets * 3, 150e6)), # Coverage limit (0, 500000), # Deductible (0.001, 0.15), # Premium rate (0.1% to 15%) ] # Define objective function (minimize bankruptcy risk) def objective(x): coverage_limit, deductible, premium_rate = x bankruptcy_risk = self._estimate_bankruptcy_risk( coverage_limit, deductible, premium_rate, time_horizon ) return bankruptcy_risk # Define constraints constraint_list = [] # Budget constraint def budget_constraint_fn(x): coverage_limit, _, premium_rate = x annual_premium = coverage_limit * premium_rate return budget_constraint - annual_premium constraint_list.append({"type": "ineq", "fun": budget_constraint_fn}) # Growth target constraints for metric, target in growth_targets.items(): def growth_constraint(x, metric=metric, target=target): growth_rate = self._estimate_growth_rate( x[0], x[1], x[2], time_horizon, metric=metric ) return growth_rate - target constraint_list.append({"type": "ineq", "fun": growth_constraint}) # Initial guess x0 = [ total_assets * 1.5, # 150% of assets 50000, # $50k deductible 0.03, # 3% premium rate ] # Run optimization result = optimize.minimize( objective, x0, method="SLSQP", bounds=bounds, constraints=constraint_list, options={"maxiter": 150, "ftol": 1e-7}, ) if not result.success: self.logger.warning(f"Risk minimization did not converge: {result.message}") # Extract optimal values optimal_coverage, optimal_deductible, optimal_premium = result.x optimal_risk = result.fun # Calculate additional metrics expected_roe = self._simulate_roe( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) growth_rate = self._estimate_growth_rate( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) capital_efficiency = self._calculate_capital_efficiency( optimal_coverage, optimal_deductible, optimal_premium ) # Generate recommendations recommendations = self._generate_risk_recommendations( optimal_coverage, optimal_deductible, optimal_premium, optimal_risk ) return OptimalStrategy( coverage_limit=optimal_coverage, deductible=optimal_deductible, premium_rate=optimal_premium, expected_roe=expected_roe, bankruptcy_risk=optimal_risk, growth_rate=growth_rate, capital_efficiency=capital_efficiency, recommendations=recommendations, )
[docs] def optimize_capital_efficiency( self, available_capital: float, investment_opportunities: Dict[str, float] ) -> Dict[str, float]: """Optimize capital allocation across insurance and investments. Args: available_capital: Total capital available for allocation investment_opportunities: Opportunities with expected returns Returns: Optimal capital allocation dictionary """ self.logger.info(f"Optimizing capital efficiency with ${available_capital:,.0f}") # Categories for capital allocation categories = ["insurance_premium", "working_capital", "growth_investment", "cash_reserve"] # Expected returns for each category expected_returns = { "insurance_premium": self._estimate_insurance_return(), "working_capital": 0.12, # Working capital efficiency "growth_investment": investment_opportunities.get("growth", 0.20), "cash_reserve": 0.02, # Minimal return on reserves } # Risk factors for each category risk_factors = { "insurance_premium": 0.05, # Low risk due to protection "working_capital": 0.15, "growth_investment": 0.30, "cash_reserve": 0.01, } # Define optimization problem n_categories = len(categories) # Objective: maximize risk-adjusted return def objective(x): total_return = sum(x[i] * expected_returns[cat] for i, cat in enumerate(categories)) total_risk = np.sqrt( sum((x[i] * risk_factors[cat]) ** 2 for i, cat in enumerate(categories)) ) sharpe_ratio = total_return / (total_risk + 1e-6) # Risk-adjusted return return -sharpe_ratio # Negative for minimization # Constraints constraints = [ # Sum equals available capital {"type": "eq", "fun": lambda x: sum(x) - available_capital}, # Minimum insurance allocation (1% of capital) {"type": "ineq", "fun": lambda x: x[0] - 0.01 * available_capital}, # Minimum working capital (15% of capital) {"type": "ineq", "fun": lambda x: x[1] - 0.15 * available_capital}, # Minimum cash reserve (5% of capital) {"type": "ineq", "fun": lambda x: x[3] - 0.05 * available_capital}, ] # Bounds (all non-negative, up to full capital) bounds = [(0, available_capital) for _ in range(n_categories)] # Initial guess (equal allocation) x0 = [available_capital / n_categories] * n_categories # Run optimization result = optimize.minimize( objective, x0, method="SLSQP", bounds=bounds, constraints=constraints, options={"maxiter": 100}, ) if not result.success: self.logger.warning("Capital allocation optimization did not fully converge") # Create allocation dictionary allocation = {cat: result.x[i] for i, cat in enumerate(categories)} # Add efficiency metrics allocation["expected_return"] = sum( allocation[cat] * expected_returns[cat] for cat in categories ) allocation["risk_level"] = np.sqrt( sum((allocation[cat] * risk_factors[cat]) ** 2 for cat in categories) ) allocation["sharpe_ratio"] = -result.fun return allocation
[docs] def analyze_time_horizon_impact( self, strategies: List[Dict[str, Any]], time_horizons: Optional[List[int]] = None ) -> pd.DataFrame: """Analyze strategy performance across different time horizons. Args: strategies: List of strategy parameters time_horizons: List of time horizons to analyze Returns: DataFrame with performance metrics by time horizon """ if time_horizons is None: time_horizons = [1, 3, 10, 30] # Default horizons self.logger.info( f"Analyzing {len(strategies)} strategies across {len(time_horizons)} time horizons" ) results = [] for strategy in strategies: coverage_limit = strategy.get( "coverage_limit", float(self.manufacturer.total_assets) ) # Boundary: float for scipy.optimize deductible = strategy.get("deductible", 100000) premium_rate = strategy.get("premium_rate", 0.02) strategy_name = strategy.get("name", "Strategy") for horizon in time_horizons: # Calculate metrics for this combination roe = self._simulate_roe(coverage_limit, deductible, premium_rate, horizon) bankruptcy_risk = self._estimate_bankruptcy_risk( coverage_limit, deductible, premium_rate, horizon ) growth_rate = self._estimate_growth_rate( coverage_limit, deductible, premium_rate, horizon ) # Calculate ergodic vs ensemble difference if analyzer available ergodic_diff: float = 0.0 if self.ergodic_analyzer and horizon >= 10: ergodic_growth = self._calculate_ergodic_growth( coverage_limit, deductible, premium_rate, horizon ) ensemble_growth = growth_rate ergodic_diff = ergodic_growth - ensemble_growth results.append( { "strategy": strategy_name, "horizon_years": horizon, "coverage_limit": coverage_limit, "deductible": deductible, "premium_rate": premium_rate, "expected_roe": roe, "bankruptcy_risk": bankruptcy_risk, "growth_rate": growth_rate, "ergodic_difference": ergodic_diff, "horizon_category": self._categorize_horizon(horizon), } ) df = pd.DataFrame(results) # Add relative performance metrics for horizon in time_horizons: mask = df["horizon_years"] == horizon df.loc[mask, "roe_rank"] = df.loc[mask, "expected_roe"].rank(ascending=False) df.loc[mask, "risk_rank"] = df.loc[mask, "bankruptcy_risk"].rank(ascending=True) return df
[docs] def optimize_business_outcomes( self, objectives: List[BusinessObjective], constraints: BusinessConstraints, time_horizon: int = 10, method: str = "weighted_sum", ) -> BusinessOptimizationResult: # pylint: disable=too-many-locals """Multi-objective optimization of business outcomes. Args: objectives: List of business objectives to optimize constraints: Business constraints to satisfy time_horizon: Planning horizon in years method: Optimization method ('weighted_sum', 'epsilon_constraint', 'pareto') Returns: Comprehensive optimization result """ self.logger.info(f"Optimizing {len(objectives)} objectives using {method} method") # Normalize weights total_weight = sum(obj.weight for obj in objectives) if total_weight > 0: for obj in objectives: obj.weight /= total_weight # Define optimization bounds # Boundary: float for scipy.optimize total_assets = float(self.manufacturer.total_assets) bounds = [ (1e6, min(total_assets * 2.5, 100e6)), # Coverage limit (0, 500000), # Deductible (0.001, 0.10), # Premium rate ] # Build composite objective function def composite_objective(x): coverage_limit, deductible, premium_rate = x total_score = 0.0 total_score = 0.0 for obj in objectives: value = self._evaluate_objective( obj.name, coverage_limit, deductible, premium_rate, time_horizon ) # Normalize and apply direction if obj.optimization_direction == OptimizationDirection.MAXIMIZE: score = value # Higher is better else: score = -value # Lower is better (negate for minimization) total_score = total_score + obj.weight * score return -total_score # Negative for scipy minimization # Build constraints list constraint_list = self._build_constraint_list(objectives, constraints, time_horizon) # Initial guess x0 = [total_assets * 1.0, 100000, 0.025] # Run optimization result = optimize.minimize( composite_objective, x0, method="SLSQP", bounds=bounds, constraints=constraint_list, options={"maxiter": 200, "ftol": 1e-6}, ) # Extract results optimal_coverage, optimal_deductible, optimal_premium = result.x # Calculate all objective values objective_values = {} for obj in objectives: value = self._evaluate_objective( obj.name, optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) objective_values[obj.name] = value # Check constraint satisfaction constraint_satisfaction = self._check_constraints( optimal_coverage, optimal_deductible, optimal_premium, constraints, time_horizon ) # Perform sensitivity analysis sensitivity = self._perform_sensitivity_analysis( optimal_coverage, optimal_deductible, optimal_premium, objectives, time_horizon ) # Create optimal strategy optimal_strategy = OptimalStrategy( coverage_limit=optimal_coverage, deductible=optimal_deductible, premium_rate=optimal_premium, expected_roe=objective_values.get("ROE", 0), bankruptcy_risk=objective_values.get("bankruptcy_risk", 0), growth_rate=objective_values.get("growth_rate", 0), capital_efficiency=self._calculate_capital_efficiency( optimal_coverage, optimal_deductible, optimal_premium ), recommendations=self._generate_comprehensive_recommendations( optimal_coverage, optimal_deductible, optimal_premium, objective_values ), ) # Build convergence info convergence_info = { "converged": result.success, "iterations": result.nit if hasattr(result, "nit") else 0, "function_value": result.fun, "message": result.message if hasattr(result, "message") else "Optimization complete", } return BusinessOptimizationResult( optimal_strategy=optimal_strategy, objective_values=objective_values, constraint_satisfaction=constraint_satisfaction, convergence_info=convergence_info, sensitivity_analysis=sensitivity, )
# Private helper methods def _simulate_roe( self, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int, n_simulations: int = 100, ) -> float: """Simulate ROE with given insurance parameters.""" rng = np.random.default_rng() roe_values = [] # Boundary: float for scipy.optimize equity = float(self.manufacturer.equity) total_assets = float(self.manufacturer.total_assets) annual_premium = coverage_limit * premium_rate for _ in range(min(n_simulations, 100)): # Limit for performance # Simple ROE simulation base_roe = self.optimizer_config.base_roe # Insurance impact premium_cost = annual_premium / equity protection_benefit = self.optimizer_config.protection_benefit_factor * ( coverage_limit / total_assets ) # Adjust ROE adjusted_roe = base_roe - premium_cost + protection_benefit # Add randomness adjusted_roe *= rng.normal(1.0, self.optimizer_config.roe_noise_std) roe_values.append(adjusted_roe) return float(np.mean(roe_values)) def _estimate_bankruptcy_risk( self, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int ) -> float: """Estimate probability of bankruptcy.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) annual_premium = coverage_limit * premium_rate # Simple bankruptcy risk model base_risk = self.optimizer_config.base_bankruptcy_risk # Insurance reduces risk coverage_ratio = coverage_limit / total_assets risk_reduction = min( coverage_ratio * self.optimizer_config.max_risk_reduction, self.optimizer_config.max_risk_reduction, ) # Premium cost increases risk slightly premium_burden = annual_premium / revenue risk_increase = premium_burden * self.optimizer_config.premium_burden_risk_factor # Time horizon effect time_factor = 1 - np.exp( -time_horizon / self.optimizer_config.time_risk_constant ) # Risk increases with time bankruptcy_risk = (base_risk - risk_reduction + risk_increase) * time_factor return float(max(0, min(1, bankruptcy_risk))) def _estimate_growth_rate( self, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int, metric: str = "revenue", ) -> float: """Estimate growth rate for given metric.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) annual_premium = coverage_limit * premium_rate # Base growth rate base_growth = self.optimizer_config.base_growth_rate # Insurance enables more aggressive growth coverage_ratio = coverage_limit / total_assets growth_boost = coverage_ratio * self.optimizer_config.growth_boost_factor # Premium cost reduces growth premium_drag = annual_premium / revenue * self.optimizer_config.premium_drag_factor # Calculate adjusted growth adjusted_growth = base_growth + growth_boost - premium_drag # Adjust for different metrics if metric == "assets": adjusted_growth *= self.optimizer_config.asset_growth_factor elif metric == "equity": adjusted_growth *= self.optimizer_config.equity_growth_factor return float(max(0, adjusted_growth)) def _calculate_capital_efficiency( self, coverage_limit: float, deductible: float, premium_rate: float ) -> float: """Calculate capital efficiency ratio.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) annual_premium = coverage_limit * premium_rate # Capital freed by risk transfer risk_transfer_benefit = coverage_limit * self.optimizer_config.risk_transfer_benefit_rate # Net capital efficiency net_benefit = risk_transfer_benefit - annual_premium efficiency_ratio = 1 + (net_benefit / total_assets) return float(max(0, efficiency_ratio)) def _estimate_insurance_return(self) -> float: """Estimate return on insurance investment.""" # Insurance provides value through: # 1. Risk reduction (allows higher leverage) # 2. Stability (better credit terms) # 3. Growth enablement (take more risks) risk_reduction_value = self.optimizer_config.risk_reduction_value stability_value = self.optimizer_config.stability_value growth_enablement = self.optimizer_config.growth_enablement_value return risk_reduction_value + stability_value + growth_enablement def _calculate_ergodic_growth( self, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int ) -> float: """Calculate ergodic (time-average) growth rate.""" if not self.ergodic_analyzer: return self._estimate_growth_rate( coverage_limit, deductible, premium_rate, time_horizon ) # Use ergodic analyzer for proper calculation # This is a simplified version ensemble_growth = self._estimate_growth_rate( coverage_limit, deductible, premium_rate, time_horizon ) volatility = self.optimizer_config.assumed_volatility # Insurance reduces volatility # Convert Decimal to float for calculations total_assets = float(self.manufacturer.total_assets) coverage_ratio = coverage_limit / total_assets volatility_reduction = coverage_ratio * self.optimizer_config.volatility_reduction_factor adjusted_volatility = max( self.optimizer_config.min_volatility, volatility - volatility_reduction ) # Ergodic correction for multiplicative process with reduced volatility ergodic_growth = ensemble_growth - 0.5 * adjusted_volatility**2 return float(ergodic_growth) def _categorize_horizon(self, years: int) -> str: """Categorize time horizon.""" if years <= 1: return "Short-term" if years <= 3: return "Medium-term" if years <= 10: return "Long-term" return "Strategic" def _evaluate_objective( self, objective_name: str, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int, ) -> float: """Evaluate a specific objective.""" if objective_name.lower() == "roe": return self._simulate_roe(coverage_limit, deductible, premium_rate, time_horizon) if objective_name.lower() == "bankruptcy_risk": return self._estimate_bankruptcy_risk( coverage_limit, deductible, premium_rate, time_horizon ) if objective_name.lower() == "growth_rate": return self._estimate_growth_rate( coverage_limit, deductible, premium_rate, time_horizon ) if objective_name.lower() == "capital_efficiency": return self._calculate_capital_efficiency(coverage_limit, deductible, premium_rate) self.logger.warning(f"Unknown objective: {objective_name}") return 0 def _build_constraint_list( self, objectives: List[BusinessObjective], constraints: BusinessConstraints, time_horizon: int, ) -> List[Dict]: """Build constraint list for optimization.""" constraint_list = [] # Business constraints def roe_constraint(x): roe = self._simulate_roe(x[0], x[1], x[2], time_horizon) return roe - constraints.min_roe_threshold constraint_list.append({"type": "ineq", "fun": roe_constraint}) def risk_constraint(x): risk = self._estimate_bankruptcy_risk(x[0], x[1], x[2], time_horizon) return constraints.max_risk_tolerance - risk constraint_list.append({"type": "ineq", "fun": risk_constraint}) def premium_constraint(x): annual_premium = x[0] * x[2] max_premium = ( float(self.manufacturer.calculate_revenue()) * constraints.max_premium_budget ) return max_premium - annual_premium constraint_list.append({"type": "ineq", "fun": premium_constraint}) # Objective-specific constraints for obj in objectives: if obj.constraint_type and obj.constraint_value is not None: def obj_constraint(x, obj=obj): value = self._evaluate_objective(obj.name, x[0], x[1], x[2], time_horizon) if obj.constraint_type == ">=": return value - obj.constraint_value if obj.constraint_type == "<=": return obj.constraint_value - value # '==' return abs(value - obj.constraint_value) - 0.001 constraint_list.append( {"type": "ineq" if obj.constraint_type != "==" else "eq", "fun": obj_constraint} ) return constraint_list def _check_constraints( self, coverage_limit: float, deductible: float, premium_rate: float, constraints: BusinessConstraints, time_horizon: int, ) -> Dict[str, bool]: """Check if constraints are satisfied.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) equity = float(self.manufacturer.equity) revenue = float(self.manufacturer.calculate_revenue()) satisfaction = {} # ROE constraint roe = self._simulate_roe(coverage_limit, deductible, premium_rate, time_horizon) satisfaction["min_roe"] = roe >= constraints.min_roe_threshold # Risk constraint risk = self._estimate_bankruptcy_risk( coverage_limit, deductible, premium_rate, time_horizon ) satisfaction["max_risk"] = risk <= constraints.max_risk_tolerance # Premium budget constraint annual_premium = coverage_limit * premium_rate max_premium = revenue * constraints.max_premium_budget satisfaction["premium_budget"] = annual_premium <= max_premium # Coverage ratio constraint coverage_ratio = coverage_limit / total_assets satisfaction["min_coverage"] = coverage_ratio >= constraints.min_coverage_ratio # Leverage constraint (simplified) liabilities = total_assets - equity leverage = liabilities / (equity + 1e-6) satisfaction["max_leverage"] = leverage <= constraints.max_leverage_ratio return satisfaction def _perform_sensitivity_analysis( self, coverage_limit: float, deductible: float, premium_rate: float, objectives: List[BusinessObjective], time_horizon: int, ) -> Dict[str, float]: """Perform sensitivity analysis on key parameters.""" sensitivity = {} delta = 0.01 # 1% change # Base objective value base_value = sum( obj.weight * self._evaluate_objective( obj.name, coverage_limit, deductible, premium_rate, time_horizon ) for obj in objectives ) # Coverage limit sensitivity coverage_delta = coverage_limit * delta value_up = sum( obj.weight * self._evaluate_objective( obj.name, coverage_limit + coverage_delta, deductible, premium_rate, time_horizon ) for obj in objectives ) sensitivity["coverage_limit"] = (value_up - base_value) / (coverage_delta + 1e-6) # Deductible sensitivity deductible_delta = max(1000, deductible * delta) value_up = sum( obj.weight * self._evaluate_objective( obj.name, coverage_limit, deductible + deductible_delta, premium_rate, time_horizon ) for obj in objectives ) sensitivity["deductible"] = (value_up - base_value) / (deductible_delta + 1e-6) # Premium rate sensitivity premium_delta = premium_rate * delta value_up = sum( obj.weight * self._evaluate_objective( obj.name, coverage_limit, deductible, premium_rate + premium_delta, time_horizon ) for obj in objectives ) sensitivity["premium_rate"] = (value_up - base_value) / (premium_delta + 1e-6) return sensitivity def _generate_roe_recommendations( self, coverage_limit: float, deductible: float, premium_rate: float, expected_roe: float ) -> List[str]: """Generate ROE-focused recommendations.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) recommendations = [] if expected_roe > 0.20: recommendations.append( "Excellent ROE achieved - consider increasing growth investments" ) elif expected_roe > 0.15: recommendations.append("Strong ROE performance - maintain current strategy") else: recommendations.append( "ROE below target - review premium costs and coverage efficiency" ) if premium_rate > 0.05: recommendations.append( "High premium rate - negotiate better terms or consider alternatives" ) if deductible < 50000: recommendations.append( "Low deductible may be increasing costs - consider higher retention" ) elif deductible > 500000: recommendations.append( "High deductible exposes significant risk - evaluate coverage gap" ) coverage_ratio = coverage_limit / total_assets if coverage_ratio < 0.5: recommendations.append("Coverage may be insufficient for major losses") elif coverage_ratio > 1.5: recommendations.append("Consider if coverage exceeds actual exposure") return recommendations def _generate_risk_recommendations( self, coverage_limit: float, deductible: float, premium_rate: float, bankruptcy_risk: float ) -> List[str]: """Generate risk-focused recommendations.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) recommendations = [] if bankruptcy_risk < 0.001: recommendations.append("Excellent risk profile - can support aggressive growth") elif bankruptcy_risk < 0.01: recommendations.append("Risk well-controlled - current insurance adequate") else: recommendations.append( "Elevated bankruptcy risk - increase coverage or reduce leverage" ) if coverage_limit < total_assets * 0.5: recommendations.append("Coverage may be insufficient for tail risks") if premium_rate * coverage_limit > revenue * 0.03: recommendations.append("Insurance costs exceeding 3% of revenue - review cost-benefit") return recommendations def _generate_comprehensive_recommendations( self, coverage_limit: float, deductible: float, premium_rate: float, objective_values: Dict[str, float], ) -> List[str]: """Generate comprehensive recommendations based on all metrics.""" recommendations = [] # ROE recommendations roe = objective_values.get("ROE", objective_values.get("roe", 0)) if roe > 0: if roe > 0.20: recommendations.append("Exceptional ROE - leverage success for expansion") elif roe < 0.10: recommendations.append("ROE below industry standards - optimize capital structure") # Risk recommendations risk = objective_values.get("bankruptcy_risk", 0) if risk > 0.02: recommendations.append("High bankruptcy risk - prioritize risk mitigation") elif risk < 0.005: recommendations.append("Conservative risk profile - opportunity for higher returns") # Growth recommendations growth = objective_values.get("growth_rate", 0) if growth > 0.15: recommendations.append("Strong growth trajectory - ensure adequate risk controls") elif growth < 0.05: recommendations.append("Low growth - consider strategic initiatives") # Insurance structure recommendations # Convert Decimal properties to float for calculations revenue = float(self.manufacturer.calculate_revenue()) total_assets = float(self.manufacturer.total_assets) annual_premium = coverage_limit * premium_rate premium_to_revenue = annual_premium / revenue if revenue > 0 else 0 if premium_to_revenue > 0.04: recommendations.append("Premium costs high - explore alternative risk financing") elif premium_to_revenue < 0.01: recommendations.append("Low insurance spend - verify adequate protection") # Deductible recommendations deductible_to_assets = deductible / total_assets if total_assets > 0 else 0 if deductible_to_assets > 0.05: recommendations.append( "High deductible relative to assets - monitor retention capacity" ) return recommendations[:5] # Limit to top 5 recommendations