"""Comprehensive sensitivity analysis tools for insurance optimization.
This module provides tools for analyzing how changes in key parameters affect
optimization results, including one-at-a-time (OAT) analysis, tornado diagrams,
and two-way sensitivity analysis with efficient caching.
Example:
Basic sensitivity analysis for a single parameter::
from ergodic_insurance.sensitivity import SensitivityAnalyzer
from ergodic_insurance.business_optimizer import BusinessOptimizer
from ergodic_insurance.manufacturer import WidgetManufacturer
# Setup optimizer
manufacturer = WidgetManufacturer(initial_assets=10_000_000)
optimizer = BusinessOptimizer(manufacturer)
# Run sensitivity analysis
analyzer = SensitivityAnalyzer(base_config, optimizer)
result = analyzer.analyze_parameter(
"frequency",
param_range=(3, 8),
n_points=11
)
# Generate tornado diagram
tornado_data = analyzer.create_tornado_diagram(
parameters=["frequency", "severity_mean", "premium_rate"],
metric="optimal_roe"
)
.. versionchanged:: 0.7.0
Replaced bare ``print()`` warning calls with ``logging.warning()``.
See :issue:`382`.
Author: Alex Filiakov
Date: 2025-01-29
"""
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import hashlib
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from .safe_pickle import safe_dump, safe_load
logger = logging.getLogger(__name__)
[docs]
@dataclass
class SensitivityResult:
"""Results from sensitivity analysis for a single parameter.
Attributes:
parameter: Name of the parameter being analyzed
baseline_value: Original value of the parameter
variations: Array of parameter values tested
metrics: Dictionary of metric arrays for each variation
parameter_path: Nested path to parameter (e.g., "manufacturer.base_operating_margin")
units: Optional units for the parameter (e.g., "percentage", "dollars")
"""
parameter: str
baseline_value: float
variations: np.ndarray
metrics: Dict[str, np.ndarray]
parameter_path: Optional[str] = None
units: Optional[str] = None
[docs]
def calculate_impact(self, metric: str) -> float:
"""Calculate signed point elasticity of a metric w.r.t. this parameter.
Uses central finite differences at the baseline to estimate the
derivative, then normalises to a unit-free elasticity::
elasticity = (dM/dP) * (P_baseline / M_baseline)
A positive value means increasing the parameter increases the metric;
a negative value means increasing the parameter decreases the metric.
Args:
metric: Name of the metric to calculate impact for
Returns:
Signed point elasticity at the baseline
Raises:
KeyError: If metric not found in results
"""
if metric not in self.metrics:
raise KeyError(f"Metric '{metric}' not found in results")
baseline_idx = len(self.variations) // 2
baseline_metric = self.metrics[metric][baseline_idx]
# Avoid division by zero
if baseline_metric == 0 or self.baseline_value == 0:
return 0.0
n = len(self.variations)
# Central finite difference when both neighbours exist
if baseline_idx > 0 and baseline_idx < n - 1:
dM = float(
self.metrics[metric][baseline_idx + 1] - self.metrics[metric][baseline_idx - 1]
)
dP = float(self.variations[baseline_idx + 1] - self.variations[baseline_idx - 1])
elif baseline_idx < n - 1:
# Forward difference (baseline is the first element)
dM = float(self.metrics[metric][baseline_idx + 1] - self.metrics[metric][baseline_idx])
dP = float(self.variations[baseline_idx + 1] - self.variations[baseline_idx])
elif baseline_idx > 0:
# Backward difference (baseline is the last element)
dM = float(self.metrics[metric][baseline_idx] - self.metrics[metric][baseline_idx - 1])
dP = float(self.variations[baseline_idx] - self.variations[baseline_idx - 1])
else:
# Single point — no derivative possible
return 0.0
if dP == 0:
return 0.0
return float((dM / dP) * (self.baseline_value / baseline_metric))
[docs]
def get_metric_bounds(self, metric: str) -> Tuple[float, float]:
"""Get the minimum and maximum values for a metric.
Args:
metric: Name of the metric
Returns:
Tuple of (min_value, max_value)
Raises:
KeyError: If metric not found in results
"""
if metric not in self.metrics:
raise KeyError(f"Metric '{metric}' not found in results")
return float(self.metrics[metric].min()), float(self.metrics[metric].max())
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Convert results to a pandas DataFrame.
Returns:
DataFrame with variations and all metrics
"""
data = {"parameter_value": self.variations}
data.update(self.metrics)
return pd.DataFrame(data)
[docs]
@dataclass
class TwoWaySensitivityResult:
"""Results from two-way sensitivity analysis.
Attributes:
parameter1: Name of first parameter
parameter2: Name of second parameter
values1: Array of values for first parameter
values2: Array of values for second parameter
metric_grid: 2D array of metric values [len(values1), len(values2)]
metric_name: Name of the metric analyzed
"""
parameter1: str
parameter2: str
values1: np.ndarray
values2: np.ndarray
metric_grid: np.ndarray
metric_name: str
[docs]
def find_optimal_region(self, target_value: float, tolerance: float = 0.05) -> np.ndarray:
"""Find parameter combinations that achieve target metric value.
Args:
target_value: Target value for the metric
tolerance: Relative tolerance for matching (default 5%)
Returns:
Boolean mask array indicating satisfactory regions
"""
lower_bound = target_value * (1 - tolerance)
upper_bound = target_value * (1 + tolerance)
return (self.metric_grid >= lower_bound) & (self.metric_grid <= upper_bound)
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Convert to DataFrame for easier manipulation.
Returns:
DataFrame with multi-index for parameters and metric values
"""
# Create meshgrid for parameter combinations
p1_grid, p2_grid = np.meshgrid(self.values1, self.values2, indexing="ij")
# Flatten arrays for DataFrame
data = {
self.parameter1: p1_grid.flatten(),
self.parameter2: p2_grid.flatten(),
self.metric_name: self.metric_grid.flatten(),
}
return pd.DataFrame(data)
[docs]
class SensitivityAnalyzer:
"""Comprehensive sensitivity analysis tools for optimization.
This class provides methods for analyzing how parameter changes affect
optimization outcomes, with built-in caching for efficiency.
Attributes:
base_config: Base configuration dictionary
optimizer: Optimizer object with an optimize() method
results_cache: Cache for optimization results
cache_dir: Directory for persistent cache storage
"""
def __init__(
self, base_config: Dict[str, Any], optimizer: Any, cache_dir: Optional[Path] = None
):
"""Initialize sensitivity analyzer.
Args:
base_config: Base configuration dictionary for optimization
optimizer: Object with optimize(config) method returning results
cache_dir: Optional directory for persistent caching
"""
self.base_config = base_config.copy()
self.optimizer = optimizer
self.results_cache: Dict[str, Any] = {}
self.cache_dir = cache_dir
# Create cache directory if specified
if self.cache_dir:
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_cache_key(self, config: Dict[str, Any]) -> str:
"""Generate cache key for configuration.
Args:
config: Configuration dictionary
Returns:
Hash string for the configuration
"""
# Sort keys for consistent hashing
sorted_config = dict(sorted(config.items()))
config_str = str(sorted_config)
return hashlib.md5(config_str.encode()).hexdigest()
def _get_cached_result(self, cache_key: str) -> Optional[Any]:
"""Retrieve cached result if available.
Args:
cache_key: Cache key for the result
Returns:
Cached result or None if not found
"""
# Check in-memory cache first
if cache_key in self.results_cache:
return self.results_cache[cache_key]
# Check persistent cache if configured
if self.cache_dir:
cache_file = self.cache_dir / f"{cache_key}.pkl"
if cache_file.exists():
try:
with open(cache_file, "rb") as f:
result = safe_load(f)
self.results_cache[cache_key] = result
return result
except Exception: # pylint: disable=broad-exception-caught
# If cache loading fails, continue without it
pass
return None
def _cache_result(self, cache_key: str, result: Any) -> None:
"""Store result in cache.
Args:
cache_key: Cache key for the result
result: Result to cache
"""
# Store in memory
self.results_cache[cache_key] = result
# Store persistently if configured
if self.cache_dir:
cache_file = self.cache_dir / f"{cache_key}.pkl"
try:
with open(cache_file, "wb") as f:
safe_dump(result, f)
except Exception: # pylint: disable=broad-exception-caught
# If caching fails, continue without it
pass
def _update_nested_config(
self, config: Dict[str, Any], param_path: str, value: Any
) -> Dict[str, Any]:
"""Update a nested parameter in configuration.
Uses shallow copies along only the modification path instead of a full
deep copy. Sibling branches share references with the original, which
is safe because we never mutate them — the leaf value is always a
scalar (float from ``np.linspace``).
Args:
config: Configuration dictionary
param_path: Dot-separated path to parameter
value: New value for parameter
Returns:
Updated configuration dictionary (original is not modified)
"""
config_copy = config.copy() # shallow copy top-level
parts = param_path.split(".")
# Shallow-copy each dict along the path from root to the target key
current = config_copy
for part in parts[:-1]:
if part not in current:
current[part] = {}
elif not isinstance(current[part], dict):
# Convert to dict if needed
current[part] = {"value": current[part]}
else:
current[part] = current[part].copy() # shallow copy only this level
current = current[part]
# Set the final value
current[parts[-1]] = value
return config_copy
[docs]
def analyze_parameter( # pylint: disable=too-many-locals,too-many-branches,too-many-statements
self,
param_name: str,
param_range: Optional[Tuple[float, float]] = None,
n_points: int = 11,
param_path: Optional[str] = None,
relative_range: float = 0.3,
) -> SensitivityResult:
"""Analyze sensitivity to a single parameter.
Args:
param_name: Name of parameter to analyze
param_range: (min, max) range for parameter values
n_points: Number of points to evaluate
param_path: Nested path to parameter (e.g., "manufacturer.tax_rate")
relative_range: If param_range not provided, use ±relative_range from baseline
Returns:
SensitivityResult with analysis results
Raises:
KeyError: If parameter not found in base configuration
"""
# Determine parameter path
if param_path is None:
param_path = param_name
# Get baseline value
baseline: Any
if "." in param_path:
# Handle nested parameters
parts = param_path.split(".")
baseline = self.base_config
for part in parts:
if part not in baseline:
raise KeyError(f"Parameter '{param_path}' not found in configuration")
baseline = baseline[part]
else:
if param_name not in self.base_config:
raise KeyError(f"Parameter '{param_name}' not found in configuration")
baseline = self.base_config[param_name]
# Determine parameter range
if param_range is None:
# Ensure baseline is numeric
try:
baseline_float = float(baseline)
except (TypeError, ValueError) as exc:
raise ValueError(
f"Parameter '{param_name}' has non-numeric baseline value: {baseline}"
) from exc
min_val = baseline_float * (1 - relative_range)
max_val = baseline_float * (1 + relative_range)
param_range = (min_val, max_val)
min_val, max_val = param_range
variations = np.linspace(min_val, max_val, n_points)
# Initialize metrics storage
metrics: Dict[str, List[float]] = {
"optimal_roe": [],
"bankruptcy_risk": [],
"optimal_retention": [],
"total_premium": [],
"growth_rate": [],
"capital_efficiency": [],
}
# Run optimization for each variation
for value in variations:
# Update configuration
if "." in param_path:
config = self._update_nested_config(self.base_config, param_path, value)
else:
config = self.base_config.copy()
config[param_name] = value
# Get result (with caching)
cache_key = self._get_cache_key(config)
result = self._get_cached_result(cache_key)
if result is None:
# Run optimization
result = self.optimizer.optimize(config)
self._cache_result(cache_key, result)
# Extract metrics
# Handle different result structures
if hasattr(result, "optimal_strategy"):
strategy = result.optimal_strategy
metrics["optimal_roe"].append(strategy.expected_roe)
metrics["bankruptcy_risk"].append(strategy.bankruptcy_risk)
metrics["growth_rate"].append(strategy.growth_rate)
metrics["capital_efficiency"].append(strategy.capital_efficiency)
# Handle retention/deductible
if hasattr(strategy, "deductible"):
metrics["optimal_retention"].append(strategy.deductible)
else:
metrics["optimal_retention"].append(0.0)
# Handle premium
if hasattr(strategy, "premium_rate"):
metrics["total_premium"].append(strategy.premium_rate)
else:
metrics["total_premium"].append(0.0)
else:
# Fallback for simpler result structures
metrics["optimal_roe"].append(getattr(result, "roe", 0.0))
metrics["bankruptcy_risk"].append(getattr(result, "ruin_prob", 0.0))
metrics["optimal_retention"].append(getattr(result, "retention", 0.0))
metrics["total_premium"].append(getattr(result, "premium", 0.0))
metrics["growth_rate"].append(getattr(result, "growth_rate", 0.0))
metrics["capital_efficiency"].append(getattr(result, "capital_efficiency", 0.0))
# Convert metrics to arrays
metrics_arrays: Dict[str, np.ndarray] = {}
for key, values in metrics.items():
metrics_arrays[key] = np.array(values)
return SensitivityResult(
parameter=param_name,
baseline_value=float(
baseline
), # baseline is guaranteed to be numeric from earlier check
variations=variations,
metrics=metrics_arrays,
parameter_path=param_path,
)
[docs]
def create_tornado_diagram(
self,
parameters: List[Union[str, Tuple[str, str]]],
metric: str = "optimal_roe",
relative_range: float = 0.3,
n_points: int = 11,
) -> pd.DataFrame:
"""Create tornado diagram data for parameter impacts.
Args:
parameters: List of parameter names or (name, path) tuples
metric: Metric to analyze
relative_range: Relative range for parameter variations
n_points: Number of points for analysis
Returns:
DataFrame sorted by impact magnitude with columns:
- parameter: Parameter name
- impact: Absolute impact value
- direction: "positive" or "negative"
- low_value: Metric value at parameter minimum
- high_value: Metric value at parameter maximum
- baseline: Metric value at baseline
- baseline_param: Baseline parameter value
"""
impacts = []
for param in parameters:
# Handle both string and tuple inputs
if isinstance(param, tuple):
param_name, param_path = param
else:
param_name = param
param_path = param
try:
# Analyze sensitivity
result = self.analyze_parameter(
param_name,
param_path=param_path,
relative_range=relative_range,
n_points=n_points,
)
# Calculate impact
impact = result.calculate_impact(metric)
# Get metric bounds
low_val, high_val = result.get_metric_bounds(metric)
baseline_idx = len(result.variations) // 2
baseline_metric = result.metrics[metric][baseline_idx]
# Store for tornado diagram
impacts.append(
{
"parameter": param_name,
"impact": abs(impact),
"direction": "positive" if impact > 0 else "negative",
"low_value": low_val,
"high_value": high_val,
"baseline": baseline_metric,
"baseline_param": result.baseline_value,
"range_width": high_val - low_val,
}
)
except (KeyError, Exception) as e: # pylint: disable=broad-exception-caught
# Skip parameters that cause errors
logger.warning("Could not analyze parameter '%s': %s", param_name, e)
continue
# Create DataFrame and sort by impact
df = pd.DataFrame(impacts)
if not df.empty:
df = df.sort_values("impact", ascending=False)
return df
[docs]
def analyze_two_way( # pylint: disable=too-many-locals,too-many-branches
self,
param1: Union[str, Tuple[str, str]],
param2: Union[str, Tuple[str, str]],
param1_range: Optional[Tuple[float, float]] = None,
param2_range: Optional[Tuple[float, float]] = None,
n_points1: int = 10,
n_points2: int = 10,
metric: str = "optimal_roe",
relative_range: float = 0.3,
max_workers: Optional[int] = None,
) -> TwoWaySensitivityResult:
"""Perform two-way sensitivity analysis.
Args:
param1: First parameter name or (name, path) tuple
param2: Second parameter name or (name, path) tuple
param1_range: Range for first parameter
param2_range: Range for second parameter
n_points1: Number of points for first parameter
n_points2: Number of points for second parameter
metric: Metric to analyze
relative_range: Relative range if explicit ranges not provided
max_workers: Maximum number of threads for parallel optimization.
If ``None`` or 1, optimizations run sequentially (default).
Uses :class:`~concurrent.futures.ThreadPoolExecutor` so that
NumPy/SciPy work can release the GIL for true parallelism
without pickling overhead.
Returns:
TwoWaySensitivityResult with grid of metric values
"""
# Parse parameter specifications
if isinstance(param1, tuple):
param1_name, param1_path = param1
else:
param1_name = param1_path = param1
if isinstance(param2, tuple):
param2_name, param2_path = param2
else:
param2_name = param2_path = param2
# Get baseline values
baseline1 = self._get_param_value(param1_path)
baseline2 = self._get_param_value(param2_path)
# Determine ranges
if param1_range is None:
param1_range = (baseline1 * (1 - relative_range), baseline1 * (1 + relative_range))
if param2_range is None:
param2_range = (baseline2 * (1 - relative_range), baseline2 * (1 + relative_range))
# Create parameter grids
values1 = np.linspace(param1_range[0], param1_range[1], n_points1)
values2 = np.linspace(param2_range[0], param2_range[1], n_points2)
# Initialize result grid
metric_grid = np.zeros((len(values1), len(values2)))
# --- Pre-build all configs and separate cached from uncached ----------
configs: Dict[Tuple[int, int], Dict[str, Any]] = {}
cache_keys: Dict[Tuple[int, int], str] = {}
cached_results: Dict[Tuple[int, int], Any] = {}
uncached: Dict[Tuple[int, int], Dict[str, Any]] = {}
for i, val1 in enumerate(values1):
for j, val2 in enumerate(values2):
config = self.base_config.copy()
if "." in param1_path:
config = self._update_nested_config(config, param1_path, val1)
else:
config[param1_name] = val1
if "." in param2_path:
config = self._update_nested_config(config, param2_path, val2)
else:
config[param2_name] = val2
cache_key = self._get_cache_key(config)
cache_keys[(i, j)] = cache_key
configs[(i, j)] = config
hit = self._get_cached_result(cache_key)
if hit is not None:
cached_results[(i, j)] = hit
else:
uncached[(i, j)] = config
# --- Run uncached optimizations (parallel or sequential) --------------
new_results: Dict[Tuple[int, int], Any] = {}
if uncached and max_workers is not None and max_workers > 1:
# Parallel path using ThreadPoolExecutor
keys_list = list(uncached.keys())
configs_list = [uncached[k] for k in keys_list]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results_iter = executor.map(self.optimizer.optimize, configs_list)
for key, result in zip(keys_list, results_iter):
new_results[key] = result
else:
# Sequential path (default — preserves backwards compatibility)
for key, config in uncached.items():
new_results[key] = self.optimizer.optimize(config)
# --- Cache new results and build the metric grid ----------------------
for key, result in new_results.items():
self._cache_result(cache_keys[key], result)
all_results = {**cached_results, **new_results}
for (i, j), result in all_results.items():
metric_grid[i, j] = self._extract_metric(result, metric)
return TwoWaySensitivityResult(
parameter1=param1_name,
parameter2=param2_name,
values1=values1,
values2=values2,
metric_grid=metric_grid,
metric_name=metric,
)
def _get_param_value(self, param_path: str) -> Any:
"""Get parameter value from configuration.
Args:
param_path: Dot-separated path to parameter
Returns:
Parameter value
Raises:
KeyError: If parameter not found
"""
if "." in param_path:
parts = param_path.split(".")
value = self.base_config
for part in parts:
if part not in value:
raise KeyError(f"Parameter '{param_path}' not found")
value = value[part]
return value
if param_path not in self.base_config:
raise KeyError(f"Parameter '{param_path}' not found")
return self.base_config[param_path]
def _extract_metric(self, result: Any, metric: str) -> float:
"""Extract metric value from optimization result.
Args:
result: Optimization result object
metric: Name of metric to extract
Returns:
Metric value
"""
# Try different result structures
if hasattr(result, "optimal_strategy"):
strategy = result.optimal_strategy
# Map metrics to strategy attributes
strategy_metric_map = {
"optimal_roe": lambda s: float(s.expected_roe),
"bankruptcy_risk": lambda s: float(s.bankruptcy_risk),
"growth_rate": lambda s: float(s.growth_rate),
"capital_efficiency": lambda s: float(s.capital_efficiency),
"optimal_retention": lambda s: getattr(s, "deductible", 0.0),
"total_premium": lambda s: getattr(s, "premium_rate", 0.0),
}
if metric in strategy_metric_map:
return strategy_metric_map[metric](strategy)
# Fallback to direct attribute access
metric_map = {
"optimal_roe": "roe",
"bankruptcy_risk": "ruin_prob",
"optimal_retention": "retention",
"total_premium": "premium",
"growth_rate": "growth_rate",
"capital_efficiency": "capital_efficiency",
}
attr_name = metric_map.get(metric, metric)
return getattr(result, attr_name, 0.0)
[docs]
def clear_cache(self) -> None:
"""Clear all cached results."""
self.results_cache.clear()
# Clear persistent cache if configured
if self.cache_dir and self.cache_dir.exists():
for cache_file in self.cache_dir.glob("*.pkl"):
try:
cache_file.unlink()
except Exception: # pylint: disable=broad-exception-caught
pass
[docs]
def analyze_parameter_group(
self,
parameter_group: Dict[str, Tuple[float, float]],
n_points: int = 11,
metric: str = "optimal_roe",
) -> Dict[str, SensitivityResult]:
"""Analyze sensitivity for a group of parameters.
Args:
parameter_group: Dictionary of parameter names to (min, max) ranges
n_points: Number of points for each parameter
metric: Primary metric for analysis
Returns:
Dictionary of parameter names to SensitivityResult objects
"""
results = {}
for param_name, param_range in parameter_group.items():
try:
result = self.analyze_parameter(
param_name, param_range=param_range, n_points=n_points
)
results[param_name] = result
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning("Could not analyze '%s': %s", param_name, e)
return results