Source code for ergodic_insurance.benchmarking

"""Comprehensive benchmarking suite for Monte Carlo simulations.

This module provides tools for benchmarking Monte Carlo engine performance,
targeting 100K simulations in under 60 seconds on a 4-core CPU while using less than 4 GB of memory.

Key features:
    - Performance benchmarking at multiple scales (1K, 10K, 100K)
    - Memory usage tracking and profiling
    - CPU efficiency monitoring
    - Cache effectiveness measurement
    - Automated performance report generation
    - Comparison of optimization strategies

Example:
>>> from ergodic_insurance.benchmarking import BenchmarkSuite, BenchmarkConfig
>>> from ergodic_insurance.monte_carlo import MonteCarloEngine
    >>>
    >>> suite = BenchmarkSuite()
    >>> config = BenchmarkConfig(scales=[1000, 10000, 100000])
    >>>
    >>> # Run comprehensive benchmarks
    >>> results = suite.run_comprehensive_benchmark(engine, config)
    >>> print(results.summary())
    >>>
    >>> # Check if performance targets are met
    >>> if results.meets_requirements():
    ...     print("✓ All performance targets achieved!")

Google-style docstrings are used throughout for Sphinx documentation.
"""

from dataclasses import dataclass, field
from datetime import datetime
import gc
import json
import time
from typing import Any, Callable, Dict, List, Optional, Tuple, cast

import numpy as np
import psutil
from tabulate import tabulate  # type: ignore[import-untyped]


@dataclass
class BenchmarkMetrics:
    """Metrics collected during benchmarking.

    Attributes:
        execution_time: Total execution time in seconds
        simulations_per_second: Throughput metric
        memory_peak_mb: Peak memory usage in MB
        memory_average_mb: Average memory usage in MB
        cpu_utilization: Average CPU utilization percentage
        cache_hit_rate: Cache effectiveness percentage
        accuracy_score: Numerical accuracy score
        convergence_iterations: Iterations to convergence
    """

    execution_time: float
    simulations_per_second: float
    memory_peak_mb: float
    memory_average_mb: float
    cpu_utilization: float = 0.0
    cache_hit_rate: float = 0.0
    accuracy_score: float = 1.0
    convergence_iterations: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert metrics to dictionary.

        Returns:
            Dictionary representation of metrics.
        """
        return {
            "execution_time": self.execution_time,
            "simulations_per_second": self.simulations_per_second,
            "memory_peak_mb": self.memory_peak_mb,
            "memory_average_mb": self.memory_average_mb,
            "cpu_utilization": self.cpu_utilization,
            "cache_hit_rate": self.cache_hit_rate,
            "accuracy_score": self.accuracy_score,
            "convergence_iterations": self.convergence_iterations,
        }

@dataclass
class BenchmarkResult:
    """Results from a benchmark run.

    Attributes:
        scale: Number of simulations
        metrics: Performance metrics
        configuration: Configuration used
        timestamp: When benchmark was run
        system_info: System information
        optimizations: Optimizations applied
    """

    scale: int
    metrics: BenchmarkMetrics
    configuration: Dict[str, Any]
    timestamp: datetime
    system_info: Dict[str, Any] = field(default_factory=dict)
    optimizations: List[str] = field(default_factory=list)

    def meets_target(self, target_time: float, target_memory: float) -> bool:
        """Check if result meets performance targets.

        Args:
            target_time: Maximum execution time in seconds.
            target_memory: Maximum memory usage in MB.

        Returns:
            True if targets are met.
        """
        return (
            self.metrics.execution_time <= target_time
            and self.metrics.memory_peak_mb <= target_memory
        )

    def summary(self) -> str:
        """Generate result summary.

        Returns:
            Formatted summary string.
        """
        summary = f"Benchmark Result - {self.scale:,} simulations\n"
        summary += f"  Time: {self.metrics.execution_time:.2f}s "
        summary += f"({self.metrics.simulations_per_second:.0f} sims/s)\n"
        summary += f"  Memory: {self.metrics.memory_peak_mb:.1f} MB peak, "
        summary += f"{self.metrics.memory_average_mb:.1f} MB avg\n"
        summary += f"  CPU: {self.metrics.cpu_utilization:.1f}%\n"
        if self.metrics.cache_hit_rate > 0:
            summary += f"  Cache: {self.metrics.cache_hit_rate:.1f}% hit rate\n"
        if self.metrics.accuracy_score < 1.0:
            summary += f"  Accuracy: {self.metrics.accuracy_score:.4f}\n"
        return summary

@dataclass
class BenchmarkConfig:
    """Configuration for benchmarking.

    Attributes:
        scales: List of simulation counts to test
        n_years: Years per simulation
        n_workers: Number of parallel workers
        memory_limit_mb: Memory limit for testing
        target_times: Target execution times per scale
        repetitions: Number of repetitions per test
        warmup_runs: Number of warmup runs
        enable_profiling: Enable detailed profiling
    """

    scales: List[int] = field(default_factory=lambda: [1000, 10000, 100000])
    n_years: int = 10
    n_workers: int = 4
    memory_limit_mb: float = 4000.0
    target_times: Dict[int, float] = field(
        default_factory=lambda: {
            1000: 1.0,  # 1 second for 1K
            10000: 10.0,  # 10 seconds for 10K
            100000: 60.0,  # 60 seconds for 100K
        }
    )
    repetitions: int = 3
    warmup_runs: int = 2
    enable_profiling: bool = True

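
# A minimal configuration sketch: BenchmarkConfig can be narrowed for a fast local
# check by shrinking the scales and overriding the per-scale targets. The values
# below are illustrative only, not recommended defaults.
#
#     >>> quick = BenchmarkConfig(
#     ...     scales=[1000, 10000],
#     ...     repetitions=1,
#     ...     warmup_runs=1,
#     ...     target_times={1000: 2.0, 10000: 20.0},
#     ... )
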
class SystemProfiler:
    """Profile system resources during benchmarking."""

    def __init__(self):
        """Initialize system profiler."""
        self.process = psutil.Process()
        self.cpu_samples = []
        self.memory_samples = []
        self.initial_memory = 0.0

    def start(self) -> None:
        """Start profiling."""
        self.cpu_samples = []
        self.memory_samples = []
        self.initial_memory = self.process.memory_info().rss / (1024 * 1024)

    def sample(self) -> None:
        """Take a resource sample."""
        self.cpu_samples.append(self.process.cpu_percent())
        self.memory_samples.append(self.process.memory_info().rss / (1024 * 1024))

    def get_metrics(self) -> Tuple[float, float, float]:
        """Get profiling metrics.

        Returns:
            Tuple of (avg_cpu, peak_memory, avg_memory).
        """
        if not self.cpu_samples:
            return 0.0, self.initial_memory, self.initial_memory

        avg_cpu = np.mean(self.cpu_samples[1:]) if len(self.cpu_samples) > 1 else 0
        peak_memory = max(self.memory_samples) if self.memory_samples else self.initial_memory
        avg_memory = np.mean(self.memory_samples) if self.memory_samples else self.initial_memory
        return avg_cpu, peak_memory, avg_memory

    @staticmethod
    def get_system_info() -> Dict[str, Any]:
        """Get system information.

        Returns:
            Dictionary of system information.
        """
        import platform

        cpu_info = {
            "cores_physical": psutil.cpu_count(logical=False),
            "cores_logical": psutil.cpu_count(logical=True),
            "cpu_freq_mhz": psutil.cpu_freq().current if psutil.cpu_freq() else 0,
        }

        memory_info = {
            "total_gb": psutil.virtual_memory().total / (1024**3),
            "available_gb": psutil.virtual_memory().available / (1024**3),
        }

        return {
            "platform": platform.platform(),
            "processor": platform.processor(),
            "python_version": platform.python_version(),
            "cpu": cpu_info,
            "memory": memory_info,
            "timestamp": datetime.now().isoformat(),
        }

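
# A minimal usage sketch of SystemProfiler on its own, outside BenchmarkRunner.
# ``workload`` is a hypothetical zero-argument callable standing in for whatever
# you want to measure.
#
#     >>> profiler = SystemProfiler()
#     >>> profiler.start()
#     >>> workload()                                      # doctest: +SKIP
#     >>> profiler.sample()
#     >>> avg_cpu, peak_mb, avg_mb = profiler.get_metrics()
#     >>> print(f"peak memory: {peak_mb:.1f} MB")
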
class BenchmarkRunner:
    """Run individual benchmarks with monitoring."""

    def __init__(self, profiler: Optional[SystemProfiler] = None):
        """Initialize benchmark runner.

        Args:
            profiler: System profiler instance.
        """
        self.profiler = profiler or SystemProfiler()

    def run_single_benchmark(
        self,
        func: Callable,
        args: Tuple = (),
        kwargs: Optional[Dict] = None,
        monitor_interval: float = 0.1,
    ) -> BenchmarkMetrics:
        """Run a single benchmark with monitoring.

        Args:
            func: Function to benchmark.
            args: Positional arguments for function.
            kwargs: Keyword arguments for function.
            monitor_interval: Interval in seconds between post-run resource samples.

        Returns:
            BenchmarkMetrics from the run.
        """
        kwargs = kwargs or {}

        # Force garbage collection so earlier allocations don't skew memory stats
        gc.collect()

        # Start profiling
        self.profiler.start()
        start_time = time.time()

        # Run the function
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            raise RuntimeError(f"Benchmark failed: {e}") from e

        # Measure execution time before any monitoring overhead can inflate it
        execution_time = time.time() - start_time

        # Post-run resource sampling: at least one sample is always taken so the
        # memory metrics reflect this run. Sampling while the function executes
        # would require a background monitoring thread.
        sample_until = time.time() + monitor_interval
        self.profiler.sample()
        while time.time() < sample_until:
            time.sleep(monitor_interval)
            self.profiler.sample()

        # Get final metrics
        avg_cpu, peak_memory, avg_memory = self.profiler.get_metrics()

        # Calculate throughput
        n_simulations = kwargs.get("n_simulations", getattr(result, "n_simulations", 1))
        simulations_per_second = n_simulations / execution_time if execution_time > 0 else 0

        # Extract additional metrics if available
        cache_hit_rate = 0.0
        if hasattr(result, "cache_hit_rate"):
            cache_hit_rate = result.cache_hit_rate
        elif hasattr(func, "__self__") and hasattr(func.__self__, "cache"):
            cache = func.__self__.cache
            if hasattr(cache, "hit_rate"):
                cache_hit_rate = cache.hit_rate

        accuracy_score = 1.0
        if hasattr(result, "accuracy_score"):
            accuracy_score = result.accuracy_score

        convergence_iterations = 0
        if hasattr(result, "convergence_iterations"):
            convergence_iterations = result.convergence_iterations

        return BenchmarkMetrics(
            execution_time=execution_time,
            simulations_per_second=simulations_per_second,
            memory_peak_mb=peak_memory,
            memory_average_mb=avg_memory,
            cpu_utilization=avg_cpu,
            cache_hit_rate=cache_hit_rate,
            accuracy_score=accuracy_score,
            convergence_iterations=convergence_iterations,
        )

    def run_with_warmup(
        self,
        func: Callable,
        args: Tuple = (),
        kwargs: Optional[Dict] = None,
        warmup_runs: int = 2,
        benchmark_runs: int = 3,
    ) -> List[BenchmarkMetrics]:
        """Run benchmark with warmup.

        Args:
            func: Function to benchmark.
            args: Positional arguments.
            kwargs: Keyword arguments.
            warmup_runs: Number of warmup runs.
            benchmark_runs: Number of benchmark runs.

        Returns:
            List of benchmark metrics.
        """
        kwargs = kwargs or {}

        # Warmup runs
        for i in range(warmup_runs):
            print(f"  Warmup {i+1}/{warmup_runs}...", end="")
            _ = func(*args, **kwargs)
            print(" done")
            gc.collect()

        # Benchmark runs
        metrics_list = []
        for i in range(benchmark_runs):
            print(f"  Run {i+1}/{benchmark_runs}...", end="")
            metrics = self.run_single_benchmark(func, args, kwargs)
            metrics_list.append(metrics)
            print(f" {metrics.execution_time:.2f}s")
            gc.collect()

        return metrics_list

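
# A minimal sketch of using BenchmarkRunner directly on an arbitrary callable.
# ``simulate_batch`` is a hypothetical function; passing n_simulations through
# kwargs lets the runner derive throughput (sims/s) from it.
#
#     >>> runner = BenchmarkRunner()
#     >>> metrics_list = runner.run_with_warmup(
#     ...     simulate_batch,
#     ...     kwargs={"n_simulations": 10_000},
#     ...     warmup_runs=1,
#     ...     benchmark_runs=3,
#     ... )                                               # doctest: +SKIP
#     >>> best_time = min(m.execution_time for m in metrics_list)
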
class BenchmarkSuite:
    """Comprehensive benchmark suite for Monte Carlo simulations.

    Provides tools to benchmark performance across different scales
    and configurations, generating detailed reports.
    """

    def __init__(self):
        """Initialize benchmark suite."""
        self.runner = BenchmarkRunner()
        self.results: List[BenchmarkResult] = []
        self.system_info = SystemProfiler.get_system_info()

    def benchmark_scale(
        self,
        engine,
        scale: int,
        config: BenchmarkConfig,
        optimizations: Optional[List[str]] = None,
    ) -> BenchmarkResult:
        """Benchmark at a specific scale.

        Args:
            engine: Monte Carlo engine to benchmark.
            scale: Number of simulations.
            config: Benchmark configuration.
            optimizations: List of applied optimizations.

        Returns:
            BenchmarkResult for this scale.
        """
        optimizations = optimizations or []

        print(f"\nBenchmarking {scale:,} simulations...")

        # Prepare engine configuration
        engine_config = engine.config
        engine_config.n_simulations = scale
        engine_config.n_years = config.n_years
        engine_config.n_workers = config.n_workers
        engine_config.progress_bar = False

        # Run benchmark with warmup
        metrics_list = self.runner.run_with_warmup(
            engine.run, warmup_runs=config.warmup_runs, benchmark_runs=config.repetitions
        )

        # Average metrics
        avg_metrics = BenchmarkMetrics(
            execution_time=float(np.mean([m.execution_time for m in metrics_list])),
            simulations_per_second=float(
                np.mean([m.simulations_per_second for m in metrics_list])
            ),
            memory_peak_mb=float(np.max([m.memory_peak_mb for m in metrics_list])),
            memory_average_mb=float(np.mean([m.memory_average_mb for m in metrics_list])),
            cpu_utilization=float(np.mean([m.cpu_utilization for m in metrics_list])),
            cache_hit_rate=float(np.mean([m.cache_hit_rate for m in metrics_list])),
            accuracy_score=float(np.mean([m.accuracy_score for m in metrics_list])),
            convergence_iterations=int(
                np.mean([m.convergence_iterations for m in metrics_list])
            ),
        )

        # Create result
        result = BenchmarkResult(
            scale=scale,
            metrics=avg_metrics,
            configuration={
                "n_years": config.n_years,
                "n_workers": config.n_workers,
                "memory_limit": config.memory_limit_mb,
            },
            timestamp=datetime.now(),
            system_info=self.system_info,
            optimizations=optimizations,
        )

        print(result.summary())

        # Check targets
        target_time = config.target_times.get(scale, float("inf"))
        if result.meets_target(target_time, config.memory_limit_mb):
            print(f"✓ Meets targets (time<{target_time}s, memory<{config.memory_limit_mb}MB)")
        else:
            print(f"✗ Misses targets (time<{target_time}s, memory<{config.memory_limit_mb}MB)")

        return result

    def run_comprehensive_benchmark(
        self, engine, config: Optional[BenchmarkConfig] = None
    ) -> "ComprehensiveBenchmarkResult":
        """Run comprehensive benchmark suite.

        Args:
            engine: Monte Carlo engine to benchmark.
            config: Benchmark configuration.

        Returns:
            ComprehensiveBenchmarkResult with all results.
        """
        config = config or BenchmarkConfig()

        print("=" * 60)
        print("COMPREHENSIVE BENCHMARK SUITE")
        print("=" * 60)
        print(f"System: {self.system_info['platform']}")
        print(f"CPU: {self.system_info['cpu']['cores_physical']} cores")
        print(f"Memory: {self.system_info['memory']['available_gb']:.1f} GB available")
        print("=" * 60)

        # Test different scales
        for scale in config.scales:
            result = self.benchmark_scale(engine, scale, config)
            self.results.append(result)

        # Test with optimizations
        if hasattr(engine, "enable_optimizations"):
            print("\nTesting with optimizations enabled...")
            engine.enable_optimizations()

            for scale in config.scales:
                result = self.benchmark_scale(
                    engine,
                    scale,
                    config,
                    optimizations=["vectorization", "caching", "parallel"],
                )
                self.results.append(result)

        return ComprehensiveBenchmarkResult(self.results, config, self.system_info)

    def compare_configurations(
        self, engine_factory: Callable, configurations: List[Dict[str, Any]], scale: int = 10000
    ) -> "ConfigurationComparison":
        """Compare different configurations.

        Args:
            engine_factory: Factory function to create engines.
            configurations: List of configuration dictionaries.
            scale: Number of simulations to test.

        Returns:
            ConfigurationComparison results.
        """
        comparison_results = []

        for i, config_dict in enumerate(configurations):
            print(f"\nConfiguration {i+1}: {config_dict.get('name', 'unnamed')}")

            # Create engine with configuration
            engine = engine_factory(**config_dict)

            # Run benchmark
            config = BenchmarkConfig(scales=[scale])
            result = self.benchmark_scale(engine, scale, config)

            comparison_results.append({"configuration": config_dict, "result": result})

        return ConfigurationComparison(comparison_results)

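
# A minimal sketch of compare_configurations. ``make_engine`` is a hypothetical
# factory returning a configured Monte Carlo engine; note that every key in each
# configuration dict (including the display-only "name") is forwarded to the
# factory as a keyword argument, so the factory must accept or ignore those keys.
#
#     >>> suite = BenchmarkSuite()
#     >>> comparison = suite.compare_configurations(
#     ...     make_engine,
#     ...     configurations=[
#     ...         {"name": "serial", "n_workers": 1},
#     ...         {"name": "parallel", "n_workers": 4},
#     ...     ],
#     ...     scale=10_000,
#     ... )                                               # doctest: +SKIP
#     >>> print(comparison.best_configuration().get("name"))
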
@dataclass
class ComprehensiveBenchmarkResult:
    """Results from comprehensive benchmark suite.

    Attributes:
        results: List of individual benchmark results
        config: Configuration used
        system_info: System information
    """

    results: List[BenchmarkResult]
    config: BenchmarkConfig
    system_info: Dict[str, Any]

    def meets_requirements(self) -> bool:
        """Check if all requirements are met.

        Returns:
            True if all performance requirements are satisfied.
        """
        for result in self.results:
            scale = result.scale
            target_time = self.config.target_times.get(scale, float("inf"))

            if not result.meets_target(target_time, self.config.memory_limit_mb):
                return False

        # Special check for 100K requirement
        for result in self.results:
            if result.scale == 100000:
                if result.metrics.execution_time > 60 or result.metrics.memory_peak_mb > 4000:
                    return False
                if result.metrics.accuracy_score < 0.9999:
                    return False

        return True

    def summary(self) -> str:
        """Generate comprehensive summary.

        Returns:
            Formatted summary string.
        """
        summary = "BENCHMARK RESULTS SUMMARY\n" + "=" * 60 + "\n"

        # Table of results
        table_data = []
        for result in self.results:
            opts = ", ".join(result.optimizations) if result.optimizations else "none"
            table_data.append(
                [
                    f"{result.scale:,}",
                    f"{result.metrics.execution_time:.2f}s",
                    f"{result.metrics.simulations_per_second:.0f}",
                    f"{result.metrics.memory_peak_mb:.1f} MB",
                    f"{result.metrics.cpu_utilization:.1f}%",
                    f"{result.metrics.cache_hit_rate:.1f}%",
                    opts,
                ]
            )

        headers = ["Scale", "Time", "Sims/s", "Memory", "CPU", "Cache", "Optimizations"]
        summary += tabulate(table_data, headers=headers, tablefmt="grid") + "\n\n"

        # Performance targets check
        summary += "PERFORMANCE TARGETS\n" + "-" * 30 + "\n"

        checks = {
            "100K in <60s": False,
            "Memory <4GB": False,
            "Accuracy >99.99%": False,
            "CPU Efficiency >75%": False,
            "Cache Hit Rate >85%": False,
        }

        for result in self.results:
            if result.scale == 100000:
                checks["100K in <60s"] = result.metrics.execution_time < 60
                checks["Memory <4GB"] = result.metrics.memory_peak_mb < 4000
                checks["Accuracy >99.99%"] = result.metrics.accuracy_score > 0.9999
                checks["CPU Efficiency >75%"] = result.metrics.cpu_utilization > 75
                checks["Cache Hit Rate >85%"] = result.metrics.cache_hit_rate > 85

        for check, passed in checks.items():
            status = "✓" if passed else "✗"
            summary += f"{status} {check}\n"

        # Overall verdict
        if self.meets_requirements():
            summary += "\n✓ ALL REQUIREMENTS MET - Ready for production\n"
        else:
            summary += "\n✗ REQUIREMENTS NOT MET - Further optimization needed\n"

        return summary

    def save_report(self, filepath: str) -> None:
        """Save benchmark report to file.

        Args:
            filepath: Path to save report.
        """
        report = {
            "timestamp": datetime.now().isoformat(),
            "system_info": self.system_info,
            "configuration": {
                "scales": self.config.scales,
                "n_years": self.config.n_years,
                "n_workers": self.config.n_workers,
                "memory_limit_mb": self.config.memory_limit_mb,
            },
            "results": [
                {
                    "scale": r.scale,
                    "metrics": r.metrics.to_dict(),
                    "optimizations": r.optimizations,
                    "meets_target": r.meets_target(
                        self.config.target_times.get(r.scale, float("inf")),
                        self.config.memory_limit_mb,
                    ),
                }
                for r in self.results
            ],
            "meets_all_requirements": self.meets_requirements(),
        }

        with open(filepath, "w") as f:
            json.dump(report, f, indent=2, default=str)

        print(f"Report saved to: {filepath}")

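
# A minimal sketch of consuming the JSON report written by save_report; the file
# layout mirrors the ``report`` dict built above. "benchmark_report.json" is the
# example path used in the __main__ block below.
#
#     >>> import json
#     >>> with open("benchmark_report.json") as f:        # doctest: +SKIP
#     ...     report = json.load(f)
#     >>> report["meets_all_requirements"]                 # doctest: +SKIP
#     >>> [r["scale"] for r in report["results"]]          # doctest: +SKIP
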
@dataclass
class ConfigurationComparison:
    """Results from configuration comparison."""

    results: List[Dict[str, Any]]

    def best_configuration(self) -> Dict[str, Any]:
        """Find best configuration.

        Returns:
            Best configuration based on execution time.
        """
        best = min(self.results, key=lambda x: x["result"].metrics.execution_time)
        return cast(Dict[str, Any], best["configuration"])

    def summary(self) -> str:
        """Generate comparison summary.

        Returns:
            Formatted summary string.
        """
        summary = "CONFIGURATION COMPARISON\n" + "=" * 60 + "\n"

        table_data = []
        for item in self.results:
            config = item["configuration"]
            result = item["result"]
            table_data.append(
                [
                    config.get("name", "unnamed"),
                    f"{result.metrics.execution_time:.2f}s",
                    f"{result.metrics.memory_peak_mb:.1f} MB",
                    f"{result.metrics.cpu_utilization:.1f}%",
                ]
            )

        headers = ["Configuration", "Time", "Memory", "CPU"]
        summary += tabulate(table_data, headers=headers, tablefmt="grid")

        # Best configuration
        best = self.best_configuration()
        summary += f"\n\nBest configuration: {best.get('name', 'unnamed')}\n"

        return summary

def run_quick_benchmark(engine, n_simulations: int = 10000) -> BenchmarkMetrics:
    """Run a quick benchmark.

    Args:
        engine: Monte Carlo engine to benchmark.
        n_simulations: Number of simulations.

    Returns:
        BenchmarkMetrics from the run.
    """
    runner = BenchmarkRunner()

    # Configure engine
    engine.config.n_simulations = n_simulations
    engine.config.progress_bar = False

    # Run benchmark
    return runner.run_single_benchmark(engine.run)

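
# A minimal sketch of run_quick_benchmark with an already-constructed engine
# (see the __main__ block below for one way to assemble a MonteCarloEngine).
#
#     >>> metrics = run_quick_benchmark(engine, n_simulations=5_000)  # doctest: +SKIP
#     >>> print(f"{metrics.simulations_per_second:.0f} sims/s, "
#     ...       f"{metrics.memory_peak_mb:.1f} MB peak")
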

if __name__ == "__main__":
    # Example usage
    from ergodic_insurance.config import ManufacturerConfig
    from ergodic_insurance.insurance_program import EnhancedInsuranceLayer, InsuranceProgram
    from ergodic_insurance.loss_distributions import ManufacturingLossGenerator
    from ergodic_insurance.manufacturer import WidgetManufacturer
    from ergodic_insurance.monte_carlo import MonteCarloEngine, SimulationConfig

    # Setup simulation
    loss_generator = ManufacturingLossGenerator()
    layers = [
        EnhancedInsuranceLayer(0, 1_000_000, 0.015),
        EnhancedInsuranceLayer(1_000_000, 4_000_000, 0.008),
    ]
    insurance_program = InsuranceProgram(layers=layers)

    manufacturer_config = ManufacturerConfig(
        initial_assets=10_000_000,
        asset_turnover_ratio=0.5,
        base_operating_margin=0.08,
        tax_rate=0.25,
        retention_ratio=0.6,
    )
    manufacturer = WidgetManufacturer(manufacturer_config)

    # Create engine
    sim_config = SimulationConfig(n_simulations=1000, n_years=10, parallel=True, n_workers=4)
    engine = MonteCarloEngine(
        loss_generator=loss_generator,
        insurance_program=insurance_program,
        manufacturer=manufacturer,
        config=sim_config,
    )

    # Run benchmarks
    suite = BenchmarkSuite()
    config = BenchmarkConfig(scales=[1000, 10000])
    results = suite.run_comprehensive_benchmark(engine, config)

    print("\n" + results.summary())

    # Save report
    results.save_report("benchmark_report.json")