Source code for ergodic_insurance.visualization.executive_plots

"""Executive-level visualization functions.

This module provides high-level visualization functions for executive reporting
including loss distributions, return period curves, and insurance layer diagrams.
"""

# pylint: disable=too-many-lines

from typing import Any, Dict, List, Optional, Tuple, Union, cast

from matplotlib.figure import Figure
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import numpy as np
import pandas as pd
from scipy.interpolate import make_interp_spline
from scipy.signal import argrelextrema

from .core import COLOR_SEQUENCE, WSJ_COLORS, WSJFormatter, format_currency, set_wsj_style


def safe_tight_layout():
    """Apply tight_layout with warning suppression."""
    import warnings

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message=".*Tight layout not applied.*")
        warnings.filterwarnings("ignore", message=".*not compatible with tight_layout.*")
        try:
            plt.tight_layout()
        except (ValueError, RuntimeError):
            # If tight_layout fails, continue without it
            pass
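

# Illustrative usage sketch (not part of the original module): since
# safe_tight_layout() acts on the current figure, it is called once all axes
# are drawn, in place of a bare plt.tight_layout(). The function name below
# is hypothetical.
def _demo_safe_tight_layout() -> Figure:
    """Minimal sketch showing safe_tight_layout() at the end of plotting."""
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.plot([0, 1, 2], [1, 4, 2], color=WSJ_COLORS["blue"])
    safe_tight_layout()  # never raises, even for layouts tight_layout rejects
    return fig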


def plot_loss_distribution(  # pylint: disable=too-many-locals,too-many-statements
    losses: Union[np.ndarray, pd.DataFrame],
    title: str = "Loss Distribution",
    bins: int = 50,
    show_metrics: bool = True,
    var_levels: Optional[List[float]] = None,
    figsize: Tuple[int, int] = (12, 6),
    show_stats: bool = False,
    log_scale: bool = False,
    use_factory: bool = False,
    theme: Optional[Any] = None,
) -> Figure:
    """Create WSJ-style loss distribution plot.

    Creates a two-panel visualization showing loss distribution histogram
    and Q-Q plot for normality assessment.

    Args:
        losses: Array of loss values or DataFrame with 'amount' column
        title: Plot title
        bins: Number of histogram bins
        show_metrics: Whether to show VaR/TVaR lines
        var_levels: VaR confidence levels to show (default: [0.95, 0.99])
        figsize: Figure size (width, height)
        show_stats: Whether to show statistics
        log_scale: Whether to use log scale
        use_factory: Whether to use new visualization factory if available
        theme: Optional theme to use with factory

    Returns:
        Matplotlib figure with distribution plots

    Examples:
        >>> losses = np.random.lognormal(10, 2, 1000)
        >>> fig = plot_loss_distribution(losses, title="Annual Loss Distribution")
    """
    set_wsj_style()

    # Handle DataFrame input
    if isinstance(losses, pd.DataFrame):
        if "amount" in losses.columns:
            losses_array: np.ndarray = np.asarray(losses["amount"].values)
        else:
            # Use the first numeric column
            numeric_cols = losses.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) > 0:
                losses_array = np.asarray(losses[numeric_cols[0]].values)
            else:
                raise ValueError("DataFrame must have at least one numeric column")
    else:
        # Convert to numpy array if needed
        losses_array = np.asarray(losses)
    losses = losses_array

    if var_levels is None:
        var_levels = [0.95, 0.99]

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)

    # Check for empty data and create empty plot
    if len(losses) == 0:
        ax1.text(
            0.5,
            0.5,
            "No data available",
            ha="center",
            va="center",
            transform=ax1.transAxes,
            fontsize=12,
        )
        ax1.set_title("Distribution of Losses")
        ax2.text(
            0.5,
            0.5,
            "No data available",
            ha="center",
            va="center",
            transform=ax2.transAxes,
            fontsize=12,
        )
        ax2.set_title("Q-Q Plot")
        safe_tight_layout()
        return fig

    # Histogram
    ax1.hist(
        losses, bins=bins, color=WSJ_COLORS["blue"], alpha=0.7, edgecolor="black", linewidth=0.5
    )
    ax1.set_xlabel("Loss Amount")
    ax1.set_ylabel("Frequency")
    ax1.set_title("Distribution of Losses")
    ax1.xaxis.set_major_formatter(mticker.FuncFormatter(WSJFormatter.currency_formatter))
    ax1.grid(True, alpha=0.3)

    # Add VaR lines if requested
    if show_metrics:
        from ..risk_metrics import RiskMetrics

        metrics = RiskMetrics(losses)
        colors = [WSJ_COLORS["red"], WSJ_COLORS["orange"]]
        for i, level in enumerate(var_levels):
            var_value = metrics.var(level)
            ax1.axvline(
                var_value,
                color=colors[i % len(colors)],
                linestyle="--",
                linewidth=1.5,
                label=f"VaR({level:.0%}): {format_currency(var_value)}",
            )
        ax1.legend(loc="upper right")

    # Q-Q plot
    from scipy import stats

    stats.probplot(losses, dist="norm", plot=ax2)
    ax2.set_title("Q-Q Plot (Normal)")
    ax2.set_xlabel("Theoretical Quantiles")
    ax2.set_ylabel("Sample Quantiles")
    ax2.grid(True, alpha=0.3)

    # Format Q-Q plot y-axis
    ax2.yaxis.set_major_formatter(mticker.FuncFormatter(WSJFormatter.currency_formatter))

    # Style the Q-Q plot line
    lines = ax2.get_lines()
    if lines:
        lines[0].set_color(WSJ_COLORS["blue"])
        lines[0].set_marker("o")
        lines[0].set_markersize(3)
        lines[0].set_markerfacecolor(WSJ_COLORS["blue"])
        lines[0].set_alpha(0.5)
        if len(lines) > 1:
            lines[1].set_color(WSJ_COLORS["red"])
            lines[1].set_linewidth(2)

    plt.suptitle(title, fontsize=14, fontweight="bold")
    safe_tight_layout()

    return fig


def plot_return_period_curve(  # pylint: disable=too-many-locals
    losses: Union[np.ndarray, pd.DataFrame],
    return_periods: Optional[np.ndarray] = None,
    scenarios: Optional[Dict[str, np.ndarray]] = None,
    title: str = "Return Period Curves",
    figsize: Tuple[int, int] = (10, 6),
    confidence_level: float = 0.95,
    show_grid: bool = True,
) -> Figure:
    """Create WSJ-style return period curve.

    Visualizes the relationship between return periods and loss magnitudes,
    commonly used in catastrophe modeling and risk assessment.

    Args:
        losses: Loss amounts (array or DataFrame)
        return_periods: Array of return periods (years), optional
        scenarios: Optional dict of scenario names to loss arrays
        title: Plot title
        figsize: Figure size (width, height)
        confidence_level: Confidence level for bands
        show_grid: Whether to show grid

    Returns:
        Matplotlib figure with return period curve

    Examples:
        >>> losses = np.random.lognormal(10, 2, 1000)
        >>> fig = plot_return_period_curve(losses)
    """
    set_wsj_style()

    # Handle DataFrame input
    if isinstance(losses, pd.DataFrame):
        if "amount" in losses.columns:
            losses_array: np.ndarray = np.asarray(losses["amount"].values)
        else:
            # Use the first numeric column
            numeric_cols = losses.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) > 0:
                losses_array = np.asarray(losses[numeric_cols[0]].values)
            else:
                raise ValueError("DataFrame must have at least one numeric column")
    else:
        # Convert to numpy array if needed
        losses_array = np.asarray(losses)
    losses = losses_array

    # Calculate return periods if not provided
    if return_periods is None:
        # Sort losses in descending order
        sorted_losses = np.sort(losses)[::-1]
        n = len(sorted_losses)
        # Calculate empirical return periods
        return_periods = n / np.arange(1, n + 1)
        losses = sorted_losses
    else:
        # Ensure losses are sorted by return period
        sort_idx = np.argsort(return_periods)
        return_periods = return_periods[sort_idx]
        losses = losses[sort_idx]

    fig, ax = plt.subplots(figsize=figsize)

    # Plot main curve
    ax.semilogx(
        return_periods,
        losses / 1e6,
        "o-",
        color=WSJ_COLORS["blue"],
        linewidth=2.5,
        markersize=8,
        label="Base Case",
        markerfacecolor="white",
        markeredgewidth=2,
    )

    # Plot additional scenarios if provided
    if scenarios:
        for i, (name, scenario_losses) in enumerate(scenarios.items()):
            ax.semilogx(
                return_periods,
                scenario_losses / 1e6,
                "o-",
                color=COLOR_SEQUENCE[(i + 1) % len(COLOR_SEQUENCE)],
                linewidth=2,
                markersize=6,
                label=name,
                alpha=0.8,
            )

    ax.set_xlabel("Return Period (years)", fontsize=12)
    ax.set_ylabel("Loss Amount ($M)", fontsize=12)
    ax.set_title(title, fontsize=14, fontweight="bold")
    ax.grid(True, which="both", alpha=0.3)
    ax.legend(loc="upper left", frameon=True, fancybox=False, edgecolor=WSJ_COLORS["gray"])

    # Add annotations for key return periods
    key_periods = [10, 100, 250]
    for period in key_periods:
        if period in return_periods:
            idx = np.where(return_periods == period)[0][0]
            loss_val = losses[idx] / 1e6
            ax.annotate(
                f"{period}yr\n${loss_val:.1f}M",
                xy=(period, loss_val),
                xytext=(period, loss_val * 1.1),
                fontsize=9,
                ha="center",
                color=WSJ_COLORS["gray"],
            )

    safe_tight_layout()

    return fig
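

# Hedged usage sketch (hypothetical, not part of the original module): passing
# the ``scenarios`` mapping overlays alternative curves on the base case. The
# plotting loop above reuses the base-case return periods, so scenario arrays
# should be pre-sorted in descending order to align with them; sorting the
# base losses first makes the internal re-sort a no-op.
def _demo_return_period_scenarios() -> Figure:
    rng = np.random.default_rng(0)
    base = np.sort(rng.lognormal(10, 2, 1000))[::-1]  # descending order
    scenarios = {
        "With Insurance": base * 0.6,  # hypothetical dampened losses
        "Stressed": base * 1.4,  # hypothetical stressed losses
    }
    return plot_return_period_curve(base, scenarios=scenarios)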


def plot_insurance_layers(  # pylint: disable=too-many-locals,too-many-statements,too-many-branches
    layers: Union[List[Dict[str, float]], pd.DataFrame],
    total_limit: Optional[float] = None,
    title: str = "Insurance Program Structure",
    figsize: Tuple[int, int] = (10, 6),
    losses: Optional[Union[np.ndarray, pd.DataFrame]] = None,
    loss_data: Optional[Union[np.ndarray, pd.DataFrame]] = None,
    show_expected_loss: bool = False,
) -> Figure:
    """Create WSJ-style insurance layer visualization.

    Visualizes insurance program structure with layer attachments, limits,
    and premium distribution in a two-panel display.

    Args:
        layers: List of layer dictionaries or DataFrame with 'attachment', 'limit' columns
        total_limit: Total program limit (calculated from layers if not provided)
        title: Plot title
        figsize: Figure size (width, height)
        losses: Optional loss data for overlay
        loss_data: Alias for losses parameter
        show_expected_loss: Whether to show expected loss line

    Returns:
        Matplotlib figure with layer structure and premium distribution

    Examples:
        >>> layers = [
        ...     {"attachment": 0, "limit": 5e6, "premium": 0.015},
        ...     {"attachment": 5e6, "limit": 10e6, "premium": 0.008},
        ... ]
        >>> fig = plot_insurance_layers(layers)
    """
    set_wsj_style()

    # Handle loss_data parameter (alias for losses)
    if loss_data is not None and losses is None:
        losses = loss_data

    # Handle DataFrame input
    if isinstance(layers, pd.DataFrame):
        # Check for empty DataFrame
        if layers.empty:
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
            ax1.text(
                0.5,
                0.5,
                "No layers defined",
                ha="center",
                va="center",
                transform=ax1.transAxes,
                fontsize=12,
            )
            ax1.set_title("Layer Structure")
            ax2.text(
                0.5,
                0.5,
                "No layers defined",
                ha="center",
                va="center",
                transform=ax2.transAxes,
                fontsize=12,
            )
            ax2.set_title("Premium Distribution")
            safe_tight_layout()
            return fig

        # Convert DataFrame to list of dicts
        layer_list = []
        for _, row in layers.iterrows():
            layer_dict = {
                "attachment": row.get("attachment", 0),
                "limit": row.get("limit", 0),
                "premium": row.get(
                    "base_premium_rate", row.get("premium_rate", row.get("premium", 0))
                ),
            }
            layer_list.append(layer_dict)
        layers = layer_list

    # Check for empty list
    if not layers:
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
        ax1.text(
            0.5,
            0.5,
            "No layers defined",
            ha="center",
            va="center",
            transform=ax1.transAxes,
            fontsize=12,
        )
        ax1.set_title("Layer Structure")
        ax2.text(
            0.5,
            0.5,
            "No layers defined",
            ha="center",
            va="center",
            transform=ax2.transAxes,
            fontsize=12,
        )
        ax2.set_title("Premium Distribution")
        safe_tight_layout()
        return fig

    # Calculate total limit if not provided
    if total_limit is None:
        total_limit = max(layer["attachment"] + layer["limit"] for layer in layers)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)

    # Layer structure chart
    bottoms = []
    heights = []
    colors = []
    labels = []
    for i, layer in enumerate(layers):
        bottoms.append(layer["attachment"])
        heights.append(layer["limit"])
        colors.append(COLOR_SEQUENCE[i % len(COLOR_SEQUENCE)])
        labels.append(f"Layer {i+1}")

    bars = ax1.bar(
        range(len(layers)),
        heights,
        bottom=bottoms,
        color=colors,
        alpha=0.7,
        edgecolor="black",
        linewidth=1,
    )

    # Add layer annotations
    for i, (layer_bar, layer) in enumerate(zip(bars, layers)):
        height = layer_bar.get_height()
        bottom = layer_bar.get_y()
        # Layer info
        ax1.text(
            layer_bar.get_x() + layer_bar.get_width() / 2,
            bottom + height / 2,
            f'{format_currency(layer["limit"])}\n@ {layer["premium"]:.2%}',
            ha="center",
            va="center",
            fontsize=10,
            fontweight="bold",
        )
        # Attachment point
        ax1.text(
            layer_bar.get_x() + layer_bar.get_width() / 2,
            bottom,
            f"{format_currency(bottom)}",
            ha="center",
            va="top",
            fontsize=9,
            color=WSJ_COLORS["gray"],
        )

    ax1.set_ylabel("Coverage Level", fontsize=12)
    ax1.set_title("Layer Structure", fontsize=12)
    ax1.set_xticks(range(len(layers)))
    ax1.set_xticklabels(labels)
    ax1.yaxis.set_major_formatter(mticker.FuncFormatter(WSJFormatter.currency_formatter))
    ax1.grid(True, axis="y", alpha=0.3)

    # Premium breakdown pie chart
    premiums = [layer["premium"] * layer["limit"] for layer in layers]
    _wedges, texts, autotexts = ax2.pie(
        premiums, labels=labels, colors=colors, autopct="%1.1f%%", startangle=90
    )

    # Style the pie chart
    for text in texts:
        text.set_fontsize(10)
    for autotext in autotexts:
        autotext.set_color("white")
        autotext.set_fontsize(10)
        autotext.set_fontweight("bold")

    ax2.set_title("Premium Distribution", fontsize=12)

    # Add loss overlay if provided
    if losses is not None:
        # Convert to numpy array if needed
        loss_values: np.ndarray
        if isinstance(losses, np.ndarray):
            # Handle numpy arrays first
            loss_values = losses.flatten()
        elif hasattr(losses, "values"):
            # Handle pandas DataFrame and Series
            values = getattr(losses, "values")
            if hasattr(values, "flatten"):
                loss_values = values.flatten()
            else:
                loss_values = values
        else:
            # Handle any other array-like objects
            loss_values = np.asarray(losses).flatten()

        # Add loss distribution overlay on the layer structure chart
        if len(loss_values) > 0 and show_expected_loss:
            # Calculate expected loss
            expected_loss = np.mean(loss_values)

            # Add horizontal line for expected loss
            ax1.axhline(
                y=expected_loss,
                color=WSJ_COLORS["orange"],
                linestyle="--",
                linewidth=2,
                label=f"Expected Loss: {format_currency(expected_loss)}",
                alpha=0.7,
            )

            # Add text annotation for expected loss
            ax1.text(
                len(layers) - 0.5,
                expected_loss,
                f"Expected: {format_currency(expected_loss)}",
                va="bottom",
                ha="right",
                fontsize=9,
                color=WSJ_COLORS["orange"],
                fontweight="bold",
            )

            # Add legend if we have the expected loss line
            ax1.legend(loc="upper left", framealpha=0.9)

    # Add total premium annotation
    total_premium = sum(premiums)
    fig.text(
        0.5,
        0.02,
        f"Total Annual Premium: {format_currency(total_premium)}",
        ha="center",
        fontsize=11,
        fontweight="bold",
    )

    plt.suptitle(title, fontsize=14, fontweight="bold")
    safe_tight_layout()

    return fig
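

# Hedged usage sketch (hypothetical, not part of the original module): the
# DataFrame input path above accepts 'attachment', 'limit' and one of
# 'base_premium_rate'/'premium_rate'/'premium' columns; supplying losses with
# show_expected_loss=True enables the expected-loss overlay on the layer chart.
def _demo_insurance_layers_dataframe() -> Figure:
    layers_df = pd.DataFrame(
        {
            "attachment": [0, 5e6],
            "limit": [5e6, 10e6],
            "base_premium_rate": [0.015, 0.008],
        }
    )
    losses = np.random.default_rng(1).lognormal(13, 1, 500)  # synthetic losses
    return plot_insurance_layers(layers_df, losses=losses, show_expected_loss=True)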


def plot_roe_ruin_frontier(  # pylint: disable=too-many-locals,too-many-statements,too-many-branches
    results: Union[Dict[float, pd.DataFrame], pd.DataFrame],
    company_sizes: Optional[List[float]] = None,
    title: str = "ROE-Ruin Efficient Frontier",
    figsize: Tuple[int, int] = (12, 8),
    highlight_sweet_spots: bool = True,
    show_optimal_zones: bool = True,
    export_dpi: Optional[int] = None,
    log_scale_y: bool = True,
    grid: bool = True,
    annotations: bool = True,
    color_scheme: Optional[List[str]] = None,
) -> Figure:
    """Create ROE-Ruin efficient frontier visualization.

    Visualizes the Pareto frontier showing trade-offs between Return on Equity (ROE)
    and Ruin Probability for different company sizes. This helps executives
    understand optimal insurance purchasing decisions.

    Args:
        results: Either a dict of company_size (float) -> optimization results DataFrame,
            or a single DataFrame with 'company_size' column
        company_sizes: List of company sizes to plot (e.g., [1e6, 1e7, 1e8])
            If None, will use all available sizes in results
        title: Plot title
        figsize: Figure size (width, height)
        highlight_sweet_spots: Whether to highlight knee points on curves
        show_optimal_zones: Whether to show shaded optimal zones
        export_dpi: DPI for export (150 for web, 300 for print, None for screen)
        log_scale_y: Whether to use log scale for ruin probability axis
        grid: Whether to show grid lines
        annotations: Whether to show annotations for key points
        color_scheme: List of colors for different company sizes

    Returns:
        Matplotlib figure with ROE-Ruin efficient frontier plots

    Raises:
        ValueError: If results format is invalid or no data available

    Examples:
        >>> # With dictionary of results
        >>> results = {
        ...     1e6: pd.DataFrame({'roe': [0.1, 0.15, 0.2],
        ...                        'ruin_prob': [0.05, 0.02, 0.01]}),
        ...     1e7: pd.DataFrame({'roe': [0.08, 0.12, 0.18],
        ...                        'ruin_prob': [0.03, 0.015, 0.008]})
        ... }
        >>> fig = plot_roe_ruin_frontier(results)

        >>> # With single DataFrame
        >>> df = pd.DataFrame({
        ...     'company_size': [1e6, 1e6, 1e7, 1e7],
        ...     'roe': [0.1, 0.15, 0.08, 0.12],
        ...     'ruin_prob': [0.05, 0.02, 0.03, 0.015]
        ... })
        >>> fig = plot_roe_ruin_frontier(df)
    """
    set_wsj_style()

    # Process input data
    data_dict = {}
    if isinstance(results, pd.DataFrame):
        # Single DataFrame with company_size column
        if "company_size" not in results.columns:
            raise ValueError("DataFrame must have 'company_size' column")
        for size in results["company_size"].unique():
            size_data = results[results["company_size"] == size].copy()
            data_dict[size] = size_data
    elif isinstance(results, dict):
        # Dictionary of company_size -> DataFrame
        data_dict = results
    else:
        raise ValueError("Results must be DataFrame or dict of DataFrames")

    # Determine company sizes to plot
    if company_sizes is None:
        company_sizes = sorted(data_dict.keys())
    else:
        # Filter to requested sizes that exist in data
        company_sizes = [s for s in company_sizes if s in data_dict]

    if not company_sizes:
        raise ValueError("No valid company sizes found in data")

    # Set up color scheme
    if color_scheme is None:
        # Default corporate blues with good contrast
        color_scheme = [
            WSJ_COLORS["blue"],  # Primary blue
            "#4A90E2",  # Lighter blue
            "#1E3A8A",  # Darker blue
            WSJ_COLORS["orange"],  # Accent for contrast if > 3 sizes
            WSJ_COLORS["red"],  # Additional contrast
        ]

    # Create figure
    fig, ax = plt.subplots(figsize=figsize)

    # Storage for sweet spots
    sweet_spots = []

    # Plot each company size
    for idx, size in enumerate(company_sizes):
        df = data_dict[size]

        # Ensure required columns exist
        required_cols = ["roe", "ruin_prob"]
        if not all(col in df.columns for col in required_cols):
            # Try alternative column names
            roe_col = next(
                (c for c in df.columns if "roe" in c.lower() or "return" in c.lower()), None
            )
            ruin_col = next((c for c in df.columns if "ruin" in c.lower()), None)
            if roe_col and ruin_col:
                df = df.rename(columns={roe_col: "roe", ruin_col: "ruin_prob"})
            else:
                raise ValueError(f"Company size {size} missing ROE or ruin probability data")

        # Sort by ROE for proper curve drawing
        df = df.sort_values("roe")

        # Extract data
        roe_values = df["roe"].values * 100  # Convert to percentage
        ruin_values = df["ruin_prob"].values * 100  # Convert to percentage

        # Apply smoothing if enough points
        if len(roe_values) > 3:
            try:
                # Create smooth curve using spline interpolation
                x_smooth = np.linspace(roe_values.min(), roe_values.max(), 100)
                spline = make_interp_spline(roe_values, ruin_values, k=min(3, len(roe_values) - 1))
                y_smooth = spline(x_smooth)
                y_smooth = np.clip(y_smooth, 0, 100)  # Ensure values stay in valid range
            except Exception:  # pylint: disable=broad-except
                # Fallback to linear interpolation on error
                x_smooth = roe_values
                y_smooth = ruin_values
        else:
            x_smooth = roe_values
            y_smooth = ruin_values

        # Plot the frontier curve
        color = color_scheme[idx % len(color_scheme)]
        label = f"${format_currency(size, decimals=0).replace('$', '')} Company"
        ax.plot(x_smooth, y_smooth, "-", color=color, linewidth=2.5, label=label)
        ax.scatter(roe_values, ruin_values, color=color, s=50, alpha=0.6, zorder=5)

        # Find and highlight sweet spot (knee point)
        if highlight_sweet_spots and len(roe_values) > 2:
            sweet_spot_idx = _find_knee_point(roe_values, ruin_values)
            sweet_roe = roe_values[sweet_spot_idx]
            sweet_ruin = ruin_values[sweet_spot_idx]
            sweet_spots.append((sweet_roe, sweet_ruin, size))

            # Mark sweet spot
            ax.scatter(
                sweet_roe,
                sweet_ruin,
                color=color,
                s=200,
                marker="*",
                edgecolor="white",
                linewidth=2,
                zorder=10,
            )

            # Add annotation if enabled
            if annotations:
                ax.annotate(
                    f"Sweet Spot\n{sweet_roe:.1f}% ROE\n{sweet_ruin:.2f}% Risk",
                    xy=(sweet_roe, sweet_ruin),
                    xytext=(sweet_roe + 2, sweet_ruin * 1.5),
                    fontsize=9,
                    color=color,
                    fontweight="bold",
                    arrowprops={"arrowstyle": "->", "color": color, "alpha": 0.5, "lw": 1},
                    bbox={
                        "boxstyle": "round,pad=0.3",
                        "facecolor": "white",
                        "edgecolor": color,
                        "alpha": 0.8,
                    },
                )

    # Add optimal zones if requested
    if show_optimal_zones and sweet_spots:
        # Calculate optimal zone bounds based on sweet spots
        roe_range = [min(s[0] for s in sweet_spots) - 2, max(s[0] for s in sweet_spots) + 2]
        ruin_range = [min(s[1] for s in sweet_spots) * 0.5, max(s[1] for s in sweet_spots) * 2]

        # Add shaded optimal zone
        ax.axvspan(roe_range[0], roe_range[1], alpha=0.05, color="green", label="Optimal Zone")
        ax.axhspan(ruin_range[0], ruin_range[1], alpha=0.05, color="green")

    # Format axes
    ax.set_xlabel("Return on Equity (%)", fontsize=12)
    ax.set_ylabel("Ruin Probability (%)", fontsize=12)
    ax.set_title(title, fontsize=14, fontweight="bold")

    # Apply log scale to y-axis if requested
    if log_scale_y:
        ax.set_yscale("log")
        ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.2f}%"))
    else:
        ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.1f}%"))

    ax.xaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.0f}%"))

    # Add grid if requested
    if grid:
        ax.grid(True, which="both", alpha=0.3, linestyle="--")

    # Add legend
    ax.legend(loc="upper right", frameon=True, fancybox=False, edgecolor=WSJ_COLORS["gray"])

    # Set axis limits for better visualization
    ax.set_xlim(left=0)
    if not log_scale_y:
        ax.set_ylim(bottom=0)

    # Adjust layout
    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig


def plot_ruin_cliff(  # pylint: disable=too-many-locals,too-many-statements,too-many-branches
    retention_range: Optional[Tuple[float, float]] = None,
    n_points: int = 50,
    company_size: float = 10_000_000,
    simulation_data: Optional[Dict[str, Any]] = None,
    title: str = "The Ruin Cliff: Retention vs Failure Risk",
    figsize: Tuple[int, int] = (14, 8),
    show_inset: bool = True,
    show_warnings: bool = True,
    show_3d_effect: bool = True,
    export_dpi: Optional[int] = None,
) -> Figure:
    """Create dramatic ruin cliff visualization with 3D effects.

    Visualizes the relationship between insurance retention (deductible) levels
    and ruin probability, highlighting the "cliff edge" where risk dramatically
    increases. Features 3D-style gradient effects and warning zones.

    Args:
        retention_range: Tuple of (min, max) retention values in dollars.
            Default: (10_000, 10_000_000)
        n_points: Number of points to calculate along retention axis
        company_size: Company asset size for scaling retention levels
        simulation_data: Optional pre-computed simulation results with keys:
            'retentions', 'ruin_probs', 'confidence_intervals'
        title: Plot title
        figsize: Figure size (width, height)
        show_inset: Whether to show zoomed inset of critical region
        show_warnings: Whether to show warning callouts and annotations
        show_3d_effect: Whether to add 3D gradient background effects
        export_dpi: DPI for export (150 for web, 300 for print)

    Returns:
        Matplotlib figure with ruin cliff visualization

    Examples:
        >>> # Basic usage with synthetic data
        >>> fig = plot_ruin_cliff()

        >>> # With custom retention range
        >>> fig = plot_ruin_cliff(retention_range=(5000, 5_000_000))

        >>> # With pre-computed simulation data
        >>> data = {
        ...     'retentions': np.logspace(4, 7, 50),
        ...     'ruin_probs': np.array([...]),
        ... }
        >>> fig = plot_ruin_cliff(simulation_data=data)

    Notes:
        The visualization uses a log scale for retention values to show the
        full range from small to large deductibles. The cliff edge is detected
        using derivative analysis to find the steepest point of increase in
        ruin probability.
    """
    set_wsj_style()

    # Set default retention range if not provided
    if retention_range is None:
        retention_range = (10_000, 10_000_000)

    # Generate or use provided data
    if simulation_data is not None:
        retentions = simulation_data["retentions"]
        ruin_probs = simulation_data["ruin_probs"]
    else:
        # Generate synthetic demonstration data
        retentions = np.logspace(
            np.log10(retention_range[0]), np.log10(retention_range[1]), n_points
        )

        # Create realistic ruin probability curve with cliff effect
        # Low retention = high ruin probability, high retention = low probability
        # but with a steep cliff in the middle
        log_ret = np.log10(retentions)
        log_ret_norm = (log_ret - log_ret.min()) / (log_ret.max() - log_ret.min())

        # Sigmoid-like function with adjustable steepness for cliff effect
        cliff_center = 0.3  # Position of cliff (30% along log scale)
        cliff_steepness = 15  # How steep the cliff is
        base_curve = 1 / (1 + np.exp(cliff_steepness * (log_ret_norm - cliff_center)))

        # Add some noise and ensure realistic bounds
        noise = np.random.default_rng(42).normal(0, 0.01, len(retentions))
        ruin_probs = np.clip(base_curve + noise, 0.001, 0.99)

    # Find cliff edge (steepest point)
    derivatives = np.gradient(ruin_probs, np.log10(retentions))
    cliff_idx = np.argmax(np.abs(derivatives))
    cliff_retention = retentions[cliff_idx]
    cliff_ruin = ruin_probs[cliff_idx]

    # Create figure
    fig, ax = plt.subplots(figsize=figsize)

    # Add 3D gradient effect background if requested
    if show_3d_effect:
        # Create gradient mesh for contour fill
        x_mesh = np.logspace(np.log10(retentions.min()), np.log10(retentions.max()), 100)
        y_mesh = np.linspace(0, 1, 100)
        X_mesh, Y_mesh = np.meshgrid(x_mesh, y_mesh)

        # Create Z values based on distance from the curve
        Z_mesh = np.zeros_like(X_mesh)
        for i, x_val in enumerate(x_mesh):
            # Interpolate ruin probability at this retention
            interp_ruin = np.interp(x_val, retentions, ruin_probs)
            for j, y_val in enumerate(y_mesh):
                # Distance from the curve determines intensity
                dist = abs(y_val - interp_ruin)
                Z_mesh[j, i] = 1 - dist

        # Create custom colormap for 3D effect
        from matplotlib.colors import LinearSegmentedColormap

        colors_3d = ["#ffffff", "#e8f4fd", "#c5e4fd", "#7ec0ee", "#4a90e2", "#ff6b6b"]
        n_bins = 100
        cmap_3d = LinearSegmentedColormap.from_list("ruin_cliff", colors_3d, N=n_bins)

        # Add gradient background
        _contour = ax.contourf(
            X_mesh, Y_mesh, Z_mesh, levels=20, cmap=cmap_3d, alpha=0.3, antialiased=True
        )

    # Define color zones based on ruin probability
    danger_threshold = 0.05  # 5% ruin probability
    warning_threshold = 0.02  # 2% ruin probability

    # Add danger zones
    ax.axhspan(danger_threshold, 1.0, alpha=0.1, color="red", label="Danger Zone (>5% risk)")
    ax.axhspan(
        warning_threshold,
        danger_threshold,
        alpha=0.1,
        color="orange",
        label="Warning Zone (2-5% risk)",
    )
    ax.axhspan(0, warning_threshold, alpha=0.1, color="green", label="Safe Zone (<2% risk)")

    # Plot main curve with color gradient based on risk level
    for i in range(len(retentions) - 1):
        # Determine color based on ruin probability
        if ruin_probs[i] > danger_threshold:
            color = "#ff4444"  # Red
        elif ruin_probs[i] > warning_threshold:
            color = "#ff8800"  # Orange
        else:
            color = "#00aa00"  # Green
        ax.plot(retentions[i : i + 2], ruin_probs[i : i + 2], color=color, linewidth=3, alpha=0.9)

    # Mark the cliff edge
    ax.scatter(
        cliff_retention,
        cliff_ruin,
        s=300,
        color="red",
        marker="o",
        edgecolor="darkred",
        linewidth=3,
        zorder=10,
        label=f"Cliff Edge: ${cliff_retention:,.0f}",
    )

    # Add warning callouts if requested
    if show_warnings:
        # Cliff edge warning
        ax.annotate(
            f"[!] CLIFF EDGE\n${cliff_retention:,.0f} retention\n{cliff_ruin:.1%} ruin risk",
            xy=(cliff_retention, cliff_ruin),
            xytext=(cliff_retention * 3, cliff_ruin + 0.15),
            fontsize=11,
            fontweight="bold",
            color="darkred",
            bbox={
                "boxstyle": "round,pad=0.5",
                "facecolor": "yellow",
                "edgecolor": "red",
                "alpha": 0.9,
                "linewidth": 2,
            },
            arrowprops={
                "arrowstyle": "-|>",
                "connectionstyle": "arc3,rad=0.3",
                "color": "red",
                "linewidth": 2,
                "shrinkA": 5,
                "shrinkB": 5,
            },
        )

        # Safe zone indicator
        safe_idx = np.where(ruin_probs < warning_threshold)[0]
        if len(safe_idx) > 0:
            safe_retention = retentions[safe_idx[0]]
            ax.annotate(
                f"[OK] Safe Zone\nRetention > ${safe_retention:,.0f}",
                xy=(safe_retention * 2, 0.01),
                fontsize=10,
                color="darkgreen",
                fontweight="bold",
                bbox={
                    "boxstyle": "round,pad=0.3",
                    "facecolor": "lightgreen",
                    "edgecolor": "green",
                    "alpha": 0.8,
                },
            )

    # Add inset plot for critical region if requested
    if show_inset:
        # Define inset region around cliff
        inset_range = (cliff_retention * 0.3, cliff_retention * 3)
        inset_mask = (retentions >= inset_range[0]) & (retentions <= inset_range[1])

        if np.any(inset_mask):
            # Create inset axes
            ax_inset = inset_axes(ax, width="40%", height="40%", loc="upper right", borderpad=3)

            # Plot zoomed region
            ax_inset.semilogx(retentions[inset_mask], ruin_probs[inset_mask], "b-", linewidth=2)
            ax_inset.scatter(
                cliff_retention,
                cliff_ruin,
                s=100,
                color="red",
                marker="o",
                edgecolor="darkred",
                linewidth=2,
            )

            # Style inset
            ax_inset.set_xlabel("Retention ($)", fontsize=9)
            ax_inset.set_ylabel("Ruin Probability", fontsize=9)
            ax_inset.set_title("Critical Region Detail", fontsize=10, fontweight="bold")
            ax_inset.grid(True, alpha=0.3)
            ax_inset.xaxis.set_major_formatter(
                mticker.FuncFormatter(WSJFormatter.currency_formatter)
            )
            ax_inset.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.1%}"))

            # Add shading to inset
            ax_inset.axhspan(danger_threshold, 1.0, alpha=0.2, color="red")
            ax_inset.axhspan(warning_threshold, danger_threshold, alpha=0.2, color="orange")

    # Format main axes
    ax.set_xscale("log")
    ax.set_xlabel("Retention Level (Deductible)", fontsize=13, fontweight="bold")
    ax.set_ylabel("10-Year Ruin Probability", fontsize=13, fontweight="bold")
    ax.set_title(title, fontsize=16, fontweight="bold", pad=20)

    # Format axes
    ax.xaxis.set_major_formatter(mticker.FuncFormatter(WSJFormatter.currency_formatter))
    ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.0%}"))

    # Set sensible limits
    ax.set_xlim(retentions.min() * 0.8, retentions.max() * 1.2)
    ax.set_ylim(0, min(1.0, ruin_probs.max() * 1.1))

    # Add grid
    ax.grid(True, which="both", alpha=0.3, linestyle="--")
    ax.grid(True, which="minor", alpha=0.1, linestyle=":")

    # Add legend
    ax.legend(
        loc="upper left",
        frameon=True,
        fancybox=False,
        edgecolor="black",
        framealpha=0.95,
        fontsize=10,
    )

    # Add subtitle with company info
    fig.text(
        0.5,
        0.94,
        f"Company Size: {format_currency(company_size)} | Analysis: 10-Year Time Horizon",
        ha="center",
        fontsize=11,
        style="italic",
        color=WSJ_COLORS["gray"],
    )

    # Adjust layout
    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig
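

# Hedged usage sketch (hypothetical, not part of the original module): the
# ``simulation_data`` dict consumed above only needs 'retentions' and
# 'ruin_probs' arrays of equal length; the cliff edge, zones, and inset are
# all derived from them. The decay curve below is synthetic.
def _demo_ruin_cliff_with_data() -> Figure:
    retentions = np.logspace(4, 7, 50)
    # Synthetic, monotonically decreasing ruin probabilities for illustration
    ruin_probs = np.clip(1 / (1 + (retentions / 2e5) ** 1.5), 0.001, 0.99)
    return plot_ruin_cliff(
        simulation_data={"retentions": retentions, "ruin_probs": ruin_probs}
    )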


def plot_simulation_architecture(  # pylint: disable=too-many-locals
    title: str = "Simulation Architecture Flow",
    figsize: Tuple[int, int] = (14, 8),
    export_dpi: Optional[int] = None,
    show_icons: bool = True,
) -> Figure:
    """Create simulation architecture flow diagram.

    Visualizes the data flow from parameters through simulation to insights
    using a clean flowchart style with boxes and arrows.

    Args:
        title: Plot title
        figsize: Figure size (width, height)
        export_dpi: DPI for export (150 for web, 300 for print)
        show_icons: Whether to show icons in boxes

    Returns:
        Matplotlib figure with architecture diagram

    Examples:
        >>> fig = plot_simulation_architecture()
        >>> fig.savefig("architecture.png", dpi=150)
    """
    set_wsj_style()

    fig, ax = plt.subplots(figsize=figsize)

    # Define box positions and sizes
    box_width = 0.15
    box_height = 0.12
    _arrow_width = 0.02

    # Main flow boxes (left to right)
    boxes = [
        {"x": 0.1, "y": 0.5, "label": "Parameters\n& Config", "color": WSJ_COLORS["blue"]},
        {"x": 0.35, "y": 0.5, "label": "Monte Carlo\nSimulation", "color": WSJ_COLORS["orange"]},
        {"x": 0.6, "y": 0.5, "label": "Analysis\nEngine", "color": WSJ_COLORS["red"]},
        {"x": 0.85, "y": 0.5, "label": "Insights &\nDecisions", "color": WSJ_COLORS["green"]},
    ]

    # Sub-component boxes
    sub_boxes = [
        {
            "x": 0.1,
            "y": 0.75,
            "label": "Company\nProfile",
            "color": WSJ_COLORS["gray"],
            "size": 0.8,
        },
        {
            "x": 0.1,
            "y": 0.25,
            "label": "Insurance\nProgram",
            "color": WSJ_COLORS["gray"],
            "size": 0.8,
        },
        {
            "x": 0.35,
            "y": 0.75,
            "label": "Loss\nGeneration",
            "color": WSJ_COLORS["gray"],
            "size": 0.8,
        },
        {
            "x": 0.35,
            "y": 0.25,
            "label": "Financial\nDynamics",
            "color": WSJ_COLORS["gray"],
            "size": 0.8,
        },
        {
            "x": 0.6,
            "y": 0.75,
            "label": "Ergodic\nCalculations",
            "color": WSJ_COLORS["gray"],
            "size": 0.8,
        },
        {"x": 0.6, "y": 0.25, "label": "Risk\nMetrics", "color": WSJ_COLORS["gray"], "size": 0.8},
    ]

    # Draw main boxes
    for box in boxes:
        rect = plt.Rectangle(
            (float(box["x"]) - box_width / 2, float(box["y"]) - box_height / 2),  # type: ignore[arg-type]
            box_width,
            box_height,
            facecolor=box["color"],
            alpha=0.3,
            edgecolor=box["color"],
            linewidth=2,
        )
        ax.add_patch(rect)
        # Add text
        ax.text(
            float(box["x"]),  # type: ignore[arg-type]
            float(box["y"]),  # type: ignore[arg-type]
            str(box["label"]),
            ha="center",
            va="center",
            fontsize=12,
            fontweight="bold",
            color=box["color"],
        )

    # Draw sub-component boxes
    for box in sub_boxes:
        size_factor = box.get("size", 1.0)
        rect = plt.Rectangle(
            (
                float(box["x"]) - box_width * float(size_factor) * 0.4,  # type: ignore[arg-type]
                float(box["y"]) - box_height * float(size_factor) * 0.4,  # type: ignore[arg-type]
            ),
            box_width * float(size_factor) * 0.8,  # type: ignore[arg-type]
            box_height * float(size_factor) * 0.8,  # type: ignore[arg-type]
            facecolor="white",
            alpha=0.8,
            edgecolor=box["color"],
            linewidth=1,
            linestyle="--",
        )
        ax.add_patch(rect)
        ax.text(
            float(box["x"]),  # type: ignore[arg-type]
            float(box["y"]),  # type: ignore[arg-type]
            str(box["label"]),
            ha="center",
            va="center",
            fontsize=9,
            style="italic",
            color=box["color"],
        )

    # Draw arrows between main boxes
    arrow_props = {
        "arrowstyle": "-|>",
        "connectionstyle": "arc3,rad=0",
        "color": WSJ_COLORS["gray"],
        "linewidth": 2,
        "alpha": 0.7,
    }
    for i in range(len(boxes) - 1):
        ax.annotate(
            "",
            xy=(float(boxes[i + 1]["x"]) - box_width / 2 - 0.01, float(boxes[i + 1]["y"])),  # type: ignore[arg-type]
            xytext=(float(boxes[i]["x"]) + box_width / 2 + 0.01, float(boxes[i]["y"])),  # type: ignore[arg-type]
            arrowprops=arrow_props,
        )

    # Draw connecting arrows from sub-components
    thin_arrow_props = {
        "arrowstyle": "->",
        "connectionstyle": "arc3,rad=0",  # Straight arrows, no curve
        "color": WSJ_COLORS["gray"],
        "linewidth": 1,
        "alpha": 0.5,
    }

    # Connect sub-boxes to their parent main boxes (downward arrows)
    sub_connections = [
        (0.1, 0.75, 0.1, 0.5 + box_height / 2),  # Company Profile -> Parameters
        (0.1, 0.25, 0.1, 0.5 - box_height / 2),  # Insurance Program -> Parameters
        (0.35, 0.75, 0.35, 0.5 + box_height / 2),  # Loss Generation -> Monte Carlo
        (0.35, 0.25, 0.35, 0.5 - box_height / 2),  # Financial Dynamics -> Monte Carlo
        (0.6, 0.75, 0.6, 0.5 + box_height / 2),  # Ergodic Calculations -> Analysis
        (0.6, 0.25, 0.6, 0.5 - box_height / 2),  # Risk Metrics -> Analysis
    ]
    for x1, y1, x2, y2 in sub_connections:
        # Adjust arrow direction based on position
        if y1 > 0.5:  # Box is above main box
            ax.annotate(
                "",
                xy=(x2, y2),
                xytext=(x1, y1 - box_height * 0.32),
                arrowprops=thin_arrow_props,
            )
        else:  # Box is below main box
            ax.annotate(
                "",
                xy=(x2, y2),
                xytext=(x1, y1 + box_height * 0.32),
                arrowprops=thin_arrow_props,
            )

    # Add annotation at the bottom
    ax.text(
        0.5,
        0.05,
        "Each stage transforms data to extract actionable business insights",
        ha="center",
        fontsize=10,
        color=WSJ_COLORS["gray"],
    )

    # Style the plot; the axes title carries the figure title, so no separate
    # in-axes title text is drawn
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.axis("off")
    ax.set_title(title, fontsize=16, fontweight="bold", pad=20)

    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig


def plot_sample_paths(  # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    simulation_data: Optional[Dict[str, Any]] = None,
    n_paths: int = 5,
    short_horizon: int = 10,
    long_horizon: int = 100,
    company_size: float = 10_000_000,
    title: str = "Sample Path Visualization",
    figsize: Tuple[int, int] = (14, 8),
    show_failures: bool = True,
    export_dpi: Optional[int] = None,
) -> Figure:
    """Create sample path visualization showing trajectory evolution.

    Displays representative paths over short and long time horizons,
    highlighting survivors vs failed companies with transparency effects.

    Args:
        simulation_data: Optional pre-computed simulation results with paths
        n_paths: Number of paths to display (default 5)
        short_horizon: Years for short-term view (default 10)
        long_horizon: Years for long-term view (default 100)
        company_size: Starting company size
        title: Plot title
        figsize: Figure size (width, height)
        show_failures: Whether to highlight failed paths
        export_dpi: DPI for export (150 for web, 300 for print)

    Returns:
        Matplotlib figure with dual-panel path visualization

    Examples:
        >>> fig = plot_sample_paths(n_paths=5)
        >>> fig.savefig("sample_paths.png", dpi=150)
    """
    set_wsj_style()

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)

    # Generate or use provided data
    if simulation_data is not None:
        paths_short = simulation_data.get("paths_short", None)
        paths_long = simulation_data.get("paths_long", None)
    else:
        # Generate synthetic demonstration paths
        rng = np.random.default_rng(42)

        # Short horizon paths
        time_short = np.linspace(0, short_horizon, 100)
        paths_short = []
        for i in range(n_paths):
            # Generate path with random walk and drift
            drift = rng.normal(0.08, 0.02)  # Annual growth
            volatility = rng.uniform(0.15, 0.25)
            shocks = rng.normal(0, volatility, len(time_short))
            cumulative = np.cumsum(drift / len(time_short) + shocks / np.sqrt(len(time_short)))
            path = company_size * np.exp(cumulative)

            # Randomly determine if path fails
            fails = rng.random() < 0.2  # 20% failure rate
            if fails and show_failures:
                fail_time = rng.uniform(short_horizon * 0.3, short_horizon * 0.9)
                fail_idx = int(fail_time / short_horizon * len(time_short))
                path[fail_idx:] = path[fail_idx] * np.exp(-np.arange(len(path) - fail_idx) * 0.1)
            paths_short.append({"values": path, "failed": fails})

        # Long horizon paths
        time_long = np.linspace(0, long_horizon, 500)
        paths_long = []
        for i in range(n_paths):
            drift = rng.normal(0.08, 0.02)
            volatility = rng.uniform(0.15, 0.30)
            shocks = rng.normal(0, volatility, len(time_long))
            cumulative = np.cumsum(drift / len(time_long) + shocks / np.sqrt(len(time_long)))
            path = company_size * np.exp(cumulative)

            fails = rng.random() < 0.3  # 30% failure rate long-term
            if fails and show_failures:
                fail_time = rng.uniform(long_horizon * 0.2, long_horizon * 0.8)
                fail_idx = int(fail_time / long_horizon * len(time_long))
                path[fail_idx:] = path[fail_idx] * np.exp(-np.arange(len(path) - fail_idx) * 0.05)
            paths_long.append({"values": path, "failed": fails})

    # Plot short horizon, labeling the first path of each category so both
    # "Failed" and "Survivor" appear in the legend when present
    if paths_short is not None:
        failed_labeled = False
        survivor_labeled = False
        for path_data in paths_short:
            path = path_data["values"]
            failed = path_data.get("failed", False)

            if failed:
                color = WSJ_COLORS["red"]
                alpha = 0.7
                linewidth = 1.5
                label = None if failed_labeled else "Failed"
                failed_labeled = True
            else:
                color = WSJ_COLORS["blue"]
                alpha = 0.8
                linewidth = 2
                label = None if survivor_labeled else "Survivor"
                survivor_labeled = True

            ax1.plot(
                (
                    time_short
                    if "time_short" in locals()
                    else np.linspace(0, short_horizon, len(path))
                ),
                path / 1e6,  # Convert to millions
                color=color,
                alpha=alpha,
                linewidth=linewidth,
                label=label,
            )

    # Add starting point marker
    ax1.scatter(
        0,
        company_size / 1e6,
        s=100,
        color=WSJ_COLORS["green"],
        marker="o",
        zorder=10,
        label=f"Start: ${company_size/1e6:.0f}M",
    )

    ax1.set_xlabel("Years", fontsize=11)
    ax1.set_ylabel("Company Value ($M)", fontsize=11)
    ax1.set_title(f"{short_horizon}-Year Horizon", fontsize=12, fontweight="bold")
    ax1.grid(True, alpha=0.3)
    ax1.legend(loc="upper left", fontsize=9)
    ax1.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"${x:.0f}M"))

    # Plot long horizon
    if paths_long is not None:
        for path_data in paths_long:
            path = path_data["values"]
            failed = path_data.get("failed", False)

            if failed:
                color = WSJ_COLORS["red"]
                alpha = 0.6
                linewidth = 1.5
            else:
                color = WSJ_COLORS["blue"]
                alpha = 0.7
                linewidth = 2

            ax2.plot(
                time_long if "time_long" in locals() else np.linspace(0, long_horizon, len(path)),
                path / 1e6,
                color=color,
                alpha=alpha,
                linewidth=linewidth,
            )

    # Add starting point
    ax2.scatter(0, company_size / 1e6, s=100, color=WSJ_COLORS["green"], marker="o", zorder=10)

    ax2.set_xlabel("Years", fontsize=11)
    ax2.set_ylabel("Company Value ($M)", fontsize=11)
    ax2.set_title(f"{long_horizon}-Year Horizon", fontsize=12, fontweight="bold")
    ax2.grid(True, alpha=0.3)
    ax2.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"${x:.0f}M"))

    # Use log scale for long horizon
    ax2.set_yscale("log")

    # Main title
    fig.suptitle(title, fontsize=14, fontweight="bold")

    # Add annotation
    fig.text(
        0.5,
        0.02,
        "Trajectories show potential outcomes: survivors grow exponentially, failures decline to zero",
        ha="center",
        fontsize=10,
        style="italic",
        color=WSJ_COLORS["gray"],
    )

    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig
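

# Hedged usage sketch (hypothetical, not part of the original module):
# pre-computed paths are lists of dicts with a 'values' array and a 'failed'
# flag, stored under 'paths_short'/'paths_long', matching the access pattern
# in the plotting loops above. The path generator here is a toy random walk.
def _demo_sample_paths_with_data() -> Figure:
    rng = np.random.default_rng(7)

    def _path(n: int) -> Dict[str, Any]:
        values = 1e7 * np.exp(np.cumsum(rng.normal(0.001, 0.02, n)))
        return {"values": values, "failed": False}

    data = {
        "paths_short": [_path(100) for _ in range(5)],
        "paths_long": [_path(500) for _ in range(5)],
    }
    return plot_sample_paths(simulation_data=data)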


def plot_optimal_coverage_heatmap(  # pylint: disable=too-many-locals
    optimization_results: Optional[Dict[str, Any]] = None,
    company_sizes: Optional[List[float]] = None,
    title: str = "Optimal Coverage Heatmap",
    figsize: Tuple[int, int] = (16, 6),
    show_contours: bool = True,
    export_dpi: Optional[int] = None,
) -> Figure:
    """Create optimal insurance coverage heatmap for different company sizes.

    Visualizes the relationship between retention, limit, and growth rate
    using color intensity to show optimal configurations.

    Args:
        optimization_results: Optional pre-computed optimization data
        company_sizes: List of company sizes (default: [1e6, 1e7, 1e8])
        title: Plot title
        figsize: Figure size (width, height)
        show_contours: Whether to show contour lines
        export_dpi: DPI for export (150 for web, 300 for print)

    Returns:
        Matplotlib figure with 3-panel heatmap

    Examples:
        >>> fig = plot_optimal_coverage_heatmap(
        ...     company_sizes=[1e6, 1e7, 1e8]
        ... )
        >>> fig.savefig("coverage_heatmap.png", dpi=150)
    """
    set_wsj_style()

    if company_sizes is None:
        company_sizes = [1_000_000, 10_000_000, 100_000_000]

    fig, axes = plt.subplots(1, 3, figsize=figsize)

    # First pass: collect all data and determine global percentage ranges
    all_growth_rates = []
    all_retention_pcts = []
    all_limit_pcts = []
    data_for_plots = []

    for idx, company_size in enumerate(company_sizes):
        # Use provided data when available; fall back to synthetic data when
        # the results dict is missing or has no entry for this company size
        data = None
        if optimization_results is not None:
            data = optimization_results.get(f"company_{company_size}", None)
        if data is not None:
            retention_values = data["retention"]
            limit_values = data["limit"]
            growth_rates = data["growth_rate"]
        else:
            # Generate synthetic demonstration data
            retention_values = np.logspace(4, np.log10(company_size * 0.5), 20)
            limit_values = np.logspace(5, np.log10(company_size * 5), 20)

            # Create mesh grid
            R, L = np.meshgrid(retention_values, limit_values)

            # Synthetic growth rate function
            # Higher growth with moderate retention and appropriate limits
            optimal_retention = company_size * 0.01  # 1% of company size
            optimal_limit = company_size * 0.5  # 50% of company size
            growth_rates = 0.12 * np.exp(
                -0.5
                * (
                    (np.log10(R) - np.log10(optimal_retention)) ** 2 / 1.5
                    + (np.log10(L) - np.log10(optimal_limit)) ** 2 / 2.0
                )
            )
            # Add some noise
            growth_rates += np.random.default_rng(42 + idx).normal(0, 0.005, growth_rates.shape)

        # Convert to percentages
        retention_pcts = retention_values / company_size * 100  # As percentage
        limit_pcts = limit_values / company_size * 100  # As percentage

        # Store data for second pass
        data_for_plots.append(
            {
                "retention_pcts": retention_pcts,
                "limit_pcts": limit_pcts,
                "growth_rates": growth_rates,
                "company_size": company_size,
            }
        )

        # Collect all data for unified scales
        all_growth_rates.append(growth_rates * 100)  # Convert to percentage
        all_retention_pcts.extend(retention_pcts)
        all_limit_pcts.extend(limit_pcts)

    # Determine global ranges for consistent axes, focusing on the useful
    # range around optimal configurations (the high-growth regions)

    # Collect optimal regions for each company size
    optimal_retention_pcts = []
    optimal_limit_pcts = []

    for plot_data in data_for_plots:
        growth_rates = plot_data["growth_rates"]
        retention_pcts = plot_data["retention_pcts"]
        limit_pcts = plot_data["limit_pcts"]

        # Find the threshold for "good" growth (top 25% of growth rates for tighter focus)
        growth_threshold = np.percentile(growth_rates, 75)

        # Find retention and limit ranges where growth is above threshold
        R, L = np.meshgrid(retention_pcts, limit_pcts)
        good_growth_mask = growth_rates >= growth_threshold
        if np.any(good_growth_mask):
            optimal_retention_pcts.extend(R[good_growth_mask])
            optimal_limit_pcts.extend(L[good_growth_mask])

    # Set ranges to cover the optimal regions with some padding
    # Use percentiles to avoid outliers
    if optimal_retention_pcts:
        min_retention_pct = max(0, np.percentile(optimal_retention_pcts, 10) * 0.7)
        max_retention_pct = min(100, np.percentile(optimal_retention_pcts, 90) * 1.3)
    else:
        # Fallback to reasonable defaults if no optimal region found
        min_retention_pct = 0.1  # 0.1%
        max_retention_pct = 20  # 20%

    if optimal_limit_pcts:
        min_limit_pct = max(0, np.percentile(optimal_limit_pcts, 10) * 0.7)
        max_limit_pct = min(1000, np.percentile(optimal_limit_pcts, 90) * 1.3)
    else:
        # Fallback to reasonable defaults
        min_limit_pct = 10  # 10%
        max_limit_pct = 200  # 200%

    # Ensure we have reasonable minimum ranges
    if max_retention_pct - min_retention_pct < 5:  # Ensure at least 5% range
        max_retention_pct = min_retention_pct + 5
    if max_limit_pct - min_limit_pct < 50:  # Ensure at least 50% range
        max_limit_pct = min_limit_pct + 50

    # Round to nice numbers for cleaner axes
    min_retention_pct = np.floor(min_retention_pct * 2) / 2  # Round down to nearest 0.5%
    max_retention_pct = np.ceil(max_retention_pct * 2) / 2  # Round up to nearest 0.5%
    min_limit_pct = np.floor(min_limit_pct / 10) * 10  # Round down to nearest 10%
    max_limit_pct = np.ceil(max_limit_pct / 10) * 10  # Round up to nearest 10%

    # Determine global min and max for unified color scale
    vmin = min(np.min(rates) for rates in all_growth_rates)
    vmax = max(np.max(rates) for rates in all_growth_rates)

    # Create unified levels for all plots
    levels = np.linspace(vmin, vmax, 20)

    # Define consistent percentage grid for interpolation
    # Use linear scale for percentages (not log) for clearer interpretation
    common_retention_pcts = np.linspace(min_retention_pct, max_retention_pct, 50)
    common_limit_pcts = np.linspace(min_limit_pct, max_limit_pct, 50)

    # Second pass: create plots with unified scale and axes
    for idx, (ax, plot_data) in enumerate(zip(axes, data_for_plots)):
        retention_pcts = plot_data["retention_pcts"]
        limit_pcts = plot_data["limit_pcts"]
        growth_rates = plot_data["growth_rates"]
        company_size = plot_data["company_size"]

        # Create common mesh grid
        R_common, L_common = np.meshgrid(common_retention_pcts, common_limit_pcts)

        # Interpolate growth rates to common grid
        from scipy.interpolate import griddata

        # Create points from original data
        R_orig, L_orig = np.meshgrid(retention_pcts, limit_pcts)
        points = np.column_stack((R_orig.ravel(), L_orig.ravel()))
        values = (growth_rates * 100).ravel()  # Convert to percentage

        # Interpolate to common grid
        growth_interp = griddata(
            points, values, (R_common, L_common), method="cubic", fill_value=np.nan
        )

        # Create heatmap with unified scale and axes
        im = ax.contourf(
            R_common,
            L_common,
            growth_interp,
            levels=levels,
            cmap="RdYlGn",
            alpha=0.8,
            vmin=vmin,
            vmax=vmax,
            extend="both",
        )

        # Add contour lines if requested
        if show_contours:
            contours = ax.contour(
                R_common,
                L_common,
                growth_interp,
                levels=5,
                colors="black",
                alpha=0.3,
                linewidths=1,
            )
            ax.clabel(contours, inline=True, fontsize=8, fmt="%.1f%%")

        # Find and mark optimal point in original data
        max_idx = np.unravel_index(growth_rates.argmax(), growth_rates.shape)
        ax.scatter(
            retention_pcts[max_idx[1]],
            limit_pcts[max_idx[0]],
            s=200,
            color="white",
            marker="*",
            edgecolor="black",
            linewidth=2,
            zorder=10,
            label="Optimal",
        )

        # Set consistent axes limits for all plots
        ax.set_xlim(min_retention_pct, max_retention_pct)
        ax.set_ylim(min_limit_pct, max_limit_pct)

        # Format axes labels
        ax.set_xlabel("Retention (% of Company Size)", fontsize=10)
        if idx == 0:
            ax.set_ylabel("Coverage Limit (% of Company Size)", fontsize=10)

        # Format tick labels as percentages
        ax.xaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.0f}%"))
        ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.0f}%"))

        # Set consistent tick locations spanning the shared axis limits
        x_ticks = np.linspace(min_retention_pct, max_retention_pct, 6)
        y_ticks = np.linspace(min_limit_pct, max_limit_pct, 6)
        ax.set_xticks(x_ticks)
        ax.set_yticks(y_ticks)

        # Add company size label
        ax.set_title(
            f"${format_currency(company_size, decimals=0).replace('$', '')} Company",
            fontsize=11,
            fontweight="bold",
        )

        # Add colorbar only to the rightmost plot
        if idx == 2:  # Only for the last (rightmost) plot
            cbar = plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
            cbar.set_label("Growth Rate (%)", fontsize=9)

        ax.grid(True, alpha=0.2, which="both")
        ax.legend(loc="upper left", fontsize=8)

    plt.suptitle(title, fontsize=14, fontweight="bold")
    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig
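

# Hedged usage sketch (hypothetical, not part of the original module):
# provided results are keyed as f"company_{size}" with 1D 'retention' and
# 'limit' axes and a 2D 'growth_rate' grid shaped (len(limit), len(retention)),
# matching the meshgrid convention above. The growth surface is synthetic.
def _demo_coverage_heatmap_with_data() -> Figure:
    sizes = [1_000_000, 10_000_000, 100_000_000]
    results: Dict[str, Any] = {}
    for size in sizes:
        retention = np.logspace(4, np.log10(size * 0.5), 20)
        limit = np.logspace(5, np.log10(size * 5), 20)
        R, L = np.meshgrid(retention, limit)
        growth = 0.1 * np.exp(
            -((np.log10(R / (size * 0.01))) ** 2 + (np.log10(L / (size * 0.5))) ** 2) / 3
        )
        results[f"company_{size}"] = {
            "retention": retention,
            "limit": limit,
            "growth_rate": growth,
        }
    return plot_optimal_coverage_heatmap(results, company_sizes=sizes)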


def plot_sensitivity_tornado(  # pylint: disable=too-many-locals
    sensitivity_data: Optional[Dict[str, float]] = None,
    baseline_value: Optional[float] = None,
    title: str = "Sensitivity Analysis - Tornado Chart",
    figsize: Tuple[int, int] = (10, 8),
    show_percentages: bool = True,
    export_dpi: Optional[int] = None,
) -> Figure:
    """Create tornado chart showing parameter sensitivity analysis.

    Visualizes the impact of parameter variations on key metrics
    using horizontal bars sorted by influence magnitude.

    Args:
        sensitivity_data: Dict of parameter names to impact values
        baseline_value: Baseline metric value for reference
        title: Plot title
        figsize: Figure size (width, height)
        show_percentages: Whether to show percentage labels
        export_dpi: DPI for export (150 for web, 300 for print)

    Returns:
        Matplotlib figure with tornado chart

    Examples:
        >>> sensitivity = {
        ...     "Premium Rate": 0.15,
        ...     "Loss Frequency": -0.12,
        ...     "Loss Severity": -0.08,
        ... }
        >>> fig = plot_sensitivity_tornado(sensitivity)
    """
    set_wsj_style()

    # Generate or use provided data
    if sensitivity_data is None:
        # Generate synthetic demonstration data
        sensitivity_data = {
            "Premium Rate": 0.18,
            "Loss Frequency": -0.15,
            "Loss Severity": -0.12,
            "Retention Level": 0.10,
            "Coverage Limit": 0.08,
            "Investment Return": 0.07,
            "Operating Margin": 0.06,
            "Tax Rate": -0.05,
            "Working Capital": -0.04,
            "Growth Rate": 0.03,
        }

    if baseline_value is None:
        baseline_value = 0.12  # 12% ROE baseline

    # Sort by absolute impact
    sorted_params = sorted(sensitivity_data.items(), key=lambda x: abs(x[1]), reverse=True)

    fig, ax = plt.subplots(figsize=figsize)

    # Prepare data
    params = [item[0] for item in sorted_params]
    impacts = [item[1] for item in sorted_params]

    # Calculate bar positions
    y_pos = np.arange(len(params))

    # Color coding based on impact magnitude
    colors = []
    for impact in impacts:
        abs_impact = abs(impact)
        if abs_impact >= 0.10:  # High sensitivity (>10%)
            colors.append(WSJ_COLORS["red"])
        elif abs_impact >= 0.05:  # Moderate sensitivity (5-10%)
            colors.append(WSJ_COLORS["orange"])
        else:  # Low sensitivity (<5%)
            colors.append(WSJ_COLORS["green"])

    # Create bars
    bars = ax.barh(y_pos, impacts, color=colors, alpha=0.7, edgecolor="black", linewidth=1)

    # Add baseline reference line
    ax.axvline(x=0, color=WSJ_COLORS["gray"], linewidth=2, linestyle="-", alpha=0.5)
    ax.text(
        0,
        len(params),
        "Baseline",
        ha="center",
        va="bottom",
        fontsize=10,
        color=WSJ_COLORS["gray"],
        fontweight="bold",
    )

    # Add percentage labels
    if show_percentages:
        for bar_item, _param, impact in zip(bars, params, impacts):
            width = bar_item.get_width()
            # Offset the label just beyond the bar end on either side
            label_x = width * 1.02
            ax.text(
                label_x,
                bar_item.get_y() + bar_item.get_height() / 2,
                f"{impact:+.1%}",
                ha="left" if width > 0 else "right",
                va="center",
                fontsize=9,
                fontweight="bold",
            )

    # Format axes
    ax.set_yticks(y_pos)
    ax.set_yticklabels(params)
    ax.set_xlabel("Impact on Key Metric (%)", fontsize=11)
    ax.set_title(title, fontsize=14, fontweight="bold", pad=20)

    # Format x-axis as percentage
    ax.xaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:+.0%}"))

    # Add grid
    ax.grid(True, axis="x", alpha=0.3)

    # Add legend
    from matplotlib.patches import Patch

    legend_elements = [
        Patch(facecolor=WSJ_COLORS["red"], alpha=0.7, label="High Sensitivity (>10%)"),
        Patch(facecolor=WSJ_COLORS["orange"], alpha=0.7, label="Moderate (5-10%)"),
        Patch(facecolor=WSJ_COLORS["green"], alpha=0.7, label="Low (<5%)"),
    ]
    ax.legend(handles=legend_elements, loc="lower right", fontsize=9)

    # Add annotation
    fig.text(
        0.5,
        0.02,
        f"Baseline Value: {baseline_value:.1%} | Analysis shows ±20% parameter variation impact",
        ha="center",
        fontsize=10,
        style="italic",
        color=WSJ_COLORS["gray"],
    )

    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig


def plot_robustness_heatmap(  # pylint: disable=too-many-locals,too-many-statements
    robustness_data: Optional[np.ndarray] = None,
    frequency_range: Optional[Tuple[float, float]] = None,
    severity_range: Optional[Tuple[float, float]] = None,
    title: str = "Insurance Program Robustness Analysis",
    figsize: Tuple[int, int] = (10, 8),
    show_reference: bool = True,
    export_dpi: Optional[int] = None,
) -> Figure:
    """Create robustness heatmap showing stability across parameter variations.

    Visualizes how optimal coverage changes with variations in loss
    frequency and severity parameters.

    Args:
        robustness_data: 2D array of stability metrics
        frequency_range: Tuple of (min, max) frequency variation (default 0.7-1.3)
        severity_range: Tuple of (min, max) severity variation (default 0.7-1.3)
        title: Plot title
        figsize: Figure size (width, height)
        show_reference: Whether to show reference point at 100%/100%
        export_dpi: DPI for export (150 for web, 300 for print)

    Returns:
        Matplotlib figure with robustness heatmap

    Examples:
        >>> fig = plot_robustness_heatmap()
        >>> fig.savefig("robustness.png", dpi=150)
    """
    set_wsj_style()

    if frequency_range is None:
        frequency_range = (0.7, 1.3)
    if severity_range is None:
        severity_range = (0.7, 1.3)

    fig, ax = plt.subplots(figsize=figsize)

    # Generate or use provided data
    if robustness_data is None:
        # Generate synthetic demonstration data
        n_points = 20
        freq_values = np.linspace(frequency_range[0], frequency_range[1], n_points)
        sev_values = np.linspace(severity_range[0], severity_range[1], n_points)
        F, S = np.meshgrid(freq_values, sev_values)

        # Create stability metric (1 = stable, 0 = unstable)
        # More stable near baseline (1.0, 1.0)
        distance_from_baseline = np.sqrt((F - 1.0) ** 2 + (S - 1.0) ** 2)
        robustness_data = np.exp(-2 * distance_from_baseline)

        # Add some structure/noise
        robustness_data = robustness_data + 0.1 * np.sin(5 * F) * np.cos(5 * S)
        robustness_data = cast(np.ndarray, robustness_data)  # Guaranteed not None after assignment
        robustness_data = np.clip(robustness_data, 0, 1)
    else:
        n_points = robustness_data.shape[0]
        freq_values = np.linspace(frequency_range[0], frequency_range[1], n_points)
        sev_values = np.linspace(severity_range[0], severity_range[1], n_points)
        F, S = np.meshgrid(freq_values, sev_values)

    # Create heatmap
    im = ax.contourf(
        F * 100,  # Convert to percentage
        S * 100,
        robustness_data,
        levels=20,
        cmap="RdYlGn",
        alpha=0.8,
    )

    # Add contour lines
    contours = ax.contour(
        F * 100,
        S * 100,
        robustness_data,
        levels=[0.2, 0.4, 0.6, 0.8],
        colors="black",
        alpha=0.3,
        linewidths=1,
    )
    ax.clabel(contours, inline=True, fontsize=8, fmt="%.1f")

    # Mark reference point if requested
    if show_reference:
        ax.scatter(
            100,
            100,  # Baseline at 100%
            s=200,
            color="white",
            marker="o",
            edgecolor="black",
            linewidth=3,
            zorder=10,
            label="Baseline",
        )

        # Add crosshairs
        ax.axhline(y=100, color=WSJ_COLORS["gray"], linestyle="--", alpha=0.5, linewidth=1)
        ax.axvline(x=100, color=WSJ_COLORS["gray"], linestyle="--", alpha=0.5, linewidth=1)

    # Add regions
    # Stable region (robustness > 0.7)
    assert robustness_data is not None  # Guaranteed by branches above
    stable_mask = robustness_data > 0.7
    if np.any(stable_mask):
        ax.contour(
            F * 100,
            S * 100,
            stable_mask.astype(float),
            levels=[0.5],
            colors=WSJ_COLORS["green"],
            linewidths=2,
            linestyles="-",
            alpha=0.7,
        )

        # Find center of stable region
        stable_indices = np.where(stable_mask)
        if len(stable_indices[0]) > 0:
            center_y = S[stable_indices] * 100
            center_x = F[stable_indices] * 100
            ax.text(
                center_x.mean(),
                center_y.mean(),
                "STABLE\nREGION",
                ha="center",
                va="center",
                fontsize=11,
                fontweight="bold",
                color=WSJ_COLORS["green"],
                alpha=0.7,
            )

    # Format axes
    ax.set_xlabel("Loss Frequency Variation (%)", fontsize=11)
    ax.set_ylabel("Loss Severity Variation (%)", fontsize=11)
    ax.set_title(title, fontsize=14, fontweight="bold", pad=20)

    # Set axis limits
    ax.set_xlim(frequency_range[0] * 100, frequency_range[1] * 100)
    ax.set_ylim(severity_range[0] * 100, severity_range[1] * 100)

    # Format tick labels
    ax.xaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.0f}%"))
    ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.0f}%"))

    # Add colorbar
    cbar = plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
    cbar.set_label("Coverage Stability (0=Unstable, 1=Stable)", fontsize=10)

    # Add grid
    ax.grid(True, alpha=0.2)

    # Add legend only if there are labeled artists
    handles, _labels = ax.get_legend_handles_labels()
    if handles:
        ax.legend(loc="upper right", fontsize=9)

    # Add annotations
    fig.text(
        0.5,
        0.02,
        "Green regions indicate robust insurance configurations that remain optimal despite parameter uncertainty",
        ha="center",
        fontsize=10,
        style="italic",
        color=WSJ_COLORS["gray"],
    )

    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig
[docs] def plot_premium_multiplier(  # pylint: disable=too-many-locals,too-many-statements
    optimization_results: Optional[Dict[float, Dict[str, Any]]] = None,
    company_sizes: Optional[List[float]] = None,
    title: str = "Premium Multiplier Analysis",
    figsize: Tuple[int, int] = (12, 8),
    show_confidence: bool = True,
    show_reference_lines: bool = True,
    show_annotations: bool = True,
    export_dpi: Optional[int] = None,
) -> Figure:
    """Create premium multiplier analysis visualization.

    Visualizes the optimal premium as a multiple of expected loss for
    different company sizes, demonstrating why premiums of 2-5× expected
    losses are optimal from an ergodic perspective.

    Args:
        optimization_results: Dict mapping company_size -> optimization data
            containing 'expected_loss', 'optimal_premium', and optionally
            'confidence_bounds' for each company size
        company_sizes: List of company sizes to analyze (default: [1e6, 1e7, 1e8])
        title: Plot title
        figsize: Figure size (width, height)
        show_confidence: Whether to show confidence intervals
        show_reference_lines: Whether to show horizontal reference lines
        show_annotations: Whether to add explanatory annotations
        export_dpi: DPI for export (150 for web, 300 for print)

    Returns:
        Matplotlib figure with premium multiplier analysis

    Examples:
        >>> results = {
        ...     1e6: {'expected_loss': 50000, 'optimal_premium': 150000},
        ...     1e7: {'expected_loss': 200000, 'optimal_premium': 600000}
        ... }
        >>> fig = plot_premium_multiplier(results)
    """
    set_wsj_style()

    if company_sizes is None:
        company_sizes = [1_000_000, 10_000_000, 100_000_000]

    fig, ax = plt.subplots(figsize=figsize)

    # Generate or use provided data
    if optimization_results is None:
        # Generate synthetic demonstration data
        optimization_results = {}
        for size in company_sizes:
            # Expected loss scales with company size
            expected_loss = size * 0.005  # 0.5% of company size

            # Optimal premium multiplier varies by company size:
            # smaller companies need higher multiples for safety
            size_factor = np.log10(size / 1_000_000)
            base_multiplier = 3.5 - 0.3 * size_factor  # Decreases with size

            # Optimal premium as a multiple of expected loss
            optimal_premium = expected_loss * base_multiplier

            # Generate confidence bounds (±15% around the optimum)
            lower_bound = optimal_premium * 0.85
            upper_bound = optimal_premium * 1.15

            optimization_results[size] = {
                "expected_loss": expected_loss,
                "optimal_premium": optimal_premium,
                "confidence_bounds": (lower_bound, upper_bound),
                "percentiles": {
                    "25": optimal_premium * 0.9,
                    "75": optimal_premium * 1.1,
                },
            }

    # Prepare data for plotting
    sizes_log = np.array([np.log10(s) for s in company_sizes])
    multipliers = []
    confidence_lower = []
    confidence_upper = []

    for size in company_sizes:
        data = optimization_results[size]
        multiplier = data["optimal_premium"] / data["expected_loss"]
        multipliers.append(multiplier)

        if "confidence_bounds" in data and show_confidence:
            confidence_lower.append(data["confidence_bounds"][0] / data["expected_loss"])
            confidence_upper.append(data["confidence_bounds"][1] / data["expected_loss"])

    # Create smooth interpolation for better visualization
    sizes_smooth = np.linspace(sizes_log.min(), sizes_log.max(), 100)

    # Use a cubic spline for a smooth curve (degree capped by point count)
    spline = make_interp_spline(sizes_log, multipliers, k=min(3, len(multipliers) - 1))
    multipliers_smooth = spline(sizes_smooth)

    # Convert back to actual company sizes for x-axis
    company_sizes_smooth = 10**sizes_smooth

    # Main curve
    ax.semilogx(
        company_sizes_smooth,
        multipliers_smooth,
        color=WSJ_COLORS["blue"],
        linewidth=3,
        label="Optimal Premium/Expected Loss",
        zorder=5,
    )

    # Data points
    ax.scatter(
        company_sizes,
        multipliers,
        color=WSJ_COLORS["blue"],
        s=100,
        edgecolor="white",
        linewidth=2,
        zorder=10,
    )

    # Confidence intervals
    if confidence_lower and show_confidence:
        # Interpolate confidence bounds
        spline_lower = make_interp_spline(
            sizes_log, confidence_lower, k=min(3, len(confidence_lower) - 1)
        )
        spline_upper = make_interp_spline(
            sizes_log, confidence_upper, k=min(3, len(confidence_upper) - 1)
        )
        confidence_lower_smooth = spline_lower(sizes_smooth)
        confidence_upper_smooth = spline_upper(sizes_smooth)

        ax.fill_between(
            company_sizes_smooth,
            confidence_lower_smooth,
            confidence_upper_smooth,
            alpha=0.2,
            color=WSJ_COLORS["blue"],
            label="95% Confidence Interval",
        )

    # Reference lines
    if show_reference_lines:
        reference_levels = [1, 2, 3, 5]
        colors = [WSJ_COLORS["gray"], WSJ_COLORS["orange"], WSJ_COLORS["green"], WSJ_COLORS["red"]]
        styles = [":", "--", "--", "--"]

        for level, color, style in zip(reference_levels, colors, styles):
            ax.axhline(
                y=level,
                color=color,
                linestyle=style,
                alpha=0.5,
                linewidth=1.5,
                label=f"{level}× Expected Loss",
            )

    # Annotations
    if show_annotations:
        # Annotate key insight regions
        ax.annotate(
            "Small Companies\nNeed Higher\nMultiples",
            xy=(company_sizes[0], multipliers[0]),
            xytext=(company_sizes[0] * 0.3, multipliers[0] + 0.5),
            fontsize=10,
            color=WSJ_COLORS["gray"],
            fontweight="bold",
            ha="center",
            arrowprops={"arrowstyle": "->", "color": WSJ_COLORS["gray"], "alpha": 0.5, "lw": 1},
        )

        if len(company_sizes) > 2:
            ax.annotate(
                "Large Companies\nCan Accept\nLower Multiples",
                xy=(company_sizes[-1], multipliers[-1]),
                xytext=(company_sizes[-1] * 1.5, multipliers[-1] - 0.5),
                fontsize=10,
                color=WSJ_COLORS["gray"],
                fontweight="bold",
                ha="center",
                arrowprops={"arrowstyle": "->", "color": WSJ_COLORS["gray"], "alpha": 0.5, "lw": 1},
            )

    # Add shaded optimal zone
    ax.axhspan(2, 5, alpha=0.05, color="green", label="Optimal Zone (2-5×)")

    # Format axes
    ax.set_xlabel("Company Size (Assets)", fontsize=12)
    ax.set_ylabel("Premium Multiple (Premium / Expected Loss)", fontsize=12)
    ax.set_title(title, fontsize=14, fontweight="bold")

    # Format x-axis as currency
    ax.xaxis.set_major_formatter(mticker.FuncFormatter(WSJFormatter.currency_formatter))

    # Format y-axis as multiples
    ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, p: f"{x:.1f}×"))

    # Grid
    ax.grid(True, which="both", alpha=0.3)
    ax.grid(True, which="minor", alpha=0.1)

    # Legend
    ax.legend(loc="upper right", frameon=True, fancybox=False, edgecolor=WSJ_COLORS["gray"])

    # Set reasonable y-limits
    ax.set_ylim(0, max(multipliers) * 1.3)

    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig
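# Usage sketch (illustrative; not part of the original module). It builds the
# `optimization_results` structure documented above from hand-picked numbers,
# so every dollar figure and multiplier below is a hypothetical placeholder.
def _demo_premium_multiplier() -> Figure:
    """Render the premium multiplier chart from a hand-built results dict."""
    results = {
        1_000_000: {
            "expected_loss": 50_000,
            "optimal_premium": 175_000,  # 3.5x expected loss
            "confidence_bounds": (150_000, 200_000),
        },
        10_000_000: {
            "expected_loss": 200_000,
            "optimal_premium": 600_000,  # 3.0x expected loss
            "confidence_bounds": (520_000, 680_000),
        },
        100_000_000: {
            "expected_loss": 1_500_000,
            "optimal_premium": 4_000_000,  # ~2.7x expected loss
            "confidence_bounds": (3_400_000, 4_600_000),
        },
    }
    # Keys match the default company_sizes, so no extra arguments are needed
    return plot_premium_multiplier(results, title="Premium Multiplier (Demo Data)")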
[docs] def plot_breakeven_timeline(  # pylint: disable=too-many-locals,too-many-statements,too-many-branches
    simulation_results: Optional[Dict[float, Dict[str, Any]]] = None,
    company_sizes: Optional[List[float]] = None,
    time_horizon: int = 30,
    title: str = "Insurance Break-even Timeline Analysis",
    figsize: Tuple[int, int] = (14, 8),
    show_percentiles: bool = True,
    show_breakeven_markers: bool = True,
    export_dpi: Optional[int] = None,
) -> Figure:
    """Create break-even timeline visualization.

    Shows when the cumulative benefits of optimal insurance exceed the
    cumulative excess premiums paid (premiums above expected losses),
    demonstrating the long-term value proposition.

    Args:
        simulation_results: Dict mapping company_size -> simulation data
            containing 'cumulative_benefit', 'cumulative_excess_premium',
            and optionally percentile data
        company_sizes: List of company sizes to analyze (default: [1e6, 1e7, 1e8])
        time_horizon: Years to simulate (default: 30)
        title: Plot title
        figsize: Figure size (width, height)
        show_percentiles: Whether to show 25th/75th percentile bands
        show_breakeven_markers: Whether to mark break-even points
        export_dpi: DPI for export (150 for web, 300 for print)

    Returns:
        Matplotlib figure with break-even timeline analysis

    Examples:
        >>> results = {
        ...     1e6: {
        ...         'cumulative_benefit': np.array([...]),
        ...         'cumulative_excess_premium': np.array([...])
        ...     }
        ... }
        >>> fig = plot_breakeven_timeline(results)
    """
    set_wsj_style()

    if company_sizes is None:
        company_sizes = [1_000_000, 10_000_000, 100_000_000]

    # Create subplot grid
    n_companies = len(company_sizes)
    fig, axes = plt.subplots(1, n_companies, figsize=figsize, sharey=True)
    if n_companies == 1:
        axes = [axes]

    # Generate or use provided data
    if simulation_results is None:
        # Generate synthetic demonstration data
        simulation_results = {}
        rng = np.random.default_rng(42)

        for size in company_sizes:
            years = np.arange(time_horizon)

            # Expected annual loss and optimal premium
            expected_loss = size * 0.005  # 0.5% of size
            size_factor = np.log10(size / 1_000_000)
            premium_multiplier = 3.5 - 0.3 * size_factor
            optimal_premium = expected_loss * premium_multiplier
            excess_premium = optimal_premium - expected_loss

            # Cumulative excess premium paid (linear accumulation)
            cumulative_excess = excess_premium * (years + 1)

            # Cumulative benefit (grows exponentially due to avoided ruin):
            # starts slow, accelerates over time
            base_benefit = size * 0.001  # 0.1% annual benefit initially
            growth_factor = 1.08  # Benefit compounds
            cumulative_benefit = base_benefit * (
                (growth_factor ** (years + 1) - 1) / (growth_factor - 1)
            )

            # Add some realistic noise
            noise = rng.normal(0, size * 0.0005, len(years))
            cumulative_benefit += np.cumsum(noise)

            # Calculate percentiles
            benefit_25 = cumulative_benefit * 0.85
            benefit_75 = cumulative_benefit * 1.15

            simulation_results[size] = {
                "years": years,
                "cumulative_benefit": cumulative_benefit,
                "cumulative_excess_premium": cumulative_excess,
                "benefit_25": benefit_25,
                "benefit_75": benefit_75,
                "net_benefit": cumulative_benefit - cumulative_excess,
            }

    # Plot each company size
    for idx, (ax, company_size) in enumerate(zip(axes, company_sizes)):
        data = simulation_results[company_size]
        years = data.get("years", np.arange(time_horizon))

        # Main lines
        ax.plot(
            years,
            data["cumulative_benefit"] / 1e6,
            color=WSJ_COLORS["blue"],
            linewidth=2.5,
            label="Cumulative Benefit",
        )
        ax.plot(
            years,
            data["cumulative_excess_premium"] / 1e6,
            color=WSJ_COLORS["red"],
            linewidth=2.5,
            label="Excess Premium Paid",
        )

        # Percentile bands
        if show_percentiles and "benefit_25" in data:
            ax.fill_between(
                years,
                data["benefit_25"] / 1e6,
                data["benefit_75"] / 1e6,
                alpha=0.2,
                color=WSJ_COLORS["blue"],
                label="25th-75th Percentile",
            )

        # Find break-even point
        net_benefit = data["cumulative_benefit"] - data["cumulative_excess_premium"]
        breakeven_idx = np.where(net_benefit > 0)[0]

        if len(breakeven_idx) > 0 and show_breakeven_markers:
            breakeven_year = years[breakeven_idx[0]]
            breakeven_benefit = data["cumulative_benefit"][breakeven_idx[0]]

            # Mark break-even point
            ax.scatter(
                breakeven_year,
                breakeven_benefit / 1e6,
                s=200,
                color=WSJ_COLORS["green"],
                marker="*",
                edgecolor="white",
                linewidth=2,
                zorder=10,
                label=f"Break-even: Year {breakeven_year}",
            )

            # Add vertical line at break-even
            ax.axvline(
                x=breakeven_year,
                color=WSJ_COLORS["green"],
                linestyle="--",
                alpha=0.5,
                linewidth=1.5,
            )

            # Add annotation
            ax.annotate(
                f"Break-even\nYear {breakeven_year}",
                xy=(breakeven_year, breakeven_benefit / 1e6),
                xytext=(breakeven_year + 2, breakeven_benefit / 1e6 * 0.8),
                fontsize=9,
                fontweight="bold",
                color=WSJ_COLORS["green"],
                arrowprops={
                    "arrowstyle": "->",
                    "color": WSJ_COLORS["green"],
                    "alpha": 0.5,
                    "lw": 1,
                },
            )

        # Shade positive net-benefit region
        positive_region = net_benefit > 0
        if np.any(positive_region):
            ax.fill_between(
                years,
                0,
                net_benefit / 1e6,
                where=positive_region,
                alpha=0.1,
                color="green",
                label="Positive NPV",
            )

        # Format axes
        ax.set_xlabel("Years", fontsize=11)
        if idx == 0:
            ax.set_ylabel("Cumulative Value ($M)", fontsize=11)

        # Title for each subplot (format_currency already includes the '$')
        ax.set_title(
            f"{format_currency(company_size, decimals=0)} Company",
            fontsize=11,
            fontweight="bold",
        )

        # Grid
        ax.grid(True, alpha=0.3)

        # Legend (only on first subplot to avoid clutter)
        if idx == 0:
            ax.legend(loc="upper left", fontsize=9)

        # Set reasonable limits
        ax.set_xlim(0, time_horizon)
        ax.set_ylim(bottom=0)

    # Main title
    fig.suptitle(title, fontsize=14, fontweight="bold")

    # Add footer annotation
    fig.text(
        0.5,
        0.02,
        "Analysis shows when insurance transforms from cost to investment through compound benefits",
        ha="center",
        fontsize=10,
        style="italic",
        color=WSJ_COLORS["gray"],
    )

    safe_tight_layout()

    # Set export DPI if specified
    if export_dpi:
        fig.dpi = export_dpi

    return fig
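# Usage sketch (illustrative; not part of the original module). It builds a
# minimal `simulation_results` dict for a single hypothetical company: the
# excess-premium series accumulates linearly while the benefit series
# compounds, so the two curves cross and a break-even year is marked.
def _demo_breakeven_timeline() -> Figure:
    """Render the break-even timeline from hand-built cumulative series."""
    years = np.arange(30)
    excess_premium = 100_000.0  # hypothetical annual premium above expected loss
    results = {
        10_000_000: {
            "years": years,
            # Linear cost accumulation
            "cumulative_excess_premium": excess_premium * (years + 1),
            # Geometric-series benefit: 40k in year one, compounding at 12%
            "cumulative_benefit": 40_000.0 * ((1.12 ** (years + 1) - 1) / 0.12),
        }
    }
    return plot_breakeven_timeline(results, company_sizes=[10_000_000])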
def _find_knee_point(x: np.ndarray, y: np.ndarray) -> int:
    """Find the knee point (elbow) in a curve using curvature analysis.

    Args:
        x: X-axis values (ROE)
        y: Y-axis values (Ruin probability)

    Returns:
        Index of the knee point
    """
    # Normalize data to [0, 1]; the epsilon guards against zero range
    x_norm = (x - x.min()) / (x.max() - x.min() + 1e-10)
    y_norm = (y - y.min()) / (y.max() - y.min() + 1e-10)

    # Calculate distances from each point to the line between start and end
    start = np.array([x_norm[0], y_norm[0]])
    end = np.array([x_norm[-1], y_norm[-1]])

    distances = []
    for x_val, y_val in zip(x_norm, y_norm):
        point = np.array([x_val, y_val])
        # Distance from point to line using the 2D cross product formula:
        # for 2D vectors, the cross product gives the scalar ax*by - ay*bx
        line_vec = end - start
        point_vec = point - start
        cross_prod = line_vec[0] * point_vec[1] - line_vec[1] * point_vec[0]
        # Epsilon guards against coincident endpoints (zero-length line)
        dist = np.abs(cross_prod) / (np.linalg.norm(line_vec) + 1e-10)
        distances.append(dist)

    # The knee point is where the distance to the chord is maximal
    knee_idx = np.argmax(distances)

    # Refine by looking for the point where curvature changes most
    if len(x) > 5:
        # Approximate curvature via the second derivative
        curvature = np.abs(np.gradient(np.gradient(y_norm)))
        # Find local maxima in curvature
        maxima = argrelextrema(curvature, np.greater)[0]
        if len(maxima) > 0:
            # Choose the maximum closest to the initial chord-based estimate
            distances_to_knee = np.abs(maxima - knee_idx)
            knee_idx = maxima[np.argmin(distances_to_knee)]

    return int(knee_idx)
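# Usage sketch (illustrative; not part of the original module). It exercises
# _find_knee_point on a synthetic efficient frontier where ruin probability
# falls steeply at low ROE and then flattens, so the returned index should
# land near the transition; the curve parameters are arbitrary placeholders.
def _demo_find_knee_point() -> int:
    """Locate the knee of a synthetic ROE vs. ruin probability curve."""
    roe = np.linspace(0.05, 0.25, 50)
    # Exponential decay: steep improvement early, diminishing returns later
    ruin_prob = 0.30 * np.exp(-40 * (roe - 0.05)) + 0.01
    # Index where accepting more risk stops buying much safety
    return _find_knee_point(roe, ruin_prob)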