Source code for ergodic_insurance.trajectory_storage

"""Memory-efficient storage system for simulation trajectories.

This module provides a lightweight storage system for Monte Carlo simulation
trajectories that minimizes RAM usage while storing both partial time series
data and comprehensive summary statistics.

Features:
    - Memory-mapped numpy arrays for efficient storage
    - Optional HDF5 backend with compression
    - Configurable time series sampling (store every Nth year)
    - Lazy loading to minimize memory footprint
    - Automatic disk space management
    - CSV/JSON export for analysis tools
    - <2GB RAM usage for 100K simulations
    - <1GB disk usage with sampling

Example:
    >>> from ergodic_insurance.trajectory_storage import (
    ...     StorageConfig,
    ...     TrajectoryStorage,
    ... )
    >>> config = StorageConfig(
    ...     storage_dir="./trajectories",
    ...     sample_interval=5,  # Store every 5th year
    ...     max_disk_usage_gb=1.0,
    ... )
    >>> storage = TrajectoryStorage(config)
    >>> # During simulation
    >>> storage.store_simulation(
    ...     sim_id=0,
    ...     annual_losses=losses,
    ...     insurance_recoveries=recoveries,
    ...     retained_losses=retained,
    ...     final_assets=final_assets,
    ...     initial_assets=initial_assets,
    ... )
    >>> # Later retrieval
    >>> data = storage.load_simulation(sim_id=0)
"""

from dataclasses import dataclass
import gc
import json
from pathlib import Path
import shutil
from typing import Any, Dict, List, Optional
import warnings

import h5py
import numpy as np
import pandas as pd


@dataclass
class StorageConfig:
    """Configuration for trajectory storage system."""

    storage_dir: str = "./trajectory_storage"
    backend: str = "memmap"  # 'memmap' or 'hdf5'
    sample_interval: int = 10  # Store every 10th year by default
    max_disk_usage_gb: float = 1.0
    compression: bool = True
    compression_level: int = 4  # Medium compression
    chunk_size: int = 1000  # Process 1000 simulations at a time
    enable_summary_stats: bool = True
    enable_time_series: bool = True
    dtype: Any = np.float32  # Use float32 for memory efficiency

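# Usage sketch (illustrative values, not part of the module): a config tuned
# for coarse sampling with the HDF5 backend.
#
#     config = StorageConfig(
#         backend="hdf5",
#         sample_interval=20,    # keep every 20th year
#         compression_level=9,   # maximum gzip compression
#     )
#     storage = TrajectoryStorage(config)
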
@dataclass
class SimulationSummary:
    """Summary statistics for a single simulation."""

    sim_id: int
    final_assets: float
    total_losses: float
    total_recoveries: float
    mean_annual_loss: float
    max_annual_loss: float
    min_annual_loss: float
    growth_rate: float
    ruin_occurred: bool
    ruin_year: Optional[int] = None
    volatility: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for export."""
        return {
            "sim_id": self.sim_id,
            "final_assets": float(self.final_assets),
            "total_losses": float(self.total_losses),
            "total_recoveries": float(self.total_recoveries),
            "mean_annual_loss": float(self.mean_annual_loss),
            "max_annual_loss": float(self.max_annual_loss),
            "min_annual_loss": float(self.min_annual_loss),
            "growth_rate": float(self.growth_rate),
            "ruin_occurred": self.ruin_occurred,
            "ruin_year": self.ruin_year,
            # Check against None explicitly so a volatility of 0.0 is kept
            "volatility": float(self.volatility) if self.volatility is not None else None,
        }

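# Round-trip sketch (hypothetical values): to_dict() emits plain Python
# types, so a summary survives JSON serialization and can be rebuilt with
# SimulationSummary(**...):
#
#     s = SimulationSummary(
#         sim_id=0, final_assets=1.2e7, total_losses=3.4e6,
#         total_recoveries=2.1e6, mean_annual_loss=6.8e4,
#         max_annual_loss=9.5e5, min_annual_loss=0.0,
#         growth_rate=0.018, ruin_occurred=False,
#     )
#     restored = SimulationSummary(**json.loads(json.dumps(s.to_dict())))
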
class TrajectoryStorage:
    """Memory-efficient storage for simulation trajectories.

    Provides lightweight storage using memory-mapped arrays or HDF5,
    with configurable sampling and automatic disk space management.
    """

    def __init__(self, config: Optional[StorageConfig] = None):
        """Initialize trajectory storage.

        Args:
            config: Storage configuration
        """
        self.config = config or StorageConfig()
        self.storage_path = Path(self.config.storage_dir)
        self.storage_path.mkdir(parents=True, exist_ok=True)

        # Initialize storage structures
        self._summaries: Dict[int, SimulationSummary] = {}
        self._memmap_files: Dict[str, np.memmap] = {}
        self._hdf5_file: Optional[h5py.File] = None

        # Track storage statistics
        self._total_simulations = 0
        self._disk_usage = 0.0

        # Setup backend
        if self.config.backend == "hdf5":
            self._setup_hdf5()
        else:
            self._setup_memmap()

    def _setup_memmap(self) -> None:
        """Setup memory-mapped array storage."""
        # Create directories for different data types
        (self.storage_path / "summaries").mkdir(exist_ok=True)
        if self.config.enable_time_series:
            (self.storage_path / "time_series").mkdir(exist_ok=True)

    def _setup_hdf5(self) -> None:
        """Setup HDF5 storage backend."""
        hdf5_path = self.storage_path / "trajectories.h5"

        # Open or create HDF5 file
        self._hdf5_file = h5py.File(hdf5_path, "a")

        # Create groups if they don't exist
        if "summaries" not in self._hdf5_file:
            self._hdf5_file.create_group("summaries")
        if self.config.enable_time_series and "time_series" not in self._hdf5_file:
            self._hdf5_file.create_group("time_series")

    def store_simulation(
        self,
        sim_id: int,
        annual_losses: np.ndarray,
        insurance_recoveries: np.ndarray,
        retained_losses: np.ndarray,
        final_assets: float,
        initial_assets: float,
        ruin_occurred: bool = False,
        ruin_year: Optional[int] = None,
    ) -> None:
        """Store simulation trajectory with automatic sampling.

        Args:
            sim_id: Simulation identifier
            annual_losses: Array of annual losses
            insurance_recoveries: Array of insurance recoveries
            retained_losses: Array of retained losses
            final_assets: Final asset value
            initial_assets: Initial asset value
            ruin_occurred: Whether ruin occurred
            ruin_year: Year of ruin (if applicable)
        """
        # Check disk usage limit
        if not self._check_disk_space():
            warnings.warn(
                f"Disk usage limit ({self.config.max_disk_usage_gb}GB) exceeded", UserWarning
            )
            return

        # Calculate and store summary statistics
        if self.config.enable_summary_stats:
            summary = self._calculate_summary(
                sim_id=sim_id,
                annual_losses=annual_losses,
                insurance_recoveries=insurance_recoveries,
                final_assets=final_assets,
                initial_assets=initial_assets,
                ruin_occurred=ruin_occurred,
                ruin_year=ruin_year,
            )
            self._store_summary(summary)

        # Store sampled time series data
        if self.config.enable_time_series:
            self._store_time_series(
                sim_id=sim_id,
                annual_losses=annual_losses,
                insurance_recoveries=insurance_recoveries,
                retained_losses=retained_losses,
            )

        self._total_simulations += 1

        # Periodic cleanup to manage memory
        if self._total_simulations % self.config.chunk_size == 0:
            self._cleanup_memory()

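    # Call sketch (hypothetical arrays; assumes the three series share one
    # length, matching the signature above):
    #
    #     rng = np.random.default_rng(7)
    #     losses = rng.lognormal(mean=11.0, sigma=1.5, size=50)
    #     recoveries = np.minimum(losses, 1_000_000.0)
    #     storage.store_simulation(
    #         sim_id=42,
    #         annual_losses=losses,
    #         insurance_recoveries=recoveries,
    #         retained_losses=losses - recoveries,
    #         final_assets=1.8e7,
    #         initial_assets=1.0e7,
    #     )
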
    def _calculate_summary(
        self,
        sim_id: int,
        annual_losses: np.ndarray,
        insurance_recoveries: np.ndarray,
        final_assets: float,
        initial_assets: float,
        ruin_occurred: bool,
        ruin_year: Optional[int],
    ) -> SimulationSummary:
        """Calculate summary statistics for a simulation.

        Args:
            sim_id: Simulation identifier
            annual_losses: Annual loss amounts
            insurance_recoveries: Insurance recovery amounts
            final_assets: Final asset value
            initial_assets: Initial asset value
            ruin_occurred: Whether ruin occurred
            ruin_year: Year of ruin

        Returns:
            SimulationSummary with calculated statistics
        """
        n_years = len(annual_losses)

        # Calculate growth rate
        if final_assets > 0 and initial_assets > 0:
            growth_rate = np.log(final_assets / initial_assets) / n_years
        else:
            growth_rate = -np.inf

        # Calculate volatility (simplified)
        if n_years > 1:
            # Avoid division by zero
            non_zero_mask = annual_losses[:-1] != 0
            if np.any(non_zero_mask):
                returns = np.diff(annual_losses)[non_zero_mask] / annual_losses[:-1][non_zero_mask]
                volatility = np.std(returns) if len(returns) > 0 else 0.0
            else:
                volatility = 0.0
        else:
            volatility = 0.0

        return SimulationSummary(
            sim_id=sim_id,
            final_assets=float(final_assets),
            total_losses=float(np.sum(annual_losses)),
            total_recoveries=float(np.sum(insurance_recoveries)),
            mean_annual_loss=float(np.mean(annual_losses)),
            max_annual_loss=float(np.max(annual_losses)),
            min_annual_loss=float(np.min(annual_losses)),
            growth_rate=float(growth_rate),
            ruin_occurred=ruin_occurred,
            ruin_year=ruin_year,
            volatility=float(volatility),
        )

    def _store_summary(self, summary: SimulationSummary) -> None:
        """Store summary statistics.

        Args:
            summary: Simulation summary to store
        """
        self._summaries[summary.sim_id] = summary

        # Persist to disk periodically
        if len(self._summaries) >= self.config.chunk_size:
            self._persist_summaries()

    def _store_time_series(
        self,
        sim_id: int,
        annual_losses: np.ndarray,
        insurance_recoveries: np.ndarray,
        retained_losses: np.ndarray,
    ) -> None:
        """Store sampled time series data.

        Args:
            sim_id: Simulation identifier
            annual_losses: Annual loss amounts
            insurance_recoveries: Insurance recovery amounts
            retained_losses: Retained loss amounts
        """
        # Sample data according to interval
        sample_indices = np.arange(0, len(annual_losses), self.config.sample_interval)

        if self.config.backend == "hdf5":
            self._store_time_series_hdf5(
                sim_id, sample_indices, annual_losses, insurance_recoveries, retained_losses
            )
        else:
            self._store_time_series_memmap(
                sim_id, sample_indices, annual_losses, insurance_recoveries, retained_losses
            )

    def _store_time_series_memmap(
        self,
        sim_id: int,
        sample_indices: np.ndarray,
        annual_losses: np.ndarray,
        insurance_recoveries: np.ndarray,
        retained_losses: np.ndarray,
    ) -> None:
        """Store time series using memory-mapped arrays.

        Args:
            sim_id: Simulation identifier
            sample_indices: Indices to sample
            annual_losses: Annual loss amounts
            insurance_recoveries: Insurance recovery amounts
            retained_losses: Retained loss amounts
        """
        # Create memory-mapped file for this simulation
        ts_path = self.storage_path / "time_series" / f"sim_{sim_id}.dat"

        # Stack sampled data
        sampled_data = np.vstack(
            [
                annual_losses[sample_indices],
                insurance_recoveries[sample_indices],
                retained_losses[sample_indices],
            ]
        ).astype(self.config.dtype)

        # Write to memory-mapped file
        mmap = np.memmap(
            ts_path,
            dtype=self.config.dtype,
            mode="w+",
            shape=sampled_data.shape,
        )
        mmap[:] = sampled_data
        mmap.flush()
        del mmap  # Close the file

    def _store_time_series_hdf5(
        self,
        sim_id: int,
        sample_indices: np.ndarray,
        annual_losses: np.ndarray,
        insurance_recoveries: np.ndarray,
        retained_losses: np.ndarray,
    ) -> None:
        """Store time series using HDF5.

        Args:
            sim_id: Simulation identifier
            sample_indices: Indices to sample
            annual_losses: Annual loss amounts
            insurance_recoveries: Insurance recovery amounts
            retained_losses: Retained loss amounts
        """
        if not self._hdf5_file:
            return

        # Create dataset for this simulation
        ts_group = self._hdf5_file["time_series"]
        sim_group = ts_group.create_group(f"sim_{sim_id}")

        # Store sampled data with compression
        compression = "gzip" if self.config.compression else None
        compression_opts = self.config.compression_level if self.config.compression else None

        sim_group.create_dataset(
            "annual_losses",
            data=annual_losses[sample_indices].astype(self.config.dtype),
            compression=compression,
            compression_opts=compression_opts,
        )
        sim_group.create_dataset(
            "insurance_recoveries",
            data=insurance_recoveries[sample_indices].astype(self.config.dtype),
            compression=compression,
            compression_opts=compression_opts,
        )
        sim_group.create_dataset(
            "retained_losses",
            data=retained_losses[sample_indices].astype(self.config.dtype),
            compression=compression,
            compression_opts=compression_opts,
        )
        sim_group.attrs["sample_indices"] = sample_indices

        # Flush to disk
        self._hdf5_file.flush()

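    # Layout note (derived from the writers above): a memmap file is a flat
    # buffer of config.dtype holding a (3, n_samples) block with rows ordered
    # losses / recoveries / retained; _load_time_series below recovers
    # n_samples as len(buffer) // 3. Reading one back by hand (illustrative
    # path) would look like:
    #
    #     raw = np.memmap("time_series/sim_0.dat", dtype=np.float32, mode="r")
    #     losses, recoveries, retained = np.array(raw).reshape(3, -1)
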
    def load_simulation(self, sim_id: int, load_time_series: bool = False) -> Dict[str, Any]:
        """Load simulation data with lazy loading.

        Args:
            sim_id: Simulation identifier
            load_time_series: Whether to load time series data

        Returns:
            Dictionary with simulation data
        """
        result: Dict[str, Any] = {}

        # Load summary if available
        if sim_id in self._summaries:
            result["summary"] = self._summaries[sim_id].to_dict()
        else:
            # Try loading from disk
            summary = self._load_summary_from_disk(sim_id)
            if summary:
                result["summary"] = summary.to_dict()

        # Load time series if requested
        if load_time_series:
            time_series = self._load_time_series(sim_id)
            if time_series:
                result["time_series"] = time_series

        return result

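    # Retrieval sketch (assumes a simulation with sim_id=42 was stored
    # earlier): summaries load by default; pass load_time_series=True to
    # also pull the sampled arrays.
    #
    #     data = storage.load_simulation(sim_id=42, load_time_series=True)
    #     growth = data["summary"]["growth_rate"]
    #     sampled_losses = data["time_series"]["annual_losses"]
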
    def _load_summary_from_disk(self, sim_id: int) -> Optional[SimulationSummary]:
        """Load summary from disk storage.

        Args:
            sim_id: Simulation identifier

        Returns:
            SimulationSummary or None if not found
        """
        if self.config.backend == "hdf5" and self._hdf5_file:
            if f"sim_{sim_id}" in self._hdf5_file["summaries"]:
                data = self._hdf5_file[f"summaries/sim_{sim_id}"]
                return SimulationSummary(**{k: data.attrs[k] for k in data.attrs})
        else:
            summary_file = self.storage_path / "summaries" / f"sim_{sim_id}.json"
            if summary_file.exists():
                with open(summary_file, "r") as f:
                    data = json.load(f)
                return SimulationSummary(**data)
        return None

    def _load_time_series(self, sim_id: int) -> Optional[Dict[str, np.ndarray]]:
        """Load time series data for a simulation.

        Args:
            sim_id: Simulation identifier

        Returns:
            Dictionary with time series arrays or None
        """
        if self.config.backend == "hdf5" and self._hdf5_file:
            if f"sim_{sim_id}" in self._hdf5_file["time_series"]:
                sim_group = self._hdf5_file[f"time_series/sim_{sim_id}"]
                return {
                    "annual_losses": np.array(sim_group["annual_losses"]),
                    "insurance_recoveries": np.array(sim_group["insurance_recoveries"]),
                    "retained_losses": np.array(sim_group["retained_losses"]),
                    "sample_indices": sim_group.attrs["sample_indices"],
                }
        else:
            ts_path = self.storage_path / "time_series" / f"sim_{sim_id}.dat"
            if ts_path.exists():
                # Open the flat file read-only and infer the sample count
                # from its length
                mmap = np.memmap(ts_path, dtype=self.config.dtype, mode="r")
                # Reshape assuming 3 rows (losses, recoveries, retained)
                n_samples = len(mmap) // 3
                reshaped = np.array(mmap).reshape((3, n_samples))
                return {
                    "annual_losses": reshaped[0],
                    "insurance_recoveries": reshaped[1],
                    "retained_losses": reshaped[2],
                }
        return None

    def export_summaries_csv(self, output_path: str) -> None:
        """Export all summary statistics to CSV.

        Args:
            output_path: Path for CSV output file
        """
        # Ensure all summaries are persisted
        self._persist_summaries()

        # Collect all summaries
        all_summaries: List[SimulationSummary] = []

        # From memory
        all_summaries.extend(self._summaries.values())

        # From disk
        if self.config.backend == "memmap":
            summary_dir = self.storage_path / "summaries"
            for summary_file in summary_dir.glob("*.json"):
                with open(summary_file, "r") as f:
                    data = json.load(f)
                all_summaries.append(SimulationSummary(**data))
        elif self._hdf5_file:
            # From HDF5 attributes (None values appear as the -1 sentinel
            # written by _persist_summaries); .item() converts numpy scalars
            # to plain Python types
            summary_group = self._hdf5_file["summaries"]
            for name in summary_group:
                attrs = summary_group[name].attrs
                all_summaries.append(SimulationSummary(**{k: attrs[k].item() for k in attrs}))

        # Write to CSV
        if all_summaries:
            df = pd.DataFrame([s.to_dict() for s in all_summaries])
            df.to_csv(output_path, index=False)
            print(f"Exported {len(all_summaries)} summaries to {output_path}")

    def export_summaries_json(self, output_path: str) -> None:
        """Export all summary statistics to JSON.

        Args:
            output_path: Path for JSON output file
        """
        # Ensure all summaries are persisted
        self._persist_summaries()

        # Collect all summaries
        all_summaries: List[Dict[str, Any]] = []

        # From memory
        all_summaries.extend([s.to_dict() for s in self._summaries.values()])

        # From disk
        if self.config.backend == "memmap":
            summary_dir = self.storage_path / "summaries"
            for summary_file in summary_dir.glob("*.json"):
                with open(summary_file, "r") as f:
                    all_summaries.append(json.load(f))
        elif self._hdf5_file:
            # From HDF5 attributes (None values appear as the -1 sentinel
            # written by _persist_summaries); .item() converts numpy scalars
            # to plain Python types for JSON serialization
            summary_group = self._hdf5_file["summaries"]
            for name in summary_group:
                attrs = summary_group[name].attrs
                all_summaries.append(
                    SimulationSummary(**{k: attrs[k].item() for k in attrs}).to_dict()
                )

        # Write to JSON
        with open(output_path, "w") as f:
            json.dump(all_summaries, f, indent=2)
        print(f"Exported {len(all_summaries)} summaries to {output_path}")

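    # Export sketch (hypothetical paths): both exporters first flush cached
    # summaries to disk, so they can be called at any point during a run.
    #
    #     storage.export_summaries_csv("./results/summaries.csv")
    #     storage.export_summaries_json("./results/summaries.json")
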
    def _persist_summaries(self) -> None:
        """Persist in-memory summaries to disk."""
        if self.config.backend == "hdf5" and self._hdf5_file:
            # Store in HDF5 (None values are encoded as -1 in attributes)
            summary_group = self._hdf5_file["summaries"]
            for sim_id, summary in self._summaries.items():
                if f"sim_{sim_id}" not in summary_group:
                    sim_group = summary_group.create_group(f"sim_{sim_id}")
                    for key, value in summary.to_dict().items():
                        sim_group.attrs[key] = value if value is not None else -1
            self._hdf5_file.flush()
        else:
            # Store as JSON files
            summary_dir = self.storage_path / "summaries"
            for sim_id, summary in self._summaries.items():
                summary_file = summary_dir / f"sim_{sim_id}.json"
                with open(summary_file, "w") as f:
                    json.dump(summary.to_dict(), f)

        # Clear memory cache after persisting
        self._summaries.clear()

    def _check_disk_space(self) -> bool:
        """Check if disk usage is within limits.

        Returns:
            True if within limits, False otherwise
        """
        # Calculate current disk usage
        total_size = 0
        for path in self.storage_path.rglob("*"):
            if path.is_file():
                total_size += path.stat().st_size

        self._disk_usage = total_size / (1024**3)  # Convert to GB
        return self._disk_usage < self.config.max_disk_usage_gb

    def _cleanup_memory(self) -> None:
        """Periodic cleanup to manage memory usage."""
        # Persist any cached summaries
        if self._summaries:
            self._persist_summaries()

        # Force garbage collection
        gc.collect()

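    # Persistence format note (derived from _persist_summaries): the memmap
    # backend writes one summaries/sim_<id>.json per simulation, e.g.
    #
    #     {"sim_id": 0, "final_assets": 18000000.0, ..., "ruin_year": null}
    #
    # while the HDF5 backend stores the same keys as attributes of
    # /summaries/sim_<id>, encoding None as -1.
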
    def get_storage_stats(self) -> Dict[str, Any]:
        """Get storage statistics.

        Returns:
            Dictionary with storage statistics
        """
        self._check_disk_space()

        return {
            "total_simulations": self._total_simulations,
            "disk_usage_gb": self._disk_usage,
            "disk_limit_gb": self.config.max_disk_usage_gb,
            "backend": self.config.backend,
            "sample_interval": self.config.sample_interval,
            "compression_enabled": self.config.compression,
            "storage_directory": str(self.storage_path),
        }

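    # Monitoring sketch: get_storage_stats() refreshes disk usage on each
    # call, so it can guard a long batch loop (the 90% threshold here is an
    # assumption, not part of the module):
    #
    #     stats = storage.get_storage_stats()
    #     if stats["disk_usage_gb"] > 0.9 * stats["disk_limit_gb"]:
    #         print("Approaching disk budget; consider a larger sample_interval")
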
    def clear_storage(self) -> None:
        """Clear all stored data."""
        # Close HDF5 file if open
        if self._hdf5_file:
            self._hdf5_file.close()
            self._hdf5_file = None

        # Clear memory caches
        self._summaries.clear()
        self._memmap_files.clear()

        # Remove storage directory
        if self.storage_path.exists():
            shutil.rmtree(self.storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

        # Reset counters
        self._total_simulations = 0
        self._disk_usage = 0.0

        # Reinitialize backend
        if self.config.backend == "hdf5":
            self._setup_hdf5()
        else:
            self._setup_memmap()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensure data is persisted."""
        # Persist any remaining summaries
        if self._summaries:
            self._persist_summaries()

        # Close HDF5 file if open
        if self._hdf5_file:
            self._hdf5_file.close()
            self._hdf5_file = None

        # Clear memory-mapped files
        self._memmap_files.clear()
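
# Context-manager sketch (hypothetical run loop; `results` is assumed to be
# an iterable of keyword-argument dicts): __exit__ persists any cached
# summaries and closes the HDF5 handle, so a `with` block guarantees nothing
# is lost if the run ends early.
#
#     with TrajectoryStorage(StorageConfig(backend="hdf5")) as storage:
#         for sim_id, result in enumerate(results):
#             storage.store_simulation(sim_id=sim_id, **result)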