Source code for ergodic_insurance.performance_optimizer
"""Performance optimization module for Monte Carlo simulations.
This module provides tools and strategies to optimize the performance of
Monte Carlo simulations, targeting 100K simulations in under 60 seconds
on budget hardware (4-core CPU, 8GB RAM).
Key features:
- Execution profiling and bottleneck identification
- Vectorized operations for loss generation and insurance calculations
- Smart caching for repeated calculations
- Memory optimization for large-scale simulations
- Integration with parallel execution framework
Example:
>>> from performance_optimizer import PerformanceOptimizer
>>> from monte_carlo import MonteCarloEngine
>>>
>>> optimizer = PerformanceOptimizer()
>>> engine = MonteCarloEngine(config=config)
>>>
>>> # Profile execution
>>> profile_results = optimizer.profile_execution(engine, n_simulations=1000)
>>> print(profile_results.bottlenecks)
>>>
>>> # Apply optimizations
>>> optimized_engine = optimizer.optimize_engine(engine)
>>> results = optimized_engine.run()
Google-style docstrings are used throughout for Sphinx documentation.
"""
import cProfile
from collections import defaultdict
from dataclasses import dataclass, field
from functools import lru_cache, wraps
import gc
import io
import pstats
import time
from typing import Any, Callable, Dict, List, Optional, Tuple, cast
import numpy as np
import psutil
# Try to import numba for JIT compilation, but make it optional
try:
    from numba import jit, prange

    NUMBA_AVAILABLE = True
except ImportError:
    NUMBA_AVAILABLE = False

    # No-op fallbacks when numba is absent: jit becomes a pass-through
    # decorator and prange falls back to the built-in range.
    def jit(*args, **kwargs):  # pylint: disable=unused-argument
        def decorator(func):
            return func
        return decorator

    prange = range
@dataclass
class ProfileResult:
"""Results from performance profiling.
Attributes:
total_time: Total execution time in seconds
bottlenecks: List of performance bottlenecks identified
function_times: Dictionary mapping function names to execution times
memory_usage: Peak memory usage in MB
recommendations: List of optimization recommendations
"""
total_time: float
bottlenecks: List[str] = field(default_factory=list)
function_times: Dict[str, float] = field(default_factory=dict)
memory_usage: float = 0.0
recommendations: List[str] = field(default_factory=list)
def summary(self) -> str:
"""Generate a summary of profiling results.
Returns:
Formatted summary string.
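
        Example:
            A minimal sketch with illustrative values (output elided):

                >>> result = ProfileResult(total_time=2.5, memory_usage=150.0)
                >>> print(result.summary())  # doctest: +ELLIPSIS
                Performance Profile Summary
                ...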
"""
summary = f"Performance Profile Summary\n{'='*50}\n"
summary += f"Total Time: {self.total_time:.2f}s\n"
summary += f"Peak Memory: {self.memory_usage:.1f} MB\n"
if self.bottlenecks:
summary += "\nTop Bottlenecks:\n"
for bottleneck in self.bottlenecks[:5]:
summary += f" - {bottleneck}\n"
if self.recommendations:
summary += "\nRecommendations:\n"
for rec in self.recommendations:
summary += f" - {rec}\n"
return summary
@dataclass
class OptimizationConfig:
"""Configuration for performance optimization.
Attributes:
enable_vectorization: Use vectorized operations
enable_caching: Use smart caching
cache_size: Maximum cache entries
enable_numba: Use Numba JIT compilation
memory_limit_mb: Memory usage limit in MB
chunk_size: Chunk size for batch processing
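
    Example:
        A construction sketch with illustrative overrides:

            >>> config = OptimizationConfig(memory_limit_mb=2000.0, chunk_size=5000)
            >>> config.enable_vectorization
            True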
"""
enable_vectorization: bool = True
enable_caching: bool = True
cache_size: int = 1000
enable_numba: bool = True
memory_limit_mb: float = 4000.0
chunk_size: int = 10000
class SmartCache:
"""Smart caching system for repeated calculations.
Provides intelligent caching with memory management and
hit rate tracking.
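
    Example:
        A minimal usage sketch with illustrative keys and values:

            >>> cache = SmartCache(max_size=2)
            >>> cache.set(("premium", 1_000_000), 15000.0)
            >>> cache.get(("premium", 1_000_000))
            15000.0
            >>> cache.get(("premium", 2_000_000)) is None
            True
            >>> cache.hit_rate
            50.0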
"""
def __init__(self, max_size: int = 1000):
"""Initialize smart cache.
Args:
max_size: Maximum number of cache entries.
"""
self.cache: Dict[Tuple, Any] = {}
self.max_size = max_size
self.hits = 0
self.misses = 0
self.access_counts: Dict[Tuple, int] = defaultdict(int)
def get(self, key: Tuple) -> Optional[Any]:
"""Get value from cache.
Args:
key: Cache key (must be hashable).
Returns:
Cached value or None if not found.
"""
if key in self.cache:
self.hits += 1
self.access_counts[key] += 1
return self.cache[key]
self.misses += 1
return None
def set(self, key: Tuple, value: Any) -> None:
"""Set value in cache.
Args:
key: Cache key (must be hashable).
value: Value to cache.
"""
if len(self.cache) >= self.max_size:
            # Evict the least frequently accessed entry (lowest access count)
least_accessed = min(self.access_counts.keys(), key=lambda k: self.access_counts[k])
del self.cache[least_accessed]
del self.access_counts[least_accessed]
self.cache[key] = value
self.access_counts[key] = 1
@property
def hit_rate(self) -> float:
"""Calculate cache hit rate.
Returns:
Hit rate as percentage.
"""
total = self.hits + self.misses
return (self.hits / total * 100) if total > 0 else 0.0
def clear(self) -> None:
"""Clear the cache."""
self.cache.clear()
self.access_counts.clear()
self.hits = 0
self.misses = 0
class VectorizedOperations:
"""Vectorized operations for performance optimization."""
@staticmethod
@jit(nopython=True, parallel=True)
def calculate_growth_rates(
final_assets: np.ndarray, initial_assets: float, n_years: float
) -> np.ndarray:
"""Calculate growth rates using vectorized operations.
Args:
final_assets: Array of final asset values.
initial_assets: Initial asset value.
n_years: Number of years.
Returns:
Array of growth rates.
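
        Example:
            A small sketch (works with numba or the pure-Python fallback):

                >>> final = np.array([2_000_000.0, 500_000.0])
                >>> rates = VectorizedOperations.calculate_growth_rates(
                ...     final, 1_000_000.0, 10.0
                ... )
                >>> rates.round(4).tolist()
                [0.0693, -0.0693]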
"""
n = len(final_assets)
growth_rates = np.zeros(n)
for i in prange(n):
if final_assets[i] > 0 and initial_assets > 0:
growth_rates[i] = np.log(final_assets[i] / initial_assets) / n_years
return growth_rates
@staticmethod
def apply_insurance_vectorized(
losses: np.ndarray, attachment: float, limit: float
) -> Tuple[np.ndarray, np.ndarray]:
"""Apply insurance coverage using vectorized operations.
Args:
losses: Array of loss amounts.
attachment: Insurance attachment point.
limit: Insurance limit.
Returns:
Tuple of (retained_losses, recovered_amounts).
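
        Example:
            A worked sketch: losses below the attachment are fully retained,
            and recoveries are capped at the layer limit:

                >>> losses = np.array([50_000.0, 500_000.0, 5_000_000.0])
                >>> retained, recovered = VectorizedOperations.apply_insurance_vectorized(
                ...     losses, attachment=100_000.0, limit=1_000_000.0
                ... )
                >>> recovered.tolist()
                [0.0, 400000.0, 1000000.0]
                >>> retained.tolist()
                [50000.0, 100000.0, 4000000.0]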
"""
# Vectorized insurance calculation
excess_losses = np.maximum(losses - attachment, 0)
recovered = np.minimum(excess_losses, limit)
retained = losses - recovered
return retained, recovered
class PerformanceOptimizer:
"""Main performance optimization engine.
Provides profiling, optimization, and monitoring capabilities
for Monte Carlo simulations.
"""
def __init__(self, config: Optional[OptimizationConfig] = None):
"""Initialize performance optimizer.
Args:
config: Optimization configuration.
"""
self.config = config or OptimizationConfig()
self.cache = SmartCache(max_size=self.config.cache_size)
self.vectorized = VectorizedOperations()
self._profile_data = None
def profile_execution( # pylint: disable=too-many-locals
self, func: Callable, *args, **kwargs
) -> ProfileResult:
"""Profile function execution to identify bottlenecks.
Args:
func: Function to profile.
*args: Positional arguments for function.
**kwargs: Keyword arguments for function.
Returns:
ProfileResult with profiling data.
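
        Example:
            A sketch profiling an arbitrary callable; note that the
            callable's own return value is discarded, only profiling
            data is returned:

                >>> optimizer = PerformanceOptimizer()
                >>> profile = optimizer.profile_execution(sum, range(1_000_000))
                >>> profile.total_time >= 0.0
                True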
"""
# Memory tracking
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
# CPU profiling
profiler = cProfile.Profile()
start_time = time.time()
profiler.enable()
try:
_result = func(*args, **kwargs)
finally:
profiler.disable()
total_time = time.time() - start_time
# Memory usage
final_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_usage = final_memory - initial_memory
# Analyze profile
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s)
ps.sort_stats("cumulative")
ps.print_stats(20)
# Parse top functions
function_times = {}
bottlenecks = []
lines = s.getvalue().split("\n")
for line in lines:
if "function calls" in line or line.strip() == "":
continue
parts = line.split()
if len(parts) >= 6:
try:
cumtime = float(parts[3])
func_name = parts[-1]
function_times[func_name] = cumtime
# Identify bottlenecks (>10% of total time)
if total_time > 0 and cumtime > total_time * 0.1:
bottlenecks.append(
f"{func_name}: {cumtime:.2f}s ({cumtime/total_time*100:.1f}%)"
)
except (ValueError, IndexError):
pass
# Generate recommendations
recommendations = self._generate_recommendations(function_times, memory_usage, total_time)
return ProfileResult(
total_time=total_time,
bottlenecks=bottlenecks[:5],
function_times=function_times,
memory_usage=memory_usage,
recommendations=recommendations,
)
def _generate_recommendations(
self, function_times: Dict[str, float], memory_usage: float, total_time: float
) -> List[str]:
"""Generate optimization recommendations based on profiling.
Args:
function_times: Dictionary of function execution times.
memory_usage: Memory usage in MB.
total_time: Total execution time.
Returns:
List of recommendations.
"""
recommendations = []
# Check for slow functions
for func_name, time_spent in function_times.items():
if "generate_losses" in func_name and time_spent > total_time * 0.2:
recommendations.append("Consider vectorizing loss generation")
elif "insurance" in func_name and time_spent > total_time * 0.15:
recommendations.append("Optimize insurance calculations with vectorization")
elif "loop" in func_name.lower() and time_spent > total_time * 0.1:
recommendations.append("Replace loops with vectorized operations")
# Memory recommendations
if memory_usage > 2000:
recommendations.append("High memory usage detected - consider chunked processing")
if memory_usage > 3000:
recommendations.append("Use float32 instead of float64 for memory efficiency")
# Caching recommendations
if self.cache.hit_rate < 50 and self.cache.hits + self.cache.misses > 100:
recommendations.append(
f"Low cache hit rate ({self.cache.hit_rate:.1f}%) - review cache strategy"
)
# Parallel processing
if total_time > 10 and not any("parallel" in str(f).lower() for f in function_times):
recommendations.append("Enable parallel processing for better performance")
return recommendations
def optimize_loss_generation(self, losses: List[float], batch_size: int = 10000) -> np.ndarray:
"""Optimize loss generation using vectorization.
Args:
losses: List of loss values.
batch_size: Size of processing batches.
Returns:
Optimized loss array.
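
        Example:
            With the default config (memory_limit_mb=4000.0), values are
            kept in float64:

                >>> optimizer = PerformanceOptimizer()
                >>> arr = optimizer.optimize_loss_generation([1.0, 2.0, 3.0])
                >>> arr.dtype
                dtype('float64')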
"""
if self.config.enable_vectorization:
# Convert to numpy array for vectorized operations
loss_array = np.array(
losses, dtype=np.float32 if self.config.memory_limit_mb < 4000 else np.float64
)
            # Process in chunks if needed; the split-and-rejoin below is a
            # pass-through copy that serves as the hook point for per-chunk
            # transformations
if len(loss_array) > batch_size:
chunks = []
for i in range(0, len(loss_array), batch_size):
chunk = loss_array[i : i + batch_size]
chunks.append(chunk)
return np.concatenate(chunks)
return loss_array
return np.array(losses)
def optimize_insurance_calculation( # pylint: disable=too-many-locals
self, losses: np.ndarray, layers: List[Tuple[float, float, float]]
) -> Dict[str, Any]:
"""Optimize insurance calculations using vectorization and caching.
Args:
losses: Array of loss amounts.
layers: List of (attachment, limit, rate) tuples.
Returns:
Dictionary with optimized results.
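
        Example:
            A sketch with a single primary layer (attachment 0, limit 1M,
            rate 1.5%); the layer premium is limit * rate:

                >>> optimizer = PerformanceOptimizer()
                >>> losses = np.array([250_000.0, 2_500_000.0])
                >>> out = optimizer.optimize_insurance_calculation(
                ...     losses, layers=[(0.0, 1_000_000.0, 0.015)]
                ... )
                >>> out["total_premiums"]
                15000.0
                >>> out["total_recovered"].tolist()
                [250000.0, 1000000.0]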
"""
# Try cache first
cache_key = (losses.tobytes(), tuple(layers))
cached_result = self.cache.get(cache_key) if self.config.enable_caching else None
if cached_result is not None:
return cast(Dict[str, Any], cached_result)
total_premiums = 0.0
total_recovered = np.zeros_like(losses)
retained_losses = losses.copy()
if self.config.enable_vectorization:
for attachment, limit, rate in layers:
# Vectorized insurance application
retained, recovered = self.vectorized.apply_insurance_vectorized(
retained_losses, attachment, limit
)
total_recovered += recovered
retained_losses = retained
total_premiums += limit * rate
else:
# Non-vectorized fallback
for attachment, limit, rate in layers:
for i, loss in enumerate(retained_losses):
if loss > attachment:
recovery = min(loss - attachment, limit)
total_recovered[i] += recovery
retained_losses[i] -= recovery
total_premiums += limit * rate
result = {
"retained_losses": retained_losses,
"total_recovered": total_recovered,
"total_premiums": total_premiums,
"net_losses": retained_losses + total_premiums,
}
# Cache result
if self.config.enable_caching:
self.cache.set(cache_key, result)
return result
def optimize_memory_usage(self) -> Dict[str, Any]:
"""Optimize memory usage for large simulations.
Returns:
Dictionary with memory optimization metrics.
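
        Example:
            The exact figures are machine-dependent; the returned keys are:

                >>> optimizer = PerformanceOptimizer()
                >>> metrics = optimizer.optimize_memory_usage()
                >>> sorted(metrics)
                ['available_memory_mb', 'cache_cleared', 'memory_percent', 'process_memory_mb', 'suggested_chunk_size']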
"""
# Force garbage collection
gc.collect()
process = psutil.Process()
memory_info = process.memory_info()
# Get system memory
virtual_memory = psutil.virtual_memory()
metrics = {
"process_memory_mb": memory_info.rss / 1024 / 1024,
"available_memory_mb": virtual_memory.available / 1024 / 1024,
"memory_percent": virtual_memory.percent,
"suggested_chunk_size": self._calculate_optimal_chunk_size(virtual_memory.available),
}
# Clear caches if memory is tight
if metrics["memory_percent"] > 80:
self.cache.clear()
gc.collect()
metrics["cache_cleared"] = True
else:
metrics["cache_cleared"] = False
return metrics
def _calculate_optimal_chunk_size(self, available_memory: int) -> int:
"""Calculate optimal chunk size based on available memory.
Args:
available_memory: Available memory in bytes.
Returns:
Optimal chunk size.
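
        Example:
            A worked sketch: with 1 GiB available, half (512 MiB) is
            budgeted at ~1 KB per simulation, then clamped to the
            100,000 ceiling:

                >>> PerformanceOptimizer()._calculate_optimal_chunk_size(1024**3)
                100000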
"""
# Assume each simulation needs approximately 1KB
memory_per_sim = 1024 # bytes
# Use 50% of available memory for safety
safe_memory = available_memory * 0.5
# Calculate chunk size
chunk_size = int(safe_memory / memory_per_sim)
# Bounds
chunk_size = max(1000, min(chunk_size, 100000))
# Round to nearest 1000
return (chunk_size // 1000) * 1000
def get_optimization_summary(self) -> str:
"""Get summary of optimization status.
Returns:
Formatted optimization summary.
"""
memory_metrics = self.optimize_memory_usage()
summary = f"Performance Optimization Summary\n{'='*50}\n"
summary += "Configuration:\n"
summary += (
f" Vectorization: {'Enabled' if self.config.enable_vectorization else 'Disabled'}\n"
)
summary += f" Caching: {'Enabled' if self.config.enable_caching else 'Disabled'}\n"
summary += f" Numba JIT: {'Enabled' if self.config.enable_numba else 'Disabled'}\n"
summary += f" Memory Limit: {self.config.memory_limit_mb:.0f} MB\n"
summary += "\nCache Performance:\n"
summary += f" Hit Rate: {self.cache.hit_rate:.1f}%\n"
summary += f" Hits: {self.cache.hits:,}\n"
summary += f" Misses: {self.cache.misses:,}\n"
summary += "\nMemory Usage:\n"
summary += f" Process: {memory_metrics['process_memory_mb']:.1f} MB\n"
summary += f" Available: {memory_metrics['available_memory_mb']:.1f} MB\n"
summary += f" System Usage: {memory_metrics['memory_percent']:.1f}%\n"
summary += f" Optimal Chunk Size: {memory_metrics['suggested_chunk_size']:,}\n"
return summary
def cached_calculation(cache_size: int = 128):
"""Decorator for caching expensive calculations.
Args:
cache_size: Maximum cache size.
Returns:
Decorated function with caching.
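
    Example:
        A sketch caching a pure function (arguments must be hashable):

            >>> @cached_calculation(cache_size=32)
            ... def expensive_square(x: float) -> float:
            ...     return x * x
            >>> expensive_square(3.0), expensive_square(3.0)
            (9.0, 9.0)
            >>> expensive_square.cache_info().hits
            1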
"""
def decorator(func):
@wraps(func)
@lru_cache(maxsize=cache_size)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return decorator
def profile_function(func: Callable) -> Callable:
"""Decorator to profile function execution.
Args:
func: Function to profile.
Returns:
Decorated function with profiling.
"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        optimizer = PerformanceOptimizer()
        # Run once: capture the return value inside the profiled call,
        # since profile_execution discards the callable's result.
        captured = {}
        def _target():
            captured["value"] = func(*args, **kwargs)
        profile = optimizer.profile_execution(_target)
        print(profile.summary())
        return captured["value"]
return wrapper
if __name__ == "__main__":
# Example usage
# Create optimizer
optimizer = PerformanceOptimizer()
# Example: Optimize loss calculations
losses = np.random.exponential(100000, 100000)
layers = [(0.0, 1000000.0, 0.015), (1000000.0, 4000000.0, 0.008)]
start = time.time()
result = optimizer.optimize_insurance_calculation(losses, layers)
elapsed = time.time() - start
print(f"Optimized calculation completed in {elapsed:.3f}s")
print(f"Cache hit rate: {optimizer.cache.hit_rate:.1f}%")
print("\n" + optimizer.get_optimization_summary())