Source code for ergodic_insurance.business_optimizer

"""Business outcome optimization algorithms for insurance decisions.

This module implements sophisticated optimization algorithms focused on real business
outcomes (ROE, growth rate, survival probability) rather than technical metrics.
These algorithms maximize long-term company value through optimal insurance decisions.

Author: Alex Filiakov
Date: 2025-01-25
"""

# pylint: disable=too-many-lines

from dataclasses import dataclass, field
from enum import Enum
import logging
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
from scipy import optimize

from .config import BusinessOptimizerConfig
from .decision_engine import InsuranceDecisionEngine
from .ergodic_analyzer import ErgodicAnalyzer
from .gpu_backend import GPUConfig, is_gpu_available
from .loss_distributions import LossDistribution
from .manufacturer import WidgetManufacturer

logger = logging.getLogger(__name__)


[docs] class OptimizationDirection(Enum): """Direction of optimization for objectives.""" MAXIMIZE = "maximize" MINIMIZE = "minimize"
[docs] @dataclass class BusinessObjective: """Business optimization objective definition. Attributes: name: Name of the objective (e.g., 'ROE', 'bankruptcy_risk') weight: Weight in multi-objective optimization (0-1) target_value: Optional target value for the objective optimization_direction: Whether to maximize or minimize constraint_type: Optional constraint type ('>=', '<=', '==') constraint_value: Optional constraint value """ name: str weight: float = 1.0 target_value: Optional[float] = None optimization_direction: OptimizationDirection = OptimizationDirection.MAXIMIZE constraint_type: Optional[str] = None constraint_value: Optional[float] = None
[docs] def __post_init__(self): """Validate objective configuration.""" if not 0 <= self.weight <= 1: raise ValueError(f"Weight must be between 0 and 1, got {self.weight}") if self.constraint_type and self.constraint_type not in [">=", "<=", "=="]: raise ValueError(f"Invalid constraint type: {self.constraint_type}")
[docs] @dataclass class BusinessConstraints: """Business optimization constraints. Attributes: max_risk_tolerance: Maximum acceptable probability of bankruptcy min_roe_threshold: Minimum required return on equity max_leverage_ratio: Maximum debt-to-equity ratio min_liquidity_ratio: Minimum liquidity requirements max_premium_budget: Maximum insurance premium as % of revenue min_coverage_ratio: Minimum coverage as % of assets regulatory_requirements: Additional regulatory constraints """ max_risk_tolerance: float = 0.01 # 1% bankruptcy risk min_roe_threshold: float = 0.10 # 10% minimum ROE max_leverage_ratio: float = 2.0 # 2:1 debt-to-equity min_liquidity_ratio: float = 1.2 # 1.2x current ratio max_premium_budget: float = 0.02 # 2% of revenue min_coverage_ratio: float = 0.5 # 50% of assets regulatory_requirements: Dict[str, float] = field(default_factory=dict)
[docs] def __post_init__(self): """Validate constraint values.""" if self.max_risk_tolerance < 0 or self.max_risk_tolerance > 1: raise ValueError("Risk tolerance must be between 0 and 1") if self.min_roe_threshold < 0: raise ValueError("ROE threshold must be non-negative") if self.max_leverage_ratio < 0: raise ValueError("Leverage ratio must be non-negative") if self.min_liquidity_ratio < 0: raise ValueError("Liquidity ratio must be non-negative")
[docs] @dataclass class OptimalStrategy: """Optimal insurance strategy result. Attributes: coverage_limit: Optimal coverage limit amount deductible: Optimal deductible amount premium_rate: Optimal premium rate expected_roe: Expected ROE with this strategy bankruptcy_risk: Probability of bankruptcy growth_rate: Expected growth rate capital_efficiency: Capital efficiency ratio recommendations: List of actionable recommendations """ coverage_limit: float deductible: float premium_rate: float expected_roe: float bankruptcy_risk: float growth_rate: float capital_efficiency: float recommendations: List[str] = field(default_factory=list)
[docs] def to_dict(self) -> Dict[str, Union[float, List[str]]]: """Convert to dictionary for serialization.""" return { "coverage_limit": self.coverage_limit, "deductible": self.deductible, "premium_rate": self.premium_rate, "expected_roe": self.expected_roe, "bankruptcy_risk": self.bankruptcy_risk, "growth_rate": self.growth_rate, "capital_efficiency": self.capital_efficiency, "recommendations": self.recommendations, }
[docs] @dataclass class BusinessOptimizationResult: """Result of business outcome optimization. Attributes: optimal_strategy: The optimal insurance strategy objective_values: Values achieved for each objective constraint_satisfaction: Status of constraint satisfaction convergence_info: Optimization convergence information sensitivity_analysis: Sensitivity to parameter changes """ optimal_strategy: OptimalStrategy objective_values: Dict[str, float] constraint_satisfaction: Dict[str, bool] convergence_info: Dict[str, Union[bool, int, float]] sensitivity_analysis: Optional[Dict[str, float]] = None
[docs] def is_feasible(self) -> bool: """Check if all constraints are satisfied.""" return all(self.constraint_satisfaction.values())
[docs] class BusinessOptimizer: """Optimize business outcomes through insurance decisions. This class implements sophisticated optimization algorithms focused on real business metrics like ROE, growth rate, and survival probability. """ def __init__( self, manufacturer: WidgetManufacturer, decision_engine: Optional[InsuranceDecisionEngine] = None, ergodic_analyzer: Optional[ErgodicAnalyzer] = None, loss_distribution: Optional[LossDistribution] = None, optimizer_config: Optional[BusinessOptimizerConfig] = None, gpu_config: Optional[GPUConfig] = None, ): """Initialize business optimizer. Args: manufacturer: Widget manufacturer model decision_engine: Insurance decision engine (optional) ergodic_analyzer: Ergodic analysis tools (optional) loss_distribution: Loss distribution model (optional) optimizer_config: Configuration for optimizer heuristic parameters (optional). If None, uses default BusinessOptimizerConfig values. gpu_config: GPU acceleration configuration (optional). If None, GPU acceleration is not used. """ self.manufacturer = manufacturer self.optimizer_config = optimizer_config or BusinessOptimizerConfig() self.gpu_config = gpu_config self._gpu_batch_objective: Optional[Any] = None # lazy init # Create default loss distribution if not provided if loss_distribution is None: from .loss_distributions import LognormalLoss loss_distribution = LognormalLoss(mean=100000, cv=1.5) self.loss_distribution = loss_distribution self.decision_engine = decision_engine or InsuranceDecisionEngine( manufacturer, loss_distribution ) self.ergodic_analyzer = ergodic_analyzer self.logger = logging.getLogger(self.__class__.__name__) def _get_gpu_batch_objective(self): """Lazily initialize GPU batch objective evaluator.""" if self._gpu_batch_objective is None: use_gpu = self.gpu_config is not None and self.gpu_config.enabled and is_gpu_available() if use_gpu or self.gpu_config is not None: from .gpu_objective import GPUBatchObjective self._gpu_batch_objective = GPUBatchObjective( equity=float(self.manufacturer.equity), total_assets=float(self.manufacturer.total_assets), revenue=float(self.manufacturer.calculate_revenue()), optimizer_config=self.optimizer_config, gpu_config=self.gpu_config, ) return self._gpu_batch_objective
[docs] def with_manufacturer(self, manufacturer: WidgetManufacturer) -> "BusinessOptimizer": """Create a lightweight optimizer for a different manufacturer. Reuses the optimizer config, loss distribution, decision engine, and ergodic analyzer from this instance, avoiding the overhead of reconstructing shared components (YAML I/O, engine initialization). The decision engine retains a reference to the original manufacturer, which is correct because methods like maximize_roe_with_insurance read directly from self.manufacturer rather than through the decision engine. Args: manufacturer: New manufacturer to optimize for. Returns: A new BusinessOptimizer sharing this instance's internal components. """ return BusinessOptimizer( manufacturer=manufacturer, decision_engine=self.decision_engine, ergodic_analyzer=self.ergodic_analyzer, loss_distribution=self.loss_distribution, optimizer_config=self.optimizer_config, gpu_config=self.gpu_config, )
[docs] def maximize_roe_with_insurance( self, constraints: BusinessConstraints, time_horizon: int = 10, n_simulations: int = 1000 ) -> OptimalStrategy: """Maximize ROE subject to business constraints. Objective: max(ROE_with_insurance - ROE_baseline) Args: constraints: Business constraints to satisfy time_horizon: Planning horizon in years n_simulations: Number of Monte Carlo simulations Returns: Optimal insurance strategy maximizing ROE """ self.logger.info( f"Maximizing ROE over {time_horizon} years with {n_simulations} simulations" ) # Boundary: float for scipy.optimize total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) # Define optimization bounds bounds = [ (1e6, min(total_assets * 2, 100e6)), # Coverage limit (0, 1e6), # Deductible (0.001, 0.10), # Premium rate (0.1% to 10%) ] # Define objective function def objective(x): coverage_limit, deductible, premium_rate = x # Simulate with insurance roe_with_insurance = self._simulate_roe( coverage_limit=coverage_limit, deductible=deductible, premium_rate=premium_rate, time_horizon=time_horizon, n_simulations=n_simulations, ) # Return negative ROE for minimization return -roe_with_insurance # Define constraints constraint_list = [] # Premium budget constraint def premium_constraint(x): _, _, premium_rate = x coverage_limit = x[0] annual_premium = coverage_limit * premium_rate max_premium = revenue * constraints.max_premium_budget return max_premium - annual_premium constraint_list.append({"type": "ineq", "fun": premium_constraint}) # Bankruptcy risk constraint def risk_constraint(x): bankruptcy_risk = self._estimate_bankruptcy_risk( coverage_limit=x[0], deductible=x[1], premium_rate=x[2], time_horizon=time_horizon ) return constraints.max_risk_tolerance - bankruptcy_risk constraint_list.append({"type": "ineq", "fun": risk_constraint}) # Coverage ratio constraint def coverage_constraint(x): coverage_limit = x[0] min_coverage = total_assets * constraints.min_coverage_ratio return coverage_limit - min_coverage constraint_list.append({"type": "ineq", "fun": coverage_constraint}) # Initial guess x0 = [ total_assets * 0.8, # 80% of assets 100000, # $100k deductible 0.02, # 2% premium rate ] # Run optimization result = optimize.minimize( objective, x0, method="SLSQP", bounds=bounds, constraints=constraint_list, options={"maxiter": 100, "ftol": 1e-6}, ) if not result.success: self.logger.warning(f"Optimization did not converge: {result.message}") # Extract optimal values optimal_coverage, optimal_deductible, optimal_premium = result.x optimal_roe = -result.fun # Calculate additional metrics bankruptcy_risk = self._estimate_bankruptcy_risk( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) growth_rate = self._estimate_growth_rate( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) capital_efficiency = self._calculate_capital_efficiency( optimal_coverage, optimal_deductible, optimal_premium ) # Generate recommendations recommendations = self._generate_roe_recommendations( optimal_coverage, optimal_deductible, optimal_premium, optimal_roe ) return OptimalStrategy( coverage_limit=optimal_coverage, deductible=optimal_deductible, premium_rate=optimal_premium, expected_roe=optimal_roe, bankruptcy_risk=bankruptcy_risk, growth_rate=growth_rate, capital_efficiency=capital_efficiency, recommendations=recommendations, )
[docs] def maximize_roe_gpu( self, constraints: BusinessConstraints, time_horizon: int = 10, n_simulations: int = 1000, method: str = "scipy", n_starts: int = 10, top_k: int = 5, de_pop_size: int = 50, de_generations: int = 100, ) -> OptimalStrategy: """GPU-accelerated ROE maximization with batched objective evaluation. Uses GPU-batched evaluation for faster optimization. Falls back to CPU-based maximize_roe_with_insurance if GPU is unavailable. Args: constraints: Business constraints to satisfy time_horizon: Planning horizon in years n_simulations: Number of Monte Carlo simulations per evaluation method: Optimization method - 'scipy' (SLSQP with GPU gradient), 'multi_start' (GPU-screened multi-start), or 'de' (GPU differential evolution) n_starts: Number of starting points for multi-start method top_k: Number of top starting points to optimize de_pop_size: Population size for differential evolution de_generations: Number of generations for DE Returns: Optimal insurance strategy maximizing ROE Since: Version 0.11.0 (Issue #966) """ batch_obj = self._get_gpu_batch_objective() if batch_obj is None: self.logger.info("GPU batch objective unavailable, falling back to CPU") return self.maximize_roe_with_insurance(constraints, time_horizon, n_simulations) from .gpu_objective import ( GPUDifferentialEvolution, GPUMultiStartScreener, GPUObjectiveWrapper, ) self.logger.info( f"GPU-accelerated ROE maximization: method={method}, " f"horizon={time_horizon}y, sims={n_simulations}" ) total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) bounds = [ (1e6, min(total_assets * 2, 100e6)), (0, 1e6), (0.001, 0.10), ] # Build constraint functions constraint_list = [] def premium_constraint(x): _, _, premium_rate = x coverage_limit = x[0] annual_premium = coverage_limit * premium_rate max_premium = revenue * constraints.max_premium_budget return max_premium - annual_premium constraint_list.append({"type": "ineq", "fun": premium_constraint}) def risk_constraint(x): bankruptcy_risk = self._estimate_bankruptcy_risk( coverage_limit=x[0], deductible=x[1], premium_rate=x[2], time_horizon=time_horizon, ) return constraints.max_risk_tolerance - bankruptcy_risk constraint_list.append({"type": "ineq", "fun": risk_constraint}) def coverage_constraint(x): return x[0] - total_assets * constraints.min_coverage_ratio constraint_list.append({"type": "ineq", "fun": coverage_constraint}) if method == "de": de_optimizer = GPUDifferentialEvolution( batch_objective=batch_obj, bounds=bounds, objective_name="roe", time_horizon=time_horizon, n_simulations=n_simulations, seed=self.optimizer_config.seed, ) result = de_optimizer.optimize( pop_size=de_pop_size, n_generations=de_generations, ) else: wrapper = GPUObjectiveWrapper( batch_objective=batch_obj, objective_name="roe", time_horizon=time_horizon, n_simulations=n_simulations, ) if method == "multi_start": screener = GPUMultiStartScreener( batch_objective=batch_obj, objective_name="roe", time_horizon=time_horizon, n_simulations=n_simulations, ) rng = np.random.default_rng(self.optimizer_config.seed) all_starts = [ np.array( [ rng.uniform(bounds[0][0], bounds[0][1]), rng.uniform(bounds[1][0], bounds[1][1]), rng.uniform(bounds[2][0], bounds[2][1]), ] ) for _ in range(n_starts) ] best_starts = screener.screen_starting_points(np.array(all_starts), top_k=top_k) best_result = None for start in best_starts: try: res = optimize.minimize( wrapper, start, method="SLSQP", jac=wrapper.gradient, bounds=bounds, constraints=constraint_list, options={"maxiter": 100, "ftol": 1e-6}, ) if best_result is None or res.fun < best_result.fun: best_result = res except (ValueError, RuntimeError) as e: self.logger.warning(f"Multi-start attempt failed: {e}") continue if best_result is None: self.logger.warning("All multi-start attempts failed, falling back to CPU") return self.maximize_roe_with_insurance( constraints, time_horizon, n_simulations ) result = best_result else: # Default scipy with GPU-batched gradient x0 = np.array([total_assets * 0.8, 100000, 0.02]) result = optimize.minimize( wrapper, x0, method="SLSQP", jac=wrapper.gradient, bounds=bounds, constraints=constraint_list, options={"maxiter": 100, "ftol": 1e-6}, ) if not result.success: self.logger.warning(f"GPU optimization did not converge: {result.message}") optimal_coverage, optimal_deductible, optimal_premium = result.x optimal_roe = -result.fun if method != "de" else -result.fun bankruptcy_risk = self._estimate_bankruptcy_risk( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) growth_rate = self._estimate_growth_rate( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) capital_efficiency = self._calculate_capital_efficiency( optimal_coverage, optimal_deductible, optimal_premium ) recommendations = self._generate_roe_recommendations( optimal_coverage, optimal_deductible, optimal_premium, optimal_roe ) return OptimalStrategy( coverage_limit=optimal_coverage, deductible=optimal_deductible, premium_rate=optimal_premium, expected_roe=optimal_roe, bankruptcy_risk=bankruptcy_risk, growth_rate=growth_rate, capital_efficiency=capital_efficiency, recommendations=recommendations, )
[docs] def minimize_bankruptcy_risk( self, growth_targets: Dict[str, float], budget_constraint: float, time_horizon: int = 10 ) -> OptimalStrategy: # pylint: disable=too-many-locals """Minimize bankruptcy risk while achieving growth targets. Objective: min(P(bankruptcy)) Args: growth_targets: Target growth rates (e.g., {'revenue': 0.15, 'assets': 0.10}) budget_constraint: Maximum premium budget time_horizon: Planning horizon in years Returns: Risk-minimizing insurance strategy """ self.logger.info(f"Minimizing bankruptcy risk over {time_horizon} years") # Define optimization bounds total_assets = float(self.manufacturer.total_assets) bounds = [ (1e6, min(total_assets * 3, 150e6)), # Coverage limit (0, 500000), # Deductible (0.001, 0.15), # Premium rate (0.1% to 15%) ] # Define objective function (minimize bankruptcy risk) def objective(x): coverage_limit, deductible, premium_rate = x bankruptcy_risk = self._estimate_bankruptcy_risk( coverage_limit, deductible, premium_rate, time_horizon ) return bankruptcy_risk # Define constraints constraint_list = [] # Budget constraint def budget_constraint_fn(x): coverage_limit, _, premium_rate = x annual_premium = coverage_limit * premium_rate return budget_constraint - annual_premium constraint_list.append({"type": "ineq", "fun": budget_constraint_fn}) # Growth target constraints for metric, target in growth_targets.items(): def growth_constraint(x, metric=metric, target=target): growth_rate = self._estimate_growth_rate( x[0], x[1], x[2], time_horizon, metric=metric ) return growth_rate - target constraint_list.append({"type": "ineq", "fun": growth_constraint}) # Initial guess x0 = [ total_assets * 1.5, # 150% of assets 50000, # $50k deductible 0.03, # 3% premium rate ] # Run optimization result = optimize.minimize( objective, x0, method="SLSQP", bounds=bounds, constraints=constraint_list, options={"maxiter": 150, "ftol": 1e-7}, ) if not result.success: self.logger.warning(f"Risk minimization did not converge: {result.message}") # Extract optimal values optimal_coverage, optimal_deductible, optimal_premium = result.x optimal_risk = result.fun # Calculate additional metrics expected_roe = self._simulate_roe( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) growth_rate = self._estimate_growth_rate( optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) capital_efficiency = self._calculate_capital_efficiency( optimal_coverage, optimal_deductible, optimal_premium ) # Generate recommendations recommendations = self._generate_risk_recommendations( optimal_coverage, optimal_deductible, optimal_premium, optimal_risk ) return OptimalStrategy( coverage_limit=optimal_coverage, deductible=optimal_deductible, premium_rate=optimal_premium, expected_roe=expected_roe, bankruptcy_risk=optimal_risk, growth_rate=growth_rate, capital_efficiency=capital_efficiency, recommendations=recommendations, )
[docs] def optimize_capital_efficiency( self, available_capital: float, investment_opportunities: Dict[str, float] ) -> Dict[str, float]: """Optimize capital allocation across insurance and investments. Args: available_capital: Total capital available for allocation investment_opportunities: Opportunities with expected returns Returns: Optimal capital allocation dictionary """ self.logger.info(f"Optimizing capital efficiency with ${available_capital:,.0f}") # Categories for capital allocation categories = ["insurance_premium", "working_capital", "growth_investment", "cash_reserve"] # Expected returns for each category expected_returns = { "insurance_premium": self._estimate_insurance_return(), "working_capital": 0.12, # Working capital efficiency "growth_investment": investment_opportunities.get("growth", 0.20), "cash_reserve": 0.02, # Minimal return on reserves } # Risk factors for each category risk_factors = { "insurance_premium": 0.05, # Low risk due to protection "working_capital": 0.15, "growth_investment": 0.30, "cash_reserve": 0.01, } # Define optimization problem n_categories = len(categories) # Objective: maximize risk-adjusted return def objective(x): total_return = sum(x[i] * expected_returns[cat] for i, cat in enumerate(categories)) total_risk = np.sqrt( sum((x[i] * risk_factors[cat]) ** 2 for i, cat in enumerate(categories)) ) sharpe_ratio = total_return / (total_risk + 1e-6) # Risk-adjusted return return -sharpe_ratio # Negative for minimization # Constraints constraints = [ # Sum equals available capital {"type": "eq", "fun": lambda x: sum(x) - available_capital}, # Minimum insurance allocation (1% of capital) {"type": "ineq", "fun": lambda x: x[0] - 0.01 * available_capital}, # Minimum working capital (15% of capital) {"type": "ineq", "fun": lambda x: x[1] - 0.15 * available_capital}, # Minimum cash reserve (5% of capital) {"type": "ineq", "fun": lambda x: x[3] - 0.05 * available_capital}, ] # Bounds (all non-negative, up to full capital) bounds = [(0, available_capital) for _ in range(n_categories)] # Initial guess (equal allocation) x0 = [available_capital / n_categories] * n_categories # Run optimization result = optimize.minimize( objective, x0, method="SLSQP", bounds=bounds, constraints=constraints, options={"maxiter": 100}, ) if not result.success: self.logger.warning("Capital allocation optimization did not fully converge") # Create allocation dictionary allocation = {cat: result.x[i] for i, cat in enumerate(categories)} # Add efficiency metrics allocation["expected_return"] = sum( allocation[cat] * expected_returns[cat] for cat in categories ) allocation["risk_level"] = np.sqrt( sum((allocation[cat] * risk_factors[cat]) ** 2 for cat in categories) ) allocation["sharpe_ratio"] = -result.fun return allocation
[docs] def analyze_time_horizon_impact( self, strategies: List[Dict[str, Any]], time_horizons: Optional[List[int]] = None ) -> pd.DataFrame: """Analyze strategy performance across different time horizons. Args: strategies: List of strategy parameters time_horizons: List of time horizons to analyze Returns: DataFrame with performance metrics by time horizon """ if time_horizons is None: time_horizons = [1, 3, 10, 30] # Default horizons self.logger.info( f"Analyzing {len(strategies)} strategies across {len(time_horizons)} time horizons" ) results = [] for strategy in strategies: coverage_limit = strategy.get( "coverage_limit", float(self.manufacturer.total_assets) ) # Boundary: float for scipy.optimize deductible = strategy.get("deductible", 100000) premium_rate = strategy.get("premium_rate", 0.02) strategy_name = strategy.get("name", "Strategy") for horizon in time_horizons: # Calculate metrics for this combination roe = self._simulate_roe(coverage_limit, deductible, premium_rate, horizon) bankruptcy_risk = self._estimate_bankruptcy_risk( coverage_limit, deductible, premium_rate, horizon ) growth_rate = self._estimate_growth_rate( coverage_limit, deductible, premium_rate, horizon ) # Calculate ergodic vs ensemble difference if analyzer available ergodic_diff: float = 0.0 if self.ergodic_analyzer and horizon >= 10: ergodic_growth = self._calculate_ergodic_growth( coverage_limit, deductible, premium_rate, horizon ) ensemble_growth = growth_rate ergodic_diff = ergodic_growth - ensemble_growth results.append( { "strategy": strategy_name, "horizon_years": horizon, "coverage_limit": coverage_limit, "deductible": deductible, "premium_rate": premium_rate, "expected_roe": roe, "bankruptcy_risk": bankruptcy_risk, "growth_rate": growth_rate, "ergodic_difference": ergodic_diff, "horizon_category": self._categorize_horizon(horizon), } ) df = pd.DataFrame(results) # Add relative performance metrics for horizon in time_horizons: mask = df["horizon_years"] == horizon df.loc[mask, "roe_rank"] = df.loc[mask, "expected_roe"].rank(ascending=False) df.loc[mask, "risk_rank"] = df.loc[mask, "bankruptcy_risk"].rank(ascending=True) return df
[docs] def optimize_business_outcomes( self, objectives: List[BusinessObjective], constraints: BusinessConstraints, time_horizon: int = 10, method: str = "weighted_sum", ) -> BusinessOptimizationResult: # pylint: disable=too-many-locals """Multi-objective optimization of business outcomes. Args: objectives: List of business objectives to optimize constraints: Business constraints to satisfy time_horizon: Planning horizon in years method: Optimization method ('weighted_sum', 'epsilon_constraint', 'pareto') Returns: Comprehensive optimization result """ self.logger.info(f"Optimizing {len(objectives)} objectives using {method} method") # Normalize weights on local copies to avoid mutating caller's objects total_weight = sum(obj.weight for obj in objectives) if total_weight > 0: normalized_weights = [obj.weight / total_weight for obj in objectives] else: normalized_weights = [obj.weight for obj in objectives] # Define optimization bounds # Boundary: float for scipy.optimize total_assets = float(self.manufacturer.total_assets) bounds = [ (1e6, min(total_assets * 2.5, 100e6)), # Coverage limit (0, 500000), # Deductible (0.001, 0.10), # Premium rate ] # Build composite objective function def composite_objective(x): coverage_limit, deductible, premium_rate = x total_score = 0.0 for i, obj in enumerate(objectives): value = self._evaluate_objective( obj.name, coverage_limit, deductible, premium_rate, time_horizon ) # Normalize and apply direction if obj.optimization_direction == OptimizationDirection.MAXIMIZE: score = value # Higher is better else: score = -value # Lower is better (negate for minimization) total_score = total_score + normalized_weights[i] * score return -total_score # Negative for scipy minimization # Build constraints list constraint_list = self._build_constraint_list(objectives, constraints, time_horizon) # Initial guess x0 = [total_assets * 1.0, 100000, 0.025] # Run optimization result = optimize.minimize( composite_objective, x0, method="SLSQP", bounds=bounds, constraints=constraint_list, options={"maxiter": 200, "ftol": 1e-6}, ) # Extract results optimal_coverage, optimal_deductible, optimal_premium = result.x # Calculate all objective values objective_values = {} for obj in objectives: value = self._evaluate_objective( obj.name, optimal_coverage, optimal_deductible, optimal_premium, time_horizon ) objective_values[obj.name] = value # Check constraint satisfaction constraint_satisfaction = self._check_constraints( optimal_coverage, optimal_deductible, optimal_premium, constraints, time_horizon ) # Perform sensitivity analysis sensitivity = self._perform_sensitivity_analysis( optimal_coverage, optimal_deductible, optimal_premium, objectives, time_horizon, normalized_weights=normalized_weights, ) # Create optimal strategy optimal_strategy = OptimalStrategy( coverage_limit=optimal_coverage, deductible=optimal_deductible, premium_rate=optimal_premium, expected_roe=objective_values.get("ROE", 0), bankruptcy_risk=objective_values.get("bankruptcy_risk", 0), growth_rate=objective_values.get("growth_rate", 0), capital_efficiency=self._calculate_capital_efficiency( optimal_coverage, optimal_deductible, optimal_premium ), recommendations=self._generate_comprehensive_recommendations( optimal_coverage, optimal_deductible, optimal_premium, objective_values ), ) # Build convergence info convergence_info = { "converged": result.success, "iterations": result.nit if hasattr(result, "nit") else 0, "function_value": result.fun, "message": result.message if hasattr(result, "message") else "Optimization complete", } return BusinessOptimizationResult( optimal_strategy=optimal_strategy, objective_values=objective_values, constraint_satisfaction=constraint_satisfaction, convergence_info=convergence_info, sensitivity_analysis=sensitivity, )
# Private helper methods def _deductible_ratio(self, deductible: float, coverage_limit: float) -> float: """Return the fraction of coverage retained by the insured (0-1).""" return min(deductible / coverage_limit, 1.0) if coverage_limit > 0 else 0.0 def _simulate_roe( self, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int, n_simulations: int = 100, ) -> float: """Simulate ROE with given insurance parameters.""" rng = np.random.default_rng(self.optimizer_config.seed) roe_values = [] # Boundary: float for scipy.optimize equity = float(self.manufacturer.equity) total_assets = float(self.manufacturer.total_assets) # Higher deductible reduces effective premium (insurer covers less) ded_ratio = self._deductible_ratio(deductible, coverage_limit) annual_premium = coverage_limit * premium_rate * (1 - ded_ratio) for _ in range(min(n_simulations, 100)): # Limit for performance # Simple ROE simulation base_roe = self.optimizer_config.base_roe # Insurance impact premium_cost = annual_premium / equity protection_benefit = self.optimizer_config.protection_benefit_factor * ( coverage_limit / total_assets ) # Deductible reduces effective protection (business retains losses up to deductible) retained_loss_drag = ( deductible / equity * self.optimizer_config.protection_benefit_factor ) # uses raw deductible, not ded_ratio — intentional (absolute retained loss) # Adjust ROE adjusted_roe = base_roe - premium_cost + protection_benefit - retained_loss_drag # Add randomness adjusted_roe *= rng.normal(1.0, self.optimizer_config.roe_noise_std) roe_values.append(adjusted_roe) return float(np.mean(roe_values)) def _estimate_bankruptcy_risk( self, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int ) -> float: """Estimate probability of bankruptcy.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) # Higher deductible reduces effective premium ded_ratio = self._deductible_ratio(deductible, coverage_limit) annual_premium = coverage_limit * premium_rate * (1 - ded_ratio) # Simple bankruptcy risk model base_risk = self.optimizer_config.base_bankruptcy_risk # Insurance reduces risk, but deductible creates a coverage gap coverage_ratio = coverage_limit / total_assets effective_coverage_ratio = coverage_ratio * (1 - ded_ratio) risk_reduction = min( effective_coverage_ratio * self.optimizer_config.max_risk_reduction, self.optimizer_config.max_risk_reduction, ) # Deductible adds retained-loss risk (higher deductible = more retained risk) retained_risk = (deductible / total_assets) * self.optimizer_config.max_risk_reduction # Premium cost increases risk slightly premium_burden = annual_premium / revenue risk_increase = premium_burden * self.optimizer_config.premium_burden_risk_factor # Time horizon effect time_factor = 1 - np.exp( -time_horizon / self.optimizer_config.time_risk_constant ) # Risk increases with time bankruptcy_risk = (base_risk - risk_reduction + retained_risk + risk_increase) * time_factor return float(max(0, min(1, bankruptcy_risk))) def _estimate_growth_rate( self, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int, metric: str = "revenue", ) -> float: """Estimate growth rate for given metric.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) # Higher deductible reduces effective premium ded_ratio = self._deductible_ratio(deductible, coverage_limit) annual_premium = coverage_limit * premium_rate * (1 - ded_ratio) # Base growth rate base_growth = self.optimizer_config.base_growth_rate # Insurance enables more aggressive growth (reduced by deductible gap) coverage_ratio = coverage_limit / total_assets effective_coverage_ratio = coverage_ratio * (1 - ded_ratio) growth_boost = effective_coverage_ratio * self.optimizer_config.growth_boost_factor # Premium cost reduces growth (lower with higher deductible) premium_drag = annual_premium / revenue * self.optimizer_config.premium_drag_factor # Retained risk from deductible constrains growth retained_risk_drag = (deductible / total_assets) * self.optimizer_config.growth_boost_factor # Calculate adjusted growth adjusted_growth = base_growth + growth_boost - premium_drag - retained_risk_drag # Adjust for different metrics if metric == "assets": adjusted_growth *= self.optimizer_config.asset_growth_factor elif metric == "equity": adjusted_growth *= self.optimizer_config.equity_growth_factor return float(max(0, adjusted_growth)) def _calculate_capital_efficiency( self, coverage_limit: float, deductible: float, premium_rate: float ) -> float: """Calculate capital efficiency ratio.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) # Higher deductible reduces effective premium ded_ratio = self._deductible_ratio(deductible, coverage_limit) annual_premium = coverage_limit * premium_rate * (1 - ded_ratio) # Capital freed by risk transfer (reduced by deductible retention) risk_transfer_benefit = ( coverage_limit * (1 - ded_ratio) * self.optimizer_config.risk_transfer_benefit_rate ) # Net capital efficiency net_benefit = risk_transfer_benefit - annual_premium efficiency_ratio = 1 + (net_benefit / total_assets) return float(max(0, efficiency_ratio)) def _estimate_insurance_return(self) -> float: """Estimate return on insurance investment.""" # Insurance provides value through: # 1. Risk reduction (allows higher leverage) # 2. Stability (better credit terms) # 3. Growth enablement (take more risks) risk_reduction_value = self.optimizer_config.risk_reduction_value stability_value = self.optimizer_config.stability_value growth_enablement = self.optimizer_config.growth_enablement_value return risk_reduction_value + stability_value + growth_enablement def _calculate_ergodic_growth( self, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int ) -> float: """Calculate ergodic (time-average) growth rate.""" if not self.ergodic_analyzer: return self._estimate_growth_rate( coverage_limit, deductible, premium_rate, time_horizon ) # Use ergodic analyzer for proper calculation # This is a simplified version ensemble_growth = self._estimate_growth_rate( coverage_limit, deductible, premium_rate, time_horizon ) volatility = self.optimizer_config.assumed_volatility # Insurance reduces volatility (deductible reduces effective coverage) # Convert Decimal to float for calculations total_assets = float(self.manufacturer.total_assets) ded_ratio = self._deductible_ratio(deductible, coverage_limit) effective_coverage_ratio = (coverage_limit / total_assets) * (1 - ded_ratio) volatility_reduction = ( effective_coverage_ratio * self.optimizer_config.volatility_reduction_factor ) adjusted_volatility = max( self.optimizer_config.min_volatility, volatility - volatility_reduction ) # Ergodic correction for multiplicative process with reduced volatility ergodic_growth = ensemble_growth - 0.5 * adjusted_volatility**2 return float(ergodic_growth) def _categorize_horizon(self, years: int) -> str: """Categorize time horizon.""" if years <= 1: return "Short-term" if years <= 3: return "Medium-term" if years <= 10: return "Long-term" return "Strategic" def _evaluate_objective( self, objective_name: str, coverage_limit: float, deductible: float, premium_rate: float, time_horizon: int, ) -> float: """Evaluate a specific objective.""" if objective_name.lower() == "roe": return self._simulate_roe(coverage_limit, deductible, premium_rate, time_horizon) if objective_name.lower() == "bankruptcy_risk": return self._estimate_bankruptcy_risk( coverage_limit, deductible, premium_rate, time_horizon ) if objective_name.lower() == "growth_rate": return self._estimate_growth_rate( coverage_limit, deductible, premium_rate, time_horizon ) if objective_name.lower() == "capital_efficiency": return self._calculate_capital_efficiency(coverage_limit, deductible, premium_rate) self.logger.warning(f"Unknown objective: {objective_name}") return 0 def _build_constraint_list( self, objectives: List[BusinessObjective], constraints: BusinessConstraints, time_horizon: int, ) -> List[Dict]: """Build constraint list for optimization.""" constraint_list = [] # Business constraints def roe_constraint(x): roe = self._simulate_roe(x[0], x[1], x[2], time_horizon) return roe - constraints.min_roe_threshold constraint_list.append({"type": "ineq", "fun": roe_constraint}) def risk_constraint(x): risk = self._estimate_bankruptcy_risk(x[0], x[1], x[2], time_horizon) return constraints.max_risk_tolerance - risk constraint_list.append({"type": "ineq", "fun": risk_constraint}) def premium_constraint(x): ded_ratio = self._deductible_ratio(x[1], x[0]) annual_premium = x[0] * x[2] * (1 - ded_ratio) max_premium = ( float(self.manufacturer.calculate_revenue()) * constraints.max_premium_budget ) return max_premium - annual_premium constraint_list.append({"type": "ineq", "fun": premium_constraint}) # Objective-specific constraints for obj in objectives: if obj.constraint_type and obj.constraint_value is not None: def obj_constraint(x, obj=obj): value = self._evaluate_objective(obj.name, x[0], x[1], x[2], time_horizon) if obj.constraint_type == ">=": return value - obj.constraint_value if obj.constraint_type == "<=": return obj.constraint_value - value # '==' return abs(value - obj.constraint_value) - 0.001 constraint_list.append( {"type": "ineq" if obj.constraint_type != "==" else "eq", "fun": obj_constraint} ) return constraint_list def _check_constraints( self, coverage_limit: float, deductible: float, premium_rate: float, constraints: BusinessConstraints, time_horizon: int, ) -> Dict[str, bool]: """Check if constraints are satisfied.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) equity = float(self.manufacturer.equity) revenue = float(self.manufacturer.calculate_revenue()) satisfaction = {} # ROE constraint roe = self._simulate_roe(coverage_limit, deductible, premium_rate, time_horizon) satisfaction["min_roe"] = roe >= constraints.min_roe_threshold # Risk constraint risk = self._estimate_bankruptcy_risk( coverage_limit, deductible, premium_rate, time_horizon ) satisfaction["max_risk"] = risk <= constraints.max_risk_tolerance # Premium budget constraint annual_premium = coverage_limit * premium_rate max_premium = revenue * constraints.max_premium_budget satisfaction["premium_budget"] = annual_premium <= max_premium # Coverage ratio constraint coverage_ratio = coverage_limit / total_assets satisfaction["min_coverage"] = coverage_ratio >= constraints.min_coverage_ratio # Leverage constraint (simplified) liabilities = total_assets - equity leverage = liabilities / (equity + 1e-6) satisfaction["max_leverage"] = leverage <= constraints.max_leverage_ratio return satisfaction def _perform_sensitivity_analysis( self, coverage_limit: float, deductible: float, premium_rate: float, objectives: List[BusinessObjective], time_horizon: int, normalized_weights: Optional[List[float]] = None, ) -> Dict[str, float]: """Perform sensitivity analysis on key parameters.""" weights = normalized_weights or [obj.weight for obj in objectives] sensitivity = {} delta = 0.01 # 1% change # Base objective value base_value = sum( w * self._evaluate_objective( obj.name, coverage_limit, deductible, premium_rate, time_horizon ) for w, obj in zip(weights, objectives) ) # Coverage limit sensitivity coverage_delta = coverage_limit * delta value_up = sum( w * self._evaluate_objective( obj.name, coverage_limit + coverage_delta, deductible, premium_rate, time_horizon ) for w, obj in zip(weights, objectives) ) sensitivity["coverage_limit"] = (value_up - base_value) / (coverage_delta + 1e-6) # Deductible sensitivity deductible_delta = max(1000, deductible * delta) value_up = sum( w * self._evaluate_objective( obj.name, coverage_limit, deductible + deductible_delta, premium_rate, time_horizon ) for w, obj in zip(weights, objectives) ) sensitivity["deductible"] = (value_up - base_value) / (deductible_delta + 1e-6) # Premium rate sensitivity premium_delta = premium_rate * delta value_up = sum( w * self._evaluate_objective( obj.name, coverage_limit, deductible, premium_rate + premium_delta, time_horizon ) for w, obj in zip(weights, objectives) ) sensitivity["premium_rate"] = (value_up - base_value) / (premium_delta + 1e-6) return sensitivity def _generate_roe_recommendations( self, coverage_limit: float, deductible: float, premium_rate: float, expected_roe: float ) -> List[str]: """Generate ROE-focused recommendations.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) recommendations = [] if expected_roe > 0.20: recommendations.append( "Excellent ROE achieved - consider increasing growth investments" ) elif expected_roe > 0.15: recommendations.append("Strong ROE performance - maintain current strategy") else: recommendations.append( "ROE below target - review premium costs and coverage efficiency" ) if premium_rate > 0.05: recommendations.append( "High premium rate - negotiate better terms or consider alternatives" ) if deductible < 50000: recommendations.append( "Low deductible may be increasing costs - consider higher retention" ) elif deductible > 500000: recommendations.append( "High deductible exposes significant risk - evaluate coverage gap" ) coverage_ratio = coverage_limit / total_assets if coverage_ratio < 0.5: recommendations.append("Coverage may be insufficient for major losses") elif coverage_ratio > 1.5: recommendations.append("Consider if coverage exceeds actual exposure") return recommendations def _generate_risk_recommendations( self, coverage_limit: float, deductible: float, premium_rate: float, bankruptcy_risk: float ) -> List[str]: """Generate risk-focused recommendations.""" # Convert Decimal properties to float for calculations total_assets = float(self.manufacturer.total_assets) revenue = float(self.manufacturer.calculate_revenue()) recommendations = [] if bankruptcy_risk < 0.001: recommendations.append("Excellent risk profile - can support aggressive growth") elif bankruptcy_risk < 0.01: recommendations.append("Risk well-controlled - current insurance adequate") else: recommendations.append( "Elevated bankruptcy risk - increase coverage or reduce leverage" ) if coverage_limit < total_assets * 0.5: recommendations.append("Coverage may be insufficient for tail risks") if premium_rate * coverage_limit > revenue * 0.03: recommendations.append("Insurance costs exceeding 3% of revenue - review cost-benefit") return recommendations def _generate_comprehensive_recommendations( self, coverage_limit: float, deductible: float, premium_rate: float, objective_values: Dict[str, float], ) -> List[str]: """Generate comprehensive recommendations based on all metrics.""" recommendations = [] # ROE recommendations roe = objective_values.get("ROE", objective_values.get("roe", 0)) if roe > 0: if roe > 0.20: recommendations.append("Exceptional ROE - leverage success for expansion") elif roe < 0.10: recommendations.append("ROE below industry standards - optimize capital structure") # Risk recommendations risk = objective_values.get("bankruptcy_risk", 0) if risk > 0.02: recommendations.append("High bankruptcy risk - prioritize risk mitigation") elif risk < 0.005: recommendations.append("Conservative risk profile - opportunity for higher returns") # Growth recommendations growth = objective_values.get("growth_rate", 0) if growth > 0.15: recommendations.append("Strong growth trajectory - ensure adequate risk controls") elif growth < 0.05: recommendations.append("Low growth - consider strategic initiatives") # Insurance structure recommendations # Convert Decimal properties to float for calculations revenue = float(self.manufacturer.calculate_revenue()) total_assets = float(self.manufacturer.total_assets) annual_premium = coverage_limit * premium_rate premium_to_revenue = annual_premium / revenue if revenue > 0 else 0 if premium_to_revenue > 0.04: recommendations.append("Premium costs high - explore alternative risk financing") elif premium_to_revenue < 0.01: recommendations.append("Low insurance spend - verify adequate protection") # Deductible recommendations deductible_to_assets = deductible / total_assets if total_assets > 0 else 0 if deductible_to_assets > 0.05: recommendations.append( "High deductible relative to assets - monitor retention capacity" ) return recommendations[:5] # Limit to top 5 recommendations