Source code for ergodic_insurance.sensitivity

"""Comprehensive sensitivity analysis tools for insurance optimization.

This module provides tools for analyzing how changes in key parameters affect
optimization results, including one-at-a-time (OAT) analysis, tornado diagrams,
and two-way sensitivity analysis with efficient caching.

Example:
    Basic sensitivity analysis for a single parameter::

        from ergodic_insurance.sensitivity import SensitivityAnalyzer
        from ergodic_insurance.business_optimizer import BusinessOptimizer
        from ergodic_insurance.manufacturer import WidgetManufacturer

        # Setup optimizer
        manufacturer = WidgetManufacturer(initial_assets=10_000_000)
        optimizer = BusinessOptimizer(manufacturer)

        # Run sensitivity analysis
        analyzer = SensitivityAnalyzer(base_config, optimizer)
        result = analyzer.analyze_parameter(
            "frequency",
            param_range=(3, 8),
            n_points=11
        )

        # Generate tornado diagram
        tornado_data = analyzer.create_tornado_diagram(
            parameters=["frequency", "severity_mean", "premium_rate"],
            metric="optimal_roe"
        )

.. versionchanged:: 0.7.0
    Replaced bare ``print()`` warning calls with ``logging.warning()``.
    See :issue:`382`.

Author: Alex Filiakov
Date: 2025-01-29
"""

from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import hashlib
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from .safe_pickle import safe_dump, safe_load

logger = logging.getLogger(__name__)


@dataclass
class SensitivityResult:
    """Results from sensitivity analysis for a single parameter.

    Attributes:
        parameter: Name of the parameter being analyzed
        baseline_value: Original value of the parameter
        variations: Array of parameter values tested
        metrics: Dictionary of metric arrays for each variation
        parameter_path: Nested path to parameter
            (e.g., "manufacturer.base_operating_margin")
        units: Optional units for the parameter (e.g., "percentage", "dollars")
    """

    parameter: str
    baseline_value: float
    variations: np.ndarray
    metrics: Dict[str, np.ndarray]
    parameter_path: Optional[str] = None
    units: Optional[str] = None

    def _baseline_index(self) -> int:
        """Return the index of the tested value closest to ``baseline_value``.

        The midpoint ``len(variations) // 2`` is kept whenever it is as close
        to the baseline as any other grid point, so grids centred on the
        baseline (including even-sized grids that straddle it) behave as
        before. For a range that is not centred on the baseline, the nearest
        grid point is used instead.

        Must only be called when ``variations`` is non-empty.
        """
        midpoint = len(self.variations) // 2
        distances = np.abs(
            np.asarray(self.variations, dtype=float) - float(self.baseline_value)
        )
        nearest = int(np.argmin(distances))

        # NaN distances carry no ordering information; keep the midpoint.
        if not np.isfinite(distances[nearest]):
            return midpoint
        # Purely relative tolerance so rounding noise from np.linspace does not
        # move the index away from the midpoint on symmetric grids.
        if np.isclose(distances[midpoint], distances[nearest], rtol=1e-9, atol=0.0):
            return midpoint
        return nearest

    def calculate_impact(self, metric: str) -> float:
        """Calculate signed point elasticity of a metric w.r.t. this parameter.

        Uses finite differences at the tested value closest to the baseline to
        estimate the derivative, then normalises to a unit-free elasticity::

            elasticity = (dM/dP) * (P_baseline / M_baseline)

        A central difference is used when the baseline point has a neighbour
        on both sides, a forward difference when it is the first point, and a
        backward difference when it is the last point.

        A positive value means increasing the parameter increases the metric;
        a negative value means increasing the parameter decreases the metric.

        Args:
            metric: Name of the metric to calculate impact for

        Returns:
            Signed point elasticity at the baseline. ``0.0`` is returned when
            no variations were tested, when only one point exists, when the
            baseline parameter or baseline metric is zero, or when the
            neighbouring parameter values coincide.

        Raises:
            KeyError: If metric not found in results
        """
        if metric not in self.metrics:
            raise KeyError(f"Metric '{metric}' not found in results")

        n = len(self.variations)
        if n == 0:
            # Nothing was evaluated, so there is no derivative to estimate.
            return 0.0

        values = self.metrics[metric]
        idx = self._baseline_index()
        baseline_metric = values[idx]

        # Avoid division by zero (and a meaningless elasticity).
        if baseline_metric == 0 or self.baseline_value == 0:
            return 0.0

        if 0 < idx < n - 1:
            lo, hi = idx - 1, idx + 1  # central difference
        elif idx < n - 1:
            lo, hi = idx, idx + 1  # forward difference (baseline is first)
        elif idx > 0:
            lo, hi = idx - 1, idx  # backward difference (baseline is last)
        else:
            return 0.0  # single point: no derivative possible

        d_metric = float(values[hi] - values[lo])
        d_param = float(self.variations[hi] - self.variations[lo])
        if d_param == 0:
            return 0.0

        return float((d_metric / d_param) * (self.baseline_value / baseline_metric))

    def get_metric_bounds(self, metric: str) -> Tuple[float, float]:
        """Get the minimum and maximum values for a metric.

        Args:
            metric: Name of the metric

        Returns:
            Tuple of (min_value, max_value)

        Raises:
            KeyError: If metric not found in results
        """
        if metric not in self.metrics:
            raise KeyError(f"Metric '{metric}' not found in results")

        values = self.metrics[metric]
        return float(values.min()), float(values.max())

    def to_dataframe(self) -> pd.DataFrame:
        """Convert results to a pandas DataFrame.

        Returns:
            DataFrame with a ``parameter_value`` column followed by one column
            per metric
        """
        return pd.DataFrame({"parameter_value": self.variations, **self.metrics})
@dataclass
class TwoWaySensitivityResult:
    """Results from two-way sensitivity analysis.

    Attributes:
        parameter1: Name of first parameter
        parameter2: Name of second parameter
        values1: Array of values for first parameter
        values2: Array of values for second parameter
        metric_grid: 2D array of metric values [len(values1), len(values2)]
        metric_name: Name of the metric analyzed
    """

    parameter1: str
    parameter2: str
    values1: np.ndarray
    values2: np.ndarray
    metric_grid: np.ndarray
    metric_name: str

    def find_optimal_region(self, target_value: float, tolerance: float = 0.05) -> np.ndarray:
        """Find parameter combinations that achieve target metric value.

        A grid cell matches when its metric lies between
        ``target_value * (1 - tolerance)`` and
        ``target_value * (1 + tolerance)`` inclusive. The two bounds are
        ordered before comparing, so negative targets (for which the first
        bound is numerically the larger one) are handled correctly instead of
        always producing an empty mask.

        Args:
            target_value: Target value for the metric
            tolerance: Relative tolerance for matching (default 5%)

        Returns:
            Boolean mask array, same shape as ``metric_grid``, indicating
            satisfactory regions
        """
        bound_a = target_value * (1 - tolerance)
        bound_b = target_value * (1 + tolerance)
        # For a negative target bound_a > bound_b; sort them explicitly.
        lower_bound = min(bound_a, bound_b)
        upper_bound = max(bound_a, bound_b)

        return (self.metric_grid >= lower_bound) & (self.metric_grid <= upper_bound)

    def to_dataframe(self) -> pd.DataFrame:
        """Convert to DataFrame for easier manipulation.

        Returns:
            Long-format DataFrame with one row per grid cell and three
            columns named after ``parameter1``, ``parameter2`` and
            ``metric_name``
        """
        # "ij" indexing keeps axis 0 aligned with values1, matching metric_grid.
        p1_grid, p2_grid = np.meshgrid(self.values1, self.values2, indexing="ij")

        return pd.DataFrame(
            {
                self.parameter1: p1_grid.flatten(),
                self.parameter2: p2_grid.flatten(),
                self.metric_name: self.metric_grid.flatten(),
            }
        )
class SensitivityAnalyzer:
    """Comprehensive sensitivity analysis tools for optimization.

    This class provides methods for analyzing how parameter changes affect
    optimization outcomes, with built-in caching for efficiency.

    Attributes:
        base_config: Base configuration dictionary
        optimizer: Optimizer object with an optimize() method
        results_cache: Cache for optimization results
        cache_dir: Directory for persistent cache storage
    """

    # Metrics collected by analyze_parameter, in output order.
    _METRIC_NAMES = (
        "optimal_roe",
        "bankruptcy_risk",
        "optimal_retention",
        "total_premium",
        "growth_rate",
        "capital_efficiency",
    )

    def __init__(
        self, base_config: Dict[str, Any], optimizer: Any, cache_dir: Optional[Path] = None
    ):
        """Initialize sensitivity analyzer.

        Args:
            base_config: Base configuration dictionary for optimization
                (shallow-copied)
            optimizer: Object with optimize(config) method returning results
            cache_dir: Optional directory for persistent caching. A plain
                string path is accepted and converted to ``Path``.
        """
        self.base_config = base_config.copy()
        self.optimizer = optimizer
        self.results_cache: Dict[str, Any] = {}
        self.cache_dir: Optional[Path] = Path(cache_dir) if cache_dir else None

        # Create cache directory if specified
        if self.cache_dir:
            self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, config: Dict[str, Any]) -> str:
        """Generate cache key for configuration.

        Args:
            config: Configuration dictionary

        Returns:
            Hash string for the configuration
        """
        # Sort top-level keys so insertion order does not affect the hash.
        ordered = dict(sorted(config.items()))
        # MD5 is used here only as a fingerprint of the config's string form.
        return hashlib.md5(str(ordered).encode()).hexdigest()

    def _get_cached_result(self, cache_key: str) -> Optional[Any]:
        """Retrieve cached result if available.

        Args:
            cache_key: Cache key for the result

        Returns:
            Cached result or None if not found
        """
        # Check in-memory cache first
        if cache_key in self.results_cache:
            return self.results_cache[cache_key]

        # Check persistent cache if configured
        if self.cache_dir:
            cache_file = self.cache_dir / f"{cache_key}.pkl"
            if cache_file.exists():
                try:
                    with open(cache_file, "rb") as f:
                        result = safe_load(f)
                except Exception as exc:  # pylint: disable=broad-exception-caught
                    # Best effort: an unreadable cache entry is treated as a miss.
                    logger.debug("Ignoring unreadable cache file '%s': %s", cache_file, exc)
                else:
                    self.results_cache[cache_key] = result
                    return result

        return None

    def _cache_result(self, cache_key: str, result: Any) -> None:
        """Store result in cache.

        Args:
            cache_key: Cache key for the result
            result: Result to cache
        """
        # Store in memory
        self.results_cache[cache_key] = result

        # Store persistently if configured
        if self.cache_dir:
            cache_file = self.cache_dir / f"{cache_key}.pkl"
            try:
                with open(cache_file, "wb") as f:
                    safe_dump(result, f)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                # Best effort: the in-memory entry above is still usable.
                logger.debug("Could not write cache file '%s': %s", cache_file, exc)

    def _optimize_cached(self, config: Dict[str, Any]) -> Any:
        """Return the optimizer result for ``config``, using the cache if possible."""
        cache_key = self._get_cache_key(config)
        result = self._get_cached_result(cache_key)
        if result is None:
            result = self.optimizer.optimize(config)
            self._cache_result(cache_key, result)
        return result

    def _update_nested_config(
        self, config: Dict[str, Any], param_path: str, value: Any
    ) -> Dict[str, Any]:
        """Update a (possibly nested) parameter in configuration.

        Uses shallow copies along only the modification path instead of a
        full deep copy. Sibling branches share references with the original,
        which is safe because they are never mutated here. A path without a
        dot simply sets a top-level key on the copy.

        Args:
            config: Configuration dictionary
            param_path: Dot-separated path to parameter
            value: New value for parameter

        Returns:
            Updated configuration dictionary (original is not modified)
        """
        config_copy = config.copy()  # shallow copy top-level
        *parents, leaf = param_path.split(".")

        # Shallow-copy each dict along the path from root to the target key
        current = config_copy
        for part in parents:
            if part not in current:
                current[part] = {}
            elif not isinstance(current[part], dict):
                # A scalar sits where a container is needed: wrap it.
                current[part] = {"value": current[part]}
            else:
                current[part] = current[part].copy()  # shallow copy only this level
            current = current[part]

        current[leaf] = value
        return config_copy

    @staticmethod
    def _split_param_spec(param: Union[str, Tuple[str, str]]) -> Tuple[str, str]:
        """Normalise a parameter spec into a ``(name, path)`` pair."""
        if isinstance(param, tuple):
            name, path = param
            return name, path
        return param, param

    @staticmethod
    def _numeric_baseline(param_name: str, baseline: Any) -> float:
        """Convert a baseline value to float or raise a descriptive ValueError."""
        try:
            return float(baseline)
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"Parameter '{param_name}' has non-numeric baseline value: {baseline}"
            ) from exc

    @staticmethod
    def _collect_metrics(result: Any) -> Dict[str, Any]:
        """Read every tracked metric from one optimization result.

        Results exposing ``optimal_strategy`` are read from that strategy;
        otherwise flat attributes on the result are used, with 0.0 for any
        that are missing.
        """
        if hasattr(result, "optimal_strategy"):
            strategy = result.optimal_strategy
            return {
                "optimal_roe": strategy.expected_roe,
                "bankruptcy_risk": strategy.bankruptcy_risk,
                "optimal_retention": getattr(strategy, "deductible", 0.0),
                "total_premium": getattr(strategy, "premium_rate", 0.0),
                "growth_rate": strategy.growth_rate,
                "capital_efficiency": strategy.capital_efficiency,
            }

        # Fallback for simpler result structures
        return {
            "optimal_roe": getattr(result, "roe", 0.0),
            "bankruptcy_risk": getattr(result, "ruin_prob", 0.0),
            "optimal_retention": getattr(result, "retention", 0.0),
            "total_premium": getattr(result, "premium", 0.0),
            "growth_rate": getattr(result, "growth_rate", 0.0),
            "capital_efficiency": getattr(result, "capital_efficiency", 0.0),
        }

    def analyze_parameter(
        self,
        param_name: str,
        param_range: Optional[Tuple[float, float]] = None,
        n_points: int = 11,
        param_path: Optional[str] = None,
        relative_range: float = 0.3,
    ) -> "SensitivityResult":
        """Analyze sensitivity to a single parameter.

        ``param_path`` is the key that is read from and written to the
        configuration, also when it is a top-level key that differs from the
        display name ``param_name``.

        Args:
            param_name: Name of parameter to analyze
            param_range: (min, max) range for parameter values
            n_points: Number of points to evaluate
            param_path: Nested path to parameter (e.g., "manufacturer.tax_rate").
                Defaults to ``param_name``.
            relative_range: If param_range not provided, use ±relative_range
                from baseline

        Returns:
            SensitivityResult with analysis results

        Raises:
            KeyError: If parameter not found in base configuration
            ValueError: If the baseline value is not numeric. This is checked
                before any optimization is run.
        """
        if param_path is None:
            param_path = param_name

        try:
            baseline = self._get_param_value(param_path)
        except KeyError:
            raise KeyError(f"Parameter '{param_path}' not found in configuration") from None

        # Validate up front: the baseline is needed for the result regardless
        # of whether an explicit range was supplied.
        baseline_float = self._numeric_baseline(param_name, baseline)

        if param_range is None:
            param_range = (
                baseline_float * (1 - relative_range),
                baseline_float * (1 + relative_range),
            )

        min_val, max_val = param_range
        variations = np.linspace(min_val, max_val, n_points)

        metrics: Dict[str, List[Any]] = {name: [] for name in self._METRIC_NAMES}

        # Run optimization for each variation (with caching)
        for value in variations:
            config = self._update_nested_config(self.base_config, param_path, value)
            result = self._optimize_cached(config)
            for name, metric_value in self._collect_metrics(result).items():
                metrics[name].append(metric_value)

        return SensitivityResult(
            parameter=param_name,
            baseline_value=baseline_float,
            variations=variations,
            metrics={name: np.array(values) for name, values in metrics.items()},
            parameter_path=param_path,
        )

    def create_tornado_diagram(
        self,
        parameters: List[Union[str, Tuple[str, str]]],
        metric: str = "optimal_roe",
        relative_range: float = 0.3,
        n_points: int = 11,
    ) -> pd.DataFrame:
        """Create tornado diagram data for parameter impacts.

        Parameters that cannot be analyzed are skipped with a logged warning.

        Args:
            parameters: List of parameter names or (name, path) tuples
            metric: Metric to analyze
            relative_range: Relative range for parameter variations
            n_points: Number of points for analysis

        Returns:
            DataFrame sorted by impact magnitude with columns:
            - parameter: Parameter name
            - impact: Absolute impact value
            - direction: "positive" or "negative"
            - low_value: Minimum metric value over the tested range
            - high_value: Maximum metric value over the tested range
            - baseline: Metric value at the middle tested point
            - baseline_param: Baseline parameter value
            - range_width: high_value - low_value
        """
        impacts = []

        for param in parameters:
            # Handle both string and tuple inputs
            param_name, param_path = self._split_param_spec(param)

            try:
                result = self.analyze_parameter(
                    param_name,
                    param_path=param_path,
                    relative_range=relative_range,
                    n_points=n_points,
                )

                impact = result.calculate_impact(metric)
                low_val, high_val = result.get_metric_bounds(metric)

                # The relative range is symmetric, so the middle point is the baseline.
                baseline_idx = len(result.variations) // 2
                baseline_metric = result.metrics[metric][baseline_idx]

                impacts.append(
                    {
                        "parameter": param_name,
                        "impact": abs(impact),
                        "direction": "positive" if impact > 0 else "negative",
                        "low_value": low_val,
                        "high_value": high_val,
                        "baseline": baseline_metric,
                        "baseline_param": result.baseline_value,
                        "range_width": high_val - low_val,
                    }
                )
            except Exception as e:  # pylint: disable=broad-exception-caught
                # Skip parameters that cause errors
                logger.warning("Could not analyze parameter '%s': %s", param_name, e)
                continue

        # Create DataFrame and sort by impact
        df = pd.DataFrame(impacts)
        if not df.empty:
            df = df.sort_values("impact", ascending=False)

        return df

    def analyze_two_way(  # pylint: disable=too-many-locals
        self,
        param1: Union[str, Tuple[str, str]],
        param2: Union[str, Tuple[str, str]],
        param1_range: Optional[Tuple[float, float]] = None,
        param2_range: Optional[Tuple[float, float]] = None,
        n_points1: int = 10,
        n_points2: int = 10,
        metric: str = "optimal_roe",
        relative_range: float = 0.3,
        max_workers: Optional[int] = None,
    ) -> "TwoWaySensitivityResult":
        """Perform two-way sensitivity analysis.

        For ``(name, path)`` specs the *path* is the key that is varied in the
        configuration; the *name* is only used to label the result.

        Each new optimization result is cached as soon as it is available, so
        work already done is kept if a later optimization raises.

        Args:
            param1: First parameter name or (name, path) tuple
            param2: Second parameter name or (name, path) tuple
            param1_range: Range for first parameter
            param2_range: Range for second parameter
            n_points1: Number of points for first parameter
            n_points2: Number of points for second parameter
            metric: Metric to analyze
            relative_range: Relative range if explicit ranges not provided
            max_workers: Maximum number of threads for parallel optimization.
                If ``None`` or 1, optimizations run sequentially (default).
                Otherwise ``optimizer.optimize`` is called from a
                :class:`~concurrent.futures.ThreadPoolExecutor`.

        Returns:
            TwoWaySensitivityResult with grid of metric values

        Raises:
            KeyError: If either parameter is not found in the configuration
            ValueError: If a range must be derived from a non-numeric baseline
        """
        param1_name, param1_path = self._split_param_spec(param1)
        param2_name, param2_path = self._split_param_spec(param2)

        # Look up both baselines first so a bad path fails before any work.
        baseline1 = self._get_param_value(param1_path)
        baseline2 = self._get_param_value(param2_path)

        if param1_range is None:
            center1 = self._numeric_baseline(param1_name, baseline1)
            param1_range = (center1 * (1 - relative_range), center1 * (1 + relative_range))
        if param2_range is None:
            center2 = self._numeric_baseline(param2_name, baseline2)
            param2_range = (center2 * (1 - relative_range), center2 * (1 + relative_range))

        values1 = np.linspace(param1_range[0], param1_range[1], n_points1)
        values2 = np.linspace(param2_range[0], param2_range[1], n_points2)
        metric_grid = np.zeros((len(values1), len(values2)))

        # --- Build every config; fill cached cells, queue the rest -----------
        cache_keys: Dict[Tuple[int, int], str] = {}
        uncached: Dict[Tuple[int, int], Dict[str, Any]] = {}

        for i, val1 in enumerate(values1):
            # The first parameter is constant across the inner loop.
            row_config = self._update_nested_config(self.base_config, param1_path, val1)
            for j, val2 in enumerate(values2):
                config = self._update_nested_config(row_config, param2_path, val2)
                cache_key = self._get_cache_key(config)

                hit = self._get_cached_result(cache_key)
                if hit is not None:
                    metric_grid[i, j] = self._extract_metric(hit, metric)
                else:
                    cache_keys[(i, j)] = cache_key
                    uncached[(i, j)] = config

        # --- Run uncached optimizations (parallel or sequential) -------------
        if uncached and max_workers is not None and max_workers > 1:
            cells = list(uncached)
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                outcomes = executor.map(self.optimizer.optimize, [uncached[c] for c in cells])
                # Caching and grid writes happen here, in the calling thread.
                for cell, result in zip(cells, outcomes):
                    self._cache_result(cache_keys[cell], result)
                    metric_grid[cell] = self._extract_metric(result, metric)
        else:
            # Sequential path (default)
            for cell, config in uncached.items():
                result = self.optimizer.optimize(config)
                self._cache_result(cache_keys[cell], result)
                metric_grid[cell] = self._extract_metric(result, metric)

        return TwoWaySensitivityResult(
            parameter1=param1_name,
            parameter2=param2_name,
            values1=values1,
            values2=values2,
            metric_grid=metric_grid,
            metric_name=metric,
        )

    def _get_param_value(self, param_path: str) -> Any:
        """Get parameter value from configuration.

        Args:
            param_path: Dot-separated path to parameter (a path without dots
                is a top-level key)

        Returns:
            Parameter value

        Raises:
            KeyError: If parameter not found, including when the path tries
                to descend into a value that is not a dict
        """
        node: Any = self.base_config
        for part in param_path.split("."):
            # A non-dict intermediate cannot contain the requested key.
            if not isinstance(node, dict) or part not in node:
                raise KeyError(f"Parameter '{param_path}' not found")
            node = node[part]
        return node

    def _extract_metric(self, result: Any, metric: str) -> float:
        """Extract metric value from optimization result.

        Results exposing ``optimal_strategy`` are read from the strategy for
        the known metric names. Anything else falls back to an attribute on
        the result itself (using the metric name as the attribute name when
        it has no known alias), defaulting to 0.0 when absent.

        Args:
            result: Optimization result object
            metric: Name of metric to extract

        Returns:
            Metric value
        """
        if hasattr(result, "optimal_strategy"):
            strategy = result.optimal_strategy
            # Map metrics to strategy attributes
            strategy_readers = {
                "optimal_roe": lambda s: float(s.expected_roe),
                "bankruptcy_risk": lambda s: float(s.bankruptcy_risk),
                "growth_rate": lambda s: float(s.growth_rate),
                "capital_efficiency": lambda s: float(s.capital_efficiency),
                "optimal_retention": lambda s: getattr(s, "deductible", 0.0),
                "total_premium": lambda s: getattr(s, "premium_rate", 0.0),
            }
            reader = strategy_readers.get(metric)
            if reader is not None:
                return reader(strategy)

        # Fallback to direct attribute access
        attribute_aliases = {
            "optimal_roe": "roe",
            "bankruptcy_risk": "ruin_prob",
            "optimal_retention": "retention",
            "total_premium": "premium",
            "growth_rate": "growth_rate",
            "capital_efficiency": "capital_efficiency",
        }
        return getattr(result, attribute_aliases.get(metric, metric), 0.0)

    def clear_cache(self) -> None:
        """Clear all cached results, in memory and (if configured) on disk."""
        self.results_cache.clear()

        # Clear persistent cache if configured
        if self.cache_dir and self.cache_dir.exists():
            for cache_file in self.cache_dir.glob("*.pkl"):
                try:
                    cache_file.unlink()
                except OSError as exc:
                    # Best effort: leave files that cannot be removed.
                    logger.debug("Could not remove cache file '%s': %s", cache_file, exc)

    def analyze_parameter_group(
        self,
        parameter_group: Dict[str, Tuple[float, float]],
        n_points: int = 11,
        metric: str = "optimal_roe",
    ) -> Dict[str, "SensitivityResult"]:
        """Analyze sensitivity for a group of parameters.

        Parameters that cannot be analyzed are skipped with a logged warning.

        Args:
            parameter_group: Dictionary of parameter names to (min, max) ranges
            n_points: Number of points for each parameter
            metric: Primary metric for analysis. Currently not used by this
                method; every tracked metric is present in each result.

        Returns:
            Dictionary of parameter names to SensitivityResult objects
        """
        results = {}

        for param_name, param_range in parameter_group.items():
            try:
                results[param_name] = self.analyze_parameter(
                    param_name, param_range=param_range, n_points=n_points
                )
            except Exception as e:  # pylint: disable=broad-exception-caught
                logger.warning("Could not analyze '%s': %s", param_name, e)

        return results