"""Executive report generation for insurance optimization analysis.
This module provides the ExecutiveReport class that generates concise,
high-level reports targeted at executive audiences.
"""
import logging
from pathlib import Path
from typing import Any, Dict, Optional
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from ..risk_metrics import RiskMetrics
from ..visualization.style_manager import StyleManager
from .config import FigureConfig, ReportConfig, create_executive_config
from .report_builder import ReportBuilder
logger = logging.getLogger(__name__)
class ExecutiveReport(ReportBuilder):
"""Generate executive summary reports.
This class creates concise, visually-rich reports designed for
executive audiences, focusing on key findings, recommendations,
and decision-critical information.
Attributes:
results: Simulation or analysis results.
style_manager: Visualization style manager.
key_metrics: Dictionary of key performance metrics.
"""
def __init__(
self,
results: Dict[str, Any],
config: Optional[ReportConfig] = None,
cache_dir: Optional[Path] = None,
):
"""Initialize ExecutiveReport.
Args:
results: Analysis results dictionary.
config: Report configuration (uses default if None).
cache_dir: Optional cache directory.
"""
if config is None:
config = create_executive_config()
super().__init__(config, cache_dir)
self.results = results
self.style_manager = StyleManager()
self.key_metrics = self._extract_key_metrics()
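    # A minimal construction sketch (hypothetical values; the keys shown are
    # the ones _extract_key_metrics reads):
    #
    #     report = ExecutiveReport({"roe": 0.18, "roe_baseline": 0.15})
    #     report.key_metrics["roe"]              # 0.18
    #     report.key_metrics["roe_improvement"]  # 20.0 (percent vs baseline)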
def generate(self) -> Path:
"""Generate the executive report.
Returns:
Path to generated report file.
"""
logger.info("Generating executive report...")
# Update configuration with actual results
self._update_config_with_results()
# Generate report in all requested formats
output_paths = []
for output_format in self.config.output_formats:
path = self.save(output_format)
output_paths.append(path)
logger.info(f"Executive report generated: {output_paths}")
return output_paths[0] # Return primary format
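    # Usage sketch: generate() writes one file per format in
    # config.output_formats and returns the first (primary) one, e.g.
    #
    #     primary_path = report.generate()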
def _extract_key_metrics(self) -> Dict[str, Any]:
"""Extract key metrics from results.
Returns:
Dictionary of key metrics.
"""
metrics = {}
# Extract ROE metrics
if "roe" in self.results:
metrics["roe"] = self.results["roe"]
metrics["roe_improvement"] = self._calculate_improvement("roe")
# Extract ruin probability
if "ruin_probability" in self.results:
metrics["ruin_prob"] = self.results["ruin_probability"]
metrics["ruin_reduction"] = self._calculate_reduction("ruin_probability")
# Extract growth metrics
if "growth_rate" in self.results:
metrics["growth_rate"] = self.results["growth_rate"]
metrics["growth_improvement"] = self._calculate_improvement("growth_rate")
# Extract insurance metrics
if "optimal_limits" in self.results:
metrics["optimal_limits"] = self.results["optimal_limits"]
metrics["premium_ratio"] = self._calculate_premium_ratio()
# Risk metrics
if "trajectories" in self.results:
trajectories = self.results["trajectories"]
# Use the last values from trajectories for risk metrics
            final_values = trajectories[:, -1] if trajectories.ndim == 2 else trajectories
risk_calc = RiskMetrics(final_values)
metrics["var_95"] = risk_calc.var(0.95)
metrics["cvar_95"] = risk_calc.tvar(0.95)
metrics["max_drawdown"] = self._calculate_max_drawdown(trajectories)
return metrics
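    # Recognized `results` keys (all optional): "roe", "roe_baseline",
    # "ruin_probability", "ruin_probability_baseline", "growth_rate",
    # "growth_rate_baseline", "optimal_limits", "total_premium",
    # "expected_losses", and "trajectories" (a 1-D array of final values or
    # a 2-D n_paths x n_steps array).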
def _calculate_improvement(self, metric: str) -> float:
"""Calculate percentage improvement over baseline.
Args:
metric: Metric name.
Returns:
Percentage improvement.
"""
if f"{metric}_baseline" in self.results and metric in self.results:
baseline = self.results[f"{metric}_baseline"]
current = self.results[metric]
if baseline > 0:
return float((current / baseline - 1) * 100)
return 0.0
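    # Worked example (hypothetical numbers): with results["roe_baseline"] = 0.15
    # and results["roe"] = 0.18, _calculate_improvement("roe") returns
    # (0.18 / 0.15 - 1) * 100 = 20.0.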
def _calculate_reduction(self, metric: str) -> float:
"""Calculate percentage reduction from baseline.
Args:
metric: Metric name.
Returns:
Percentage reduction.
"""
if f"{metric}_baseline" in self.results and metric in self.results:
baseline = self.results[f"{metric}_baseline"]
current = self.results[metric]
if baseline > 0:
return float((1 - current / baseline) * 100)
return 0.0
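    # Worked example (hypothetical numbers): with
    # results["ruin_probability_baseline"] = 0.020 and
    # results["ruin_probability"] = 0.005, _calculate_reduction("ruin_probability")
    # returns (1 - 0.005 / 0.020) * 100 = 75.0.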
def _calculate_premium_ratio(self) -> float:
"""Calculate premium to expected loss ratio.
Returns:
Premium ratio.
"""
if "total_premium" in self.results and "expected_losses" in self.results:
premium = self.results["total_premium"]
expected = self.results["expected_losses"]
if expected > 0:
return float(premium / expected)
return 0.0
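    # Worked example (hypothetical numbers): total_premium = 1.5e6 and
    # expected_losses = 1.0e6 give a premium ratio of 1.5, i.e. premiums
    # exceed expected losses by 50%.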
def _calculate_max_drawdown(self, trajectories: np.ndarray) -> float:
"""Calculate maximum drawdown from trajectories.
Args:
trajectories: Asset trajectories.
Returns:
Maximum drawdown.
"""
if len(trajectories.shape) == 2:
# Calculate for each trajectory
drawdowns = []
for traj in trajectories:
peak = np.maximum.accumulate(traj)
drawdown = (traj - peak) / peak
drawdowns.append(np.min(drawdown))
return float(np.mean(drawdowns))
return 0.0
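    # Worked example (hypothetical path): for the trajectory [100, 120, 90, 110]
    # the running peak is [100, 120, 120, 120], the drawdown series is
    # [0.0, 0.0, -0.25, -0.0833...], and the trajectory's maximum drawdown
    # is therefore -0.25.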
    def _update_config_with_results(self) -> None:
        """Update the report configuration with actual results."""
# Update metadata
self.config.metadata.abstract = self._generate_abstract()
# Update sections with actual content
for section in self.config.sections:
if section.title == "Key Findings":
section.content = self._generate_key_findings()
elif section.title == "Recommendations":
section.content = self._generate_recommendations()
def _generate_abstract(self) -> str:
"""Generate executive abstract.
Returns:
Abstract text.
"""
abstract_parts = []
# Main finding
if self.key_metrics.get("roe_improvement", 0) > 0:
abstract_parts.append(
f"Analysis demonstrates that optimized insurance strategies can improve "
f"ROE by {self.key_metrics['roe_improvement']:.1f}% while reducing "
f"ruin probability by {self.key_metrics.get('ruin_reduction', 0):.1f}%."
)
# Key insight
if self.key_metrics.get("premium_ratio", 0) > 2:
abstract_parts.append(
f"Optimal insurance premiums exceed expected losses by "
f"{(self.key_metrics['premium_ratio']-1)*100:.0f}%, validating "
f"the ergodic approach to insurance as a growth enabler."
)
# Recommendation
abstract_parts.append(
"Implementation of recommended insurance structures is projected to "
"deliver significant long-term value creation."
)
return " ".join(abstract_parts)
def _generate_key_findings(self) -> str:
"""Generate key findings section.
Returns:
Key findings text.
"""
findings = []
# ROE finding
if "roe" in self.key_metrics:
            findings.append(
                f"**Optimized ROE:** {self.key_metrics['roe']:.1%} "
                f"({self.key_metrics.get('roe_improvement', 0):+.1f}% vs baseline)"
            )
# Ruin probability finding
if "ruin_prob" in self.key_metrics:
            findings.append(
                f"**Ruin Probability:** {self.key_metrics['ruin_prob']:.2%} "
                f"({self.key_metrics.get('ruin_reduction', 0):.1f}% reduction vs baseline)"
            )
# Growth rate finding
if "growth_rate" in self.key_metrics:
            findings.append(
                f"**Annualized Growth:** {self.key_metrics['growth_rate']:.1%} "
                f"({self.key_metrics.get('growth_improvement', 0):+.1f}% improvement)"
            )
# Risk metrics
if "var_95" in self.key_metrics:
findings.append(
f"**95% VaR:** ${self.key_metrics['var_95']:,.0f} | "
f"**95% CVaR:** ${self.key_metrics['cvar_95']:,.0f}"
)
# Insurance structure
if "optimal_limits" in self.key_metrics:
limits = self.key_metrics["optimal_limits"]
findings.append(
f"**Optimal Structure:** Primary limit ${limits[0]/1e6:.1f}M, "
f"Total limit ${sum(limits)/1e6:.1f}M"
)
return "\n\n".join(findings)
def _generate_recommendations(self) -> str:
"""Generate recommendations section.
Returns:
Recommendations text.
"""
recommendations = []
# Primary recommendation
recommendations.append(
"### 1. Implement Optimized Insurance Structure\n"
"Adopt the identified optimal insurance layers to maximize "
"long-term growth while maintaining acceptable risk levels."
)
# Risk management
if self.key_metrics.get("ruin_prob", 1) < 0.01:
recommendations.append(
"### 2. Maintain Current Risk Management Framework\n"
"The optimized structure achieves ruin probability below 1%, "
"meeting best-practice standards for financial stability."
)
else:
recommendations.append(
"### 2. Enhance Risk Management Controls\n"
"Consider additional risk mitigation measures to further "
"reduce ruin probability below the 1% threshold."
)
# Premium optimization
if self.key_metrics.get("premium_ratio", 0) > 2.5:
recommendations.append(
"### 3. Review Premium Negotiation Strategy\n"
"Current analysis suggests room for premium optimization. "
"Consider competitive bidding or alternative structures."
)
# Monitoring
recommendations.append(
"### 4. Establish Quarterly Review Process\n"
"Implement regular monitoring of key metrics to ensure "
"continued optimization as market conditions evolve."
)
return "\n\n".join(recommendations)
def generate_roe_frontier(self, fig_config: FigureConfig) -> plt.Figure:
"""Generate ROE-Ruin frontier plot.
Args:
fig_config: Figure configuration.
Returns:
Matplotlib figure.
"""
if "frontier_data" in self.results:
data = self.results["frontier_data"]
# Create figure directly without using plot_roe_ruin_frontier
fig, ax = plt.subplots(figsize=(fig_config.width, fig_config.height))
# Plot the frontier
ax.plot(
data["ruin_probs"],
data["roes"],
"o-",
linewidth=2,
markersize=8,
color="#2E86C1",
label="Efficient Frontier",
)
# Add optimal point if present
if "optimal_point" in data and data["optimal_point"]:
opt_x, opt_y = data["optimal_point"]
ax.plot(
opt_x,
opt_y,
"*",
markersize=15,
color="#E74C3C",
label="Optimal Point",
zorder=5,
)
ax.set_xlabel("Ruin Probability", fontsize=12)
ax.set_ylabel("Return on Equity", fontsize=12)
ax.set_title("ROE-Ruin Efficient Frontier", fontsize=14, fontweight="bold")
ax.grid(True, alpha=0.3)
ax.legend(loc="best")
# Format axes
ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f"{x:.1%}"))
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda y, p: f"{y:.1%}"))
        else:
            # Annotated placeholder when no frontier data is available
            fig, ax = plt.subplots(figsize=(fig_config.width, fig_config.height))
            ax.text(0.5, 0.5, "No frontier data available",
                    ha="center", va="center", transform=ax.transAxes)
            ax.set_xlabel("Ruin Probability")
            ax.set_ylabel("Return on Equity")
            ax.set_title("ROE-Ruin Efficient Frontier")
return fig
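    # Usage sketch (hypothetical data; the dict keys match what this method
    # reads, and fig_config is any FigureConfig with width/height set):
    #
    #     report.results["frontier_data"] = {
    #         "ruin_probs": [0.02, 0.01, 0.005],
    #         "roes": [0.20, 0.18, 0.15],
    #         "optimal_point": (0.01, 0.18),
    #     }
    #     fig = report.generate_roe_frontier(fig_config)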
def generate_decision_matrix(self) -> pd.DataFrame:
"""Generate decision matrix table.
Returns:
Decision matrix DataFrame.
"""
# Create decision criteria
criteria = ["ROE", "Risk Level", "Growth Rate", "Implementation Cost", "Complexity"]
# Define alternatives
alternatives = ["No Insurance", "Basic Coverage", "Optimized Structure", "Full Coverage"]
        # Illustrative scores; a production report would derive these from the
        # actual analysis (higher is better for every criterion)
scores = np.array(
[
[0.12, 0.2, 0.08, 1.0, 0.9], # No Insurance
[0.14, 0.5, 0.10, 0.7, 0.7], # Basic
[0.18, 0.8, 0.15, 0.5, 0.5], # Optimized
[0.15, 0.9, 0.12, 0.3, 0.3], # Full
]
)
# Create DataFrame
df = pd.DataFrame(scores, index=alternatives, columns=criteria)
        # Weighted total (weights follow the criteria order and sum to 1.0)
        weights = [0.3, 0.25, 0.2, 0.15, 0.1]
        df["Weighted Score"] = (scores * weights).sum(axis=1)
return df
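    # Worked example: for the "No Insurance" row the weighted score is
    # 0.12*0.3 + 0.2*0.25 + 0.08*0.2 + 1.0*0.15 + 0.9*0.1 = 0.342.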
def generate_convergence_plot(self, fig_config: FigureConfig) -> plt.Figure:
"""Generate convergence diagnostics plot.
Args:
fig_config: Figure configuration.
Returns:
Matplotlib figure.
"""
fig, axes = plt.subplots(2, 1, figsize=(fig_config.width, fig_config.height))
if "convergence_data" in self.results:
data = self.results["convergence_data"]
# Plot running mean
axes[0].plot(data["iterations"], data["running_mean"])
axes[0].set_xlabel("Iterations")
axes[0].set_ylabel("Running Mean")
axes[0].set_title("Convergence of Mean Estimate")
axes[0].grid(True, alpha=0.3)
# Plot standard error
axes[1].plot(data["iterations"], data["std_error"])
axes[1].set_xlabel("Iterations")
axes[1].set_ylabel("Standard Error")
axes[1].set_title("Standard Error Reduction")
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
return fig
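    # Usage sketch (hypothetical data; the dict keys match what this method reads):
    #
    #     report.results["convergence_data"] = {
    #         "iterations": [100, 200, 300],
    #         "running_mean": [1.10, 1.05, 1.04],
    #         "std_error": [0.10, 0.07, 0.05],
    #     }
    #     fig = report.generate_convergence_plot(fig_config)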
def generate_convergence_table(self) -> pd.DataFrame:
"""Generate convergence metrics table.
Returns:
Convergence metrics DataFrame.
"""
metrics = []
if "convergence_metrics" in self.results:
conv = self.results["convergence_metrics"]
metrics = [
{
"Metric": "Gelman-Rubin R-hat",
"Value": conv.get("gelman_rubin", "N/A"),
"Target": "< 1.1",
"Status": "PASS" if conv.get("gelman_rubin", 2) < 1.1 else "FAIL",
},
{
"Metric": "Effective Sample Size",
"Value": conv.get("ess", "N/A"),
"Target": "> 1000",
"Status": "PASS" if conv.get("ess", 0) > 1000 else "FAIL",
},
{
"Metric": "Autocorrelation",
"Value": f"{conv.get('autocorr', 'N/A'):.3f}",
"Target": "< 0.1",
"Status": "PASS" if conv.get("autocorr", 1) < 0.1 else "FAIL",
},
{
"Metric": "Batch Means p-value",
"Value": f"{conv.get('batch_p', 'N/A'):.3f}",
"Target": "> 0.05",
"Status": "PASS" if conv.get("batch_p", 0) > 0.05 else "FAIL",
},
]
else:
# Generate placeholder data
metrics = [
{
"Metric": "Gelman-Rubin R-hat",
"Value": 1.02,
"Target": "< 1.1",
"Status": "PASS",
},
{
"Metric": "Effective Sample Size",
"Value": 5234,
"Target": "> 1000",
"Status": "PASS",
},
{"Metric": "Autocorrelation", "Value": 0.045, "Target": "< 0.1", "Status": "PASS"},
{
"Metric": "Batch Means p-value",
"Value": 0.342,
"Target": "> 0.05",
"Status": "PASS",
},
]
return pd.DataFrame(metrics)
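    # Usage sketch (hypothetical metrics; the dict keys match what this method reads):
    #
    #     report.results["convergence_metrics"] = {
    #         "gelman_rubin": 1.02, "ess": 5234, "autocorr": 0.045, "batch_p": 0.342,
    #     }
    #     report.generate_convergence_table()  # every Status entry is "PASS"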