Source code for ergodic_insurance.reporting.cache_manager

"""High-performance caching system for expensive Monte Carlo computations.

This module implements a robust caching system that stores Monte Carlo simulation
results and processed data using efficient storage formats (HDF5, Parquet) with
hash-based invalidation and memory-mapped loading for optimal performance.

The caching system achieves:
    - <1 second load time for 10,000 paths × 1,000 years
    - Automatic cache invalidation on parameter changes
    - Memory-efficient loading with memory mapping
    - Support for both local and cloud storage backends

Example:
    >>> from ergodic_insurance.reporting import CacheManager
    >>> cache = CacheManager()
    >>> # Cache expensive simulation
    >>> params = {'n_sims': 10000, 'n_years': 1000}
    >>> key = cache.cache_simulation_paths(
    ...     params=params,
    ...     paths=simulation_data,
    ...     metadata={'seed': 42}
    ... )
    >>> # Load from cache (100x faster)
    >>> cached_paths = cache.load_simulation_paths(params)
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import hashlib
import json
from pathlib import Path
import pickle
import shutil
import time
from typing import Any, Callable, Dict, List, Optional
import warnings

import h5py
import numpy as np
import pandas as pd

from ..safe_pickle import safe_dump, safe_load


class StorageBackend(Enum):
    """Supported storage backend types."""

    LOCAL = "local"
    S3 = "s3"
    AZURE = "azure"
    GCS = "gcs"
@dataclass
class CacheConfig:
    """Configuration for the cache manager.

    Attributes:
        cache_dir: Root directory for cache storage
        max_cache_size_gb: Maximum cache size in gigabytes
        ttl_hours: Time-to-live for cache entries in hours
        compression: Compression algorithm for HDF5 ('gzip', 'lzf', None)
        compression_level: Compression level (1-9 for gzip)
        enable_memory_mapping: Use memory mapping for large files
        backend: Storage backend type
        backend_config: Backend-specific configuration
    """

    cache_dir: Path = Path("./cache")
    max_cache_size_gb: float = 10.0
    ttl_hours: Optional[int] = None
    compression: Optional[str] = "gzip"
    compression_level: int = 4
    enable_memory_mapping: bool = True
    backend: StorageBackend = StorageBackend.LOCAL
    backend_config: Dict[str, Any] = field(default_factory=dict)
    def __post_init__(self):
        """Validate and initialize configuration."""
        self.cache_dir = Path(self.cache_dir)
        if self.compression and self.compression not in ["gzip", "lzf"]:
            raise ValueError(f"Invalid compression: {self.compression}")
        if self.compression and (self.compression_level < 1 or self.compression_level > 9):
            raise ValueError(f"Compression level must be 1-9, got {self.compression_level}")
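
# Illustrative sketch (not part of the module): how __post_init__ validates a
# config. The values shown are hypothetical.
#
#     >>> cfg = CacheConfig(compression="lzf", ttl_hours=24)  # accepted
#     >>> CacheConfig(compression="zstd")                     # rejected
#     Traceback (most recent call last):
#         ...
#     ValueError: Invalid compression: zstd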
@dataclass
class CacheStats:
    """Statistics about cache usage and performance.

    Attributes:
        total_size_bytes: Total size of cached data
        n_entries: Number of cache entries
        n_hits: Number of cache hits
        n_misses: Number of cache misses
        hit_rate: Cache hit rate (0-1)
        avg_load_time_ms: Average load time in milliseconds
        avg_save_time_ms: Average save time in milliseconds
        oldest_entry: Timestamp of oldest cache entry
        newest_entry: Timestamp of newest cache entry
    """

    total_size_bytes: int = 0
    n_entries: int = 0
    n_hits: int = 0
    n_misses: int = 0
    hit_rate: float = 0.0
    avg_load_time_ms: float = 0.0
    avg_save_time_ms: float = 0.0
    oldest_entry: Optional[datetime] = None
    newest_entry: Optional[datetime] = None
    def update_hit_rate(self):
        """Update the cache hit rate."""
        total = self.n_hits + self.n_misses
        self.hit_rate = self.n_hits / total if total > 0 else 0.0
@dataclass
class CacheKey:
    """Cache key with metadata for cache entries.

    Attributes:
        hash_key: SHA256 hash of parameters
        params: Original parameters dictionary
        timestamp: Creation timestamp
        size_bytes: Size of cached data
        access_count: Number of times accessed
        last_accessed: Last access timestamp
    """

    hash_key: str
    params: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    size_bytes: int = 0
    access_count: int = 0
    last_accessed: datetime = field(default_factory=datetime.now)
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "hash_key": self.hash_key,
            "params": self.params,
            "timestamp": self.timestamp.isoformat(),
            "size_bytes": self.size_bytes,
            "access_count": self.access_count,
            "last_accessed": self.last_accessed.isoformat(),
        }
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CacheKey":
        """Create from dictionary."""
        return cls(
            hash_key=data["hash_key"],
            params=data["params"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            size_bytes=data["size_bytes"],
            access_count=data["access_count"],
            last_accessed=datetime.fromisoformat(data["last_accessed"]),
        )
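
# Illustrative sketch (not part of the module): a CacheKey survives a JSON
# round trip via to_dict()/from_dict(). The hash value is hypothetical.
#
#     >>> key = CacheKey(hash_key="ab12cd", params={"n_sims": 100})
#     >>> restored = CacheKey.from_dict(json.loads(json.dumps(key.to_dict())))
#     >>> restored.hash_key == key.hash_key
#     True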
class BaseStorageBackend(ABC):
    """Abstract base class for storage backends."""

    @abstractmethod
    def exists(self, path: Path) -> bool:
        """Check if path exists."""

    @abstractmethod
    def save(self, path: Path, data: Any, file_format: str = "pickle") -> int:
        """Save data to path, return size in bytes."""

    @abstractmethod
    def load(self, path: Path, file_format: str = "pickle") -> Any:
        """Load data from path."""

    @abstractmethod
    def delete(self, path: Path) -> bool:
        """Delete data at path."""

    @abstractmethod
    def list_files(self, pattern: str = "*") -> List[Path]:
        """List files matching pattern."""

    @abstractmethod
    def get_size(self, path: Path) -> int:
        """Get size of file in bytes."""
class LocalStorageBackend(BaseStorageBackend):
    """Local filesystem storage backend."""

    def __init__(self, root_dir: Path):
        """Initialize local storage backend.

        Args:
            root_dir: Root directory for storage
        """
        self.root_dir = Path(root_dir)
        self.root_dir.mkdir(parents=True, exist_ok=True)
    def exists(self, path: Path) -> bool:
        """Check if path exists."""
        full_path = self.root_dir / path
        return full_path.exists()
    def save(self, path: Path, data: Any, file_format: str = "pickle") -> int:
        """Save data to path."""
        full_path = self.root_dir / path
        full_path.parent.mkdir(parents=True, exist_ok=True)

        if file_format == "pickle":
            with open(full_path, "wb") as f:
                safe_dump(data, f)
        elif file_format == "json":
            with open(full_path, "w") as f:
                json.dump(data, f)
        elif file_format == "parquet":
            if isinstance(data, pd.DataFrame):
                data.to_parquet(full_path, compression="snappy")
            else:
                raise ValueError("Parquet format requires pandas DataFrame")
        else:
            raise ValueError(f"Unsupported format: {file_format}")

        return full_path.stat().st_size
    def load(self, path: Path, file_format: str = "pickle") -> Any:
        """Load data from path."""
        full_path = self.root_dir / path

        if file_format == "pickle":
            with open(full_path, "rb") as f:
                return safe_load(f)
        elif file_format == "json":
            with open(full_path, "r") as f:
                return json.load(f)
        elif file_format == "parquet":
            return pd.read_parquet(full_path)
        else:
            raise ValueError(f"Unsupported format: {file_format}")
    def delete(self, path: Path) -> bool:
        """Delete data at path."""
        full_path = self.root_dir / path
        if full_path.exists():
            if full_path.is_dir():
                shutil.rmtree(full_path)
            else:
                full_path.unlink()
            return True
        return False
    def list_files(self, pattern: str = "*") -> List[Path]:
        """List files matching pattern."""
        return list(self.root_dir.glob(pattern))
    def get_size(self, path: Path) -> int:
        """Get size of file in bytes."""
        full_path = self.root_dir / path
        if full_path.exists():
            if full_path.is_file():
                return full_path.stat().st_size
            return sum(f.stat().st_size for f in full_path.rglob("*") if f.is_file())
        return 0
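
# Illustrative sketch (not part of the module): round-tripping data through
# LocalStorageBackend. Paths and values are hypothetical.
#
#     >>> backend = LocalStorageBackend(Path("./scratch"))
#     >>> size = backend.save(Path("cfg.json"), {"seed": 42}, file_format="json")
#     >>> backend.load(Path("cfg.json"), file_format="json")
#     {'seed': 42}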
class CacheManager:
    """High-performance cache manager for Monte Carlo simulations.

    This class provides efficient caching of simulation results with
    automatic invalidation, memory-mapped loading, and configurable
    storage backends.

    Attributes:
        config: Cache configuration
        stats: Cache statistics
        backend: Storage backend instance

    Example:
        >>> cache = CacheManager(config=CacheConfig(cache_dir="./cache"))
        >>> # Cache simulation results
        >>> cache.cache_simulation_paths(
        ...     params={'n_sims': 10000},
        ...     paths=np.random.randn(10000, 100)
        ... )
        >>> # Load from cache
        >>> paths = cache.load_simulation_paths({'n_sims': 10000})
    """

    def __init__(self, config: Optional[CacheConfig] = None):
        """Initialize cache manager.

        Args:
            config: Cache configuration, uses defaults if None
        """
        self.config = config or CacheConfig()
        self.stats = CacheStats()

        # Initialize storage backend
        if self.config.backend == StorageBackend.LOCAL:
            self.backend = LocalStorageBackend(self.config.cache_dir)
        else:
            raise NotImplementedError(f"Backend {self.config.backend} not implemented")

        # Create cache directory structure
        self._init_cache_structure()

        # Initialize cache index
        self._cache_index: Dict[str, CacheKey] = {}

        # Load existing cache metadata
        self._load_metadata()

        # Track timing for statistics
        self._load_times: List[float] = []
        self._save_times: List[float] = []

    def _init_cache_structure(self):
        """Initialize cache directory structure."""
        subdirs = [
            "raw_simulations",
            "processed_results",
            "figures/executive",
            "figures/technical",
            "metadata",
        ]
        for subdir in subdirs:
            dir_path = self.config.cache_dir / subdir
            dir_path.mkdir(parents=True, exist_ok=True)

    def _generate_cache_key(self, params: Dict[str, Any]) -> str:
        """Generate hash-based cache key from parameters.

        Args:
            params: Dictionary of parameters

        Returns:
            SHA256 hash string
        """
        # Sort keys for consistent hashing
        sorted_params = json.dumps(params, sort_keys=True, default=str)
        return hashlib.sha256(sorted_params.encode()).hexdigest()

    def _load_metadata(self):
        """Load cache metadata from disk."""
        metadata_file = self.config.cache_dir / "metadata" / "cache_index.json"
        if metadata_file.exists():
            try:
                with open(metadata_file, "r") as f:
                    data = json.load(f)
                    self.stats = CacheStats(**data.get("stats", {}))
                    self._cache_index = {
                        k: CacheKey.from_dict(v) for k, v in data.get("index", {}).items()
                    }
            except (json.JSONDecodeError, KeyError, ValueError) as e:
                warnings.warn(f"Failed to load cache metadata: {e}")
                self._cache_index = {}
        else:
            self._cache_index = {}

    def _save_metadata(self):
        """Save cache metadata to disk."""
        metadata_file = self.config.cache_dir / "metadata" / "cache_index.json"
        try:
            data = {
                "stats": {
                    "total_size_bytes": self.stats.total_size_bytes,
                    "n_entries": self.stats.n_entries,
                    "n_hits": self.stats.n_hits,
                    "n_misses": self.stats.n_misses,
                    "hit_rate": self.stats.hit_rate,
                    "avg_load_time_ms": self.stats.avg_load_time_ms,
                    "avg_save_time_ms": self.stats.avg_save_time_ms,
                },
                "index": {k: v.to_dict() for k, v in self._cache_index.items()},
            }
            with open(metadata_file, "w") as f:
                json.dump(data, f, indent=2, default=str)
        except (OSError, TypeError) as e:
            warnings.warn(f"Failed to save cache metadata: {e}")
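
    # Illustrative note (not executed): cache keys are insensitive to parameter
    # ordering because _generate_cache_key serializes with sort_keys=True
    # before hashing.
    #
    #     >>> cm = CacheManager()
    #     >>> cm._generate_cache_key({"a": 1, "b": 2}) == cm._generate_cache_key({"b": 2, "a": 1})
    #     True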
    def cache_simulation_paths(
        self, params: Dict[str, Any], paths: np.ndarray, metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """Cache Monte Carlo simulation paths to HDF5.

        Stores large simulation arrays efficiently using HDF5 with
        optional compression. Supports arrays up to 10,000 x 1,000
        dimensions.

        Args:
            params: Simulation parameters (used for cache key)
            paths: Numpy array of simulation paths (n_simulations, n_years)
            metadata: Optional metadata to store with paths

        Returns:
            Cache key for retrieval

        Example:
            >>> paths = np.random.randn(10000, 1000)
            >>> key = cache.cache_simulation_paths(
            ...     params={'n_sims': 10000, 'seed': 42},
            ...     paths=paths,
            ...     metadata={'generator': 'numpy'}
            ... )
        """
        start_time = time.time()

        # Generate cache key
        cache_key = self._generate_cache_key(params)
        file_path = Path("raw_simulations") / f"{cache_key}.h5"
        full_path = self.config.cache_dir / file_path

        # Save to HDF5
        with h5py.File(full_path, "w") as f:
            # Create dataset with compression
            if self.config.compression:
                if self.config.compression == "lzf":
                    # LZF doesn't support compression options
                    f.create_dataset("paths", data=paths, compression="lzf")
                else:
                    # GZIP supports compression level
                    f.create_dataset(
                        "paths",
                        data=paths,
                        compression=self.config.compression,
                        compression_opts=self.config.compression_level,
                    )
            else:
                f.create_dataset("paths", data=paths)

            # Store metadata
            if metadata:
                meta_group = f.create_group("metadata")
                for key, value in metadata.items():
                    if isinstance(value, (int, float, str, bool)):
                        meta_group.attrs[key] = value
                    else:
                        meta_group.attrs[key] = json.dumps(value, default=str)

            # Store parameters
            f.attrs["params"] = json.dumps(params, default=str)
            f.attrs["timestamp"] = datetime.now().isoformat()

        # Update cache index
        size_bytes = full_path.stat().st_size
        self._cache_index[cache_key] = CacheKey(
            hash_key=cache_key, params=params, size_bytes=size_bytes
        )

        # Update statistics
        save_time = (time.time() - start_time) * 1000
        self._save_times.append(save_time)
        self.stats.avg_save_time_ms = float(np.mean(self._save_times[-100:]))
        self.stats.n_entries = len(self._cache_index)
        self.stats.total_size_bytes += size_bytes

        # Save metadata
        self._save_metadata()

        # Check cache size limit
        self._enforce_size_limit()

        return cache_key
    def load_simulation_paths(
        self, params: Dict[str, Any], memory_map: Optional[bool] = None
    ) -> Optional[np.ndarray]:
        """Load simulation paths from cache.

        Retrieves cached simulation data with optional memory mapping
        for efficient loading of large arrays.

        Args:
            params: Simulation parameters (must match cached params)
            memory_map: Use memory mapping for large files (None=auto)

        Returns:
            Numpy array of paths or None if not cached

        Example:
            >>> paths = cache.load_simulation_paths(
            ...     params={'n_sims': 10000, 'seed': 42}
            ... )
            >>> if paths is not None:
            ...     print(f"Loaded {paths.shape} from cache")
        """
        start_time = time.time()

        # Generate cache key
        cache_key = self._generate_cache_key(params)

        # Check if cached
        if cache_key not in self._cache_index:
            self.stats.n_misses += 1
            self.stats.update_hit_rate()
            return None

        file_path = Path("raw_simulations") / f"{cache_key}.h5"
        full_path = self.config.cache_dir / file_path

        if not full_path.exists():
            # Cache index out of sync
            del self._cache_index[cache_key]
            self.stats.n_misses += 1
            self.stats.update_hit_rate()
            return None

        # Check TTL if configured
        if self.config.ttl_hours:
            cache_entry = self._cache_index[cache_key]
            age_hours = (datetime.now() - cache_entry.timestamp).total_seconds() / 3600
            if age_hours > self.config.ttl_hours:
                # Cache expired
                self.invalidate_cache(params)
                self.stats.n_misses += 1
                self.stats.update_hit_rate()
                return None

        # Load from HDF5
        try:
            # Determine memory mapping
            if memory_map is None:
                memory_map = self.config.enable_memory_mapping

            if memory_map:
                # Open the file lazily, then materialize the dataset.
                # Note: np.array() copies the data into memory, so the
                # result is safe to use after the file handle is closed.
                f = h5py.File(full_path, "r")
                paths = f["paths"]
                paths = np.array(paths)
                f.close()
            else:
                # Load entire array into memory in one read
                with h5py.File(full_path, "r") as f:
                    paths = f["paths"][:]

            # Update cache entry
            cache_entry = self._cache_index[cache_key]
            cache_entry.access_count += 1
            cache_entry.last_accessed = datetime.now()

            # Update statistics
            load_time = (time.time() - start_time) * 1000
            self._load_times.append(load_time)
            self.stats.avg_load_time_ms = float(np.mean(self._load_times[-100:]))
            self.stats.n_hits += 1
            self.stats.update_hit_rate()

            return paths  # type: ignore[no-any-return]

        except (OSError, KeyError, ValueError) as e:
            warnings.warn(f"Failed to load cached data: {e}")
            self.stats.n_misses += 1
            self.stats.update_hit_rate()
            return None
    def cache_processed_results(
        self, params: Dict[str, Any], results: pd.DataFrame, result_type: str = "generic"
    ) -> str:
        """Cache processed results as Parquet.

        Stores tabular results efficiently using Parquet format with
        columnar compression for fast queries.

        Args:
            params: Processing parameters (used for cache key)
            results: Pandas DataFrame with results
            result_type: Type of results (e.g., 'efficient_frontier')

        Returns:
            Cache key for retrieval

        Example:
            >>> df = pd.DataFrame({'limit': [1e6, 2e6], 'premium': [1e4, 2e4]})
            >>> key = cache.cache_processed_results(
            ...     params={'optimization': 'pareto'},
            ...     results=df,
            ...     result_type='efficient_frontier'
            ... )
        """
        start_time = time.time()

        # Generate cache key
        cache_key = self._generate_cache_key(params)
        file_path = Path("processed_results") / f"{result_type}_{cache_key}.parquet"
        full_path = self.config.cache_dir / file_path

        # Save to Parquet
        results.to_parquet(full_path, compression="snappy", index=True)

        # Update cache index
        size_bytes = full_path.stat().st_size
        self._cache_index[f"{result_type}_{cache_key}"] = CacheKey(
            hash_key=cache_key, params=params, size_bytes=size_bytes
        )

        # Update statistics
        save_time = (time.time() - start_time) * 1000
        self._save_times.append(save_time)
        self.stats.avg_save_time_ms = float(np.mean(self._save_times[-100:]))
        self.stats.n_entries = len(self._cache_index)
        self.stats.total_size_bytes += size_bytes

        # Save metadata
        self._save_metadata()

        return cache_key
    def load_processed_results(
        self, params: Dict[str, Any], result_type: str = "generic"
    ) -> Optional[pd.DataFrame]:
        """Load processed results from cache.

        Retrieves cached tabular results from Parquet storage.

        Args:
            params: Processing parameters (must match cached params)
            result_type: Type of results to load

        Returns:
            DataFrame with results or None if not cached

        Example:
            >>> df = cache.load_processed_results(
            ...     params={'optimization': 'pareto'},
            ...     result_type='efficient_frontier'
            ... )
        """
        start_time = time.time()

        # Generate cache key
        cache_key = self._generate_cache_key(params)
        full_key = f"{result_type}_{cache_key}"

        # Check if cached
        if full_key not in self._cache_index:
            self.stats.n_misses += 1
            self.stats.update_hit_rate()
            return None

        file_path = Path("processed_results") / f"{result_type}_{cache_key}.parquet"
        full_path = self.config.cache_dir / file_path

        if not full_path.exists():
            del self._cache_index[full_key]
            self.stats.n_misses += 1
            self.stats.update_hit_rate()
            return None

        try:
            # Load from Parquet
            results = pd.read_parquet(full_path)

            # Update cache entry
            cache_entry = self._cache_index[full_key]
            cache_entry.access_count += 1
            cache_entry.last_accessed = datetime.now()

            # Update statistics
            load_time = (time.time() - start_time) * 1000
            self._load_times.append(load_time)
            self.stats.avg_load_time_ms = float(np.mean(self._load_times[-100:]))
            self.stats.n_hits += 1
            self.stats.update_hit_rate()

            return results

        except (OSError, KeyError, ValueError) as e:
            warnings.warn(f"Failed to load cached results: {e}")
            self.stats.n_misses += 1
            self.stats.update_hit_rate()
            return None
    def cache_figure(
        self,
        params: Dict[str, Any],
        figure: Any,
        figure_name: str,
        figure_type: str = "technical",
        file_format: str = "pickle",
    ) -> str:
        """Cache matplotlib or plotly figure.

        Stores figure objects with metadata for fast regeneration.

        Args:
            params: Figure parameters (used for cache key)
            figure: Matplotlib or Plotly figure object
            figure_name: Name of the figure
            figure_type: 'executive' or 'technical'
            file_format: Storage format ('pickle', 'json' for plotly)

        Returns:
            Cache key for retrieval
        """
        # Generate cache key
        cache_key = self._generate_cache_key(params)
        file_ext = "pkl" if file_format == "pickle" else "json"
        file_path = Path("figures") / figure_type / f"{figure_name}_{cache_key}.{file_ext}"
        full_path = self.config.cache_dir / file_path

        # Save figure
        if file_format == "pickle":
            with open(full_path, "wb") as f:
                safe_dump(figure, f)
        elif file_format == "json":
            # For Plotly figures
            import plotly.io as pio

            pio.write_json(figure, str(full_path))
        else:
            # Fail fast rather than indexing a file that was never written
            raise ValueError(f"Unsupported format: {file_format}")

        # Update cache index
        size_bytes = full_path.stat().st_size
        self._cache_index[f"fig_{figure_name}_{cache_key}"] = CacheKey(
            hash_key=cache_key, params=params, size_bytes=size_bytes
        )

        self.stats.n_entries = len(self._cache_index)
        self.stats.total_size_bytes += size_bytes
        self._save_metadata()

        return cache_key
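
    # Illustrative sketch (hypothetical names): caching a matplotlib figure
    # under the 'technical' tier; Plotly figures would use file_format="json".
    #
    #     >>> import matplotlib.pyplot as plt
    #     >>> fig, _ = plt.subplots()
    #     >>> key = cache.cache_figure(
    #     ...     params={"metric": "roe"}, figure=fig, figure_name="roe_hist"
    #     ... )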
    def invalidate_cache(self, params: Optional[Dict[str, Any]] = None):
        """Invalidate cache entries.

        Args:
            params: If provided, only invalidate entries matching these params.
                If None, invalidate the entire cache.
        """
        if params is None:
            # Clear all cache
            self.clear_cache()
        else:
            # Invalidate specific entries
            cache_key = self._generate_cache_key(params)

            # Find and remove all entries with this key
            keys_to_remove = [k for k in self._cache_index.keys() if cache_key in k]

            for key in keys_to_remove:
                # Determine file path based on key prefix
                if key.startswith("fig_"):
                    # Figure cache
                    file_path = Path("figures") / "*" / f"*{cache_key}*"
                elif "_" in key and not key.startswith("fig_"):
                    # Processed results
                    file_path = Path("processed_results") / f"*{cache_key}*"
                else:
                    # Raw simulations
                    file_path = Path("raw_simulations") / f"{cache_key}.h5"

                # Delete files
                for f in self.config.cache_dir.glob(str(file_path)):
                    f.unlink()

                # Update stats
                if key in self._cache_index:
                    self.stats.total_size_bytes -= self._cache_index[key].size_bytes
                    del self._cache_index[key]

            self.stats.n_entries = len(self._cache_index)
            self._save_metadata()
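
    # Illustrative sketch (hypothetical params): targeted vs. full invalidation.
    #
    #     >>> cache.invalidate_cache({"n_sims": 10000, "seed": 42})  # one param set
    #     >>> cache.invalidate_cache()  # no params: clears the entire cache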
    def clear_cache(self, confirm: bool = True):
        """Clear all cached data.

        Args:
            confirm: Require confirmation before clearing
        """
        if confirm:
            response = input(f"Clear {self.stats.total_size_bytes / 1e9:.2f} GB of cache? (y/n): ")
            if response.lower() != "y":
                print("Cache clear cancelled")
                return

        # Remove all cached files
        for subdir in ["raw_simulations", "processed_results", "figures"]:
            dir_path = self.config.cache_dir / subdir
            if dir_path.exists():
                shutil.rmtree(dir_path)
                dir_path.mkdir(parents=True, exist_ok=True)

        # Reset cache index and stats
        self._cache_index = {}
        self.stats = CacheStats()
        self._save_metadata()

        print("Cache cleared successfully")
    def get_cache_stats(self) -> CacheStats:
        """Get current cache statistics.

        Returns:
            CacheStats object with usage information
        """
        # Update total size
        total_size = 0
        for subdir in ["raw_simulations", "processed_results", "figures"]:
            dir_path = self.config.cache_dir / subdir
            if dir_path.exists():
                for f in dir_path.rglob("*"):
                    if f.is_file():
                        total_size += f.stat().st_size

        self.stats.total_size_bytes = total_size
        self.stats.n_entries = len(self._cache_index)

        # Find oldest and newest entries
        if self._cache_index:
            timestamps = [entry.timestamp for entry in self._cache_index.values()]
            self.stats.oldest_entry = min(timestamps)
            self.stats.newest_entry = max(timestamps)

        return self.stats
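
    # Illustrative sketch: inspecting cache health at runtime.
    #
    #     >>> stats = cache.get_cache_stats()
    #     >>> print(f"{stats.n_entries} entries, hit rate {stats.hit_rate:.1%}")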
    def warm_cache(
        self,
        scenarios: List[Dict[str, Any]],
        compute_func: Callable[[Dict[str, Any]], Any],
        result_type: str = "simulation",
    ) -> int:
        """Pre-compute and cache results for common scenarios.

        Args:
            scenarios: List of parameter dictionaries to compute
            compute_func: Function that takes params and returns results
            result_type: Type of results being cached

        Returns:
            Number of scenarios cached

        Example:
            >>> scenarios = [
            ...     {'n_sims': 1000, 'seed': i} for i in range(10)
            ... ]
            >>> n_cached = cache.warm_cache(
            ...     scenarios,
            ...     lambda p: np.random.randn(p['n_sims'], 100)
            ... )
        """
        n_cached = 0

        for params in scenarios:
            # Check if already cached
            cache_key = self._generate_cache_key(params)

            if result_type == "simulation":
                if cache_key in self._cache_index:
                    continue

            # Compute and cache
            try:
                results = compute_func(params)
                self.cache_simulation_paths(params, results)
                n_cached += 1
            except (OSError, ValueError, TypeError) as e:
                warnings.warn(f"Failed to warm cache for {params}: {e}")

        print(f"Warmed cache with {n_cached} new entries")
        return n_cached
    def _enforce_size_limit(self):
        """Enforce maximum cache size limit using LRU eviction."""
        if self.config.max_cache_size_gb <= 0:
            return

        max_size_bytes = self.config.max_cache_size_gb * 1e9

        if self.stats.total_size_bytes > max_size_bytes:
            # Sort entries by last accessed time (LRU)
            sorted_entries = sorted(self._cache_index.items(), key=lambda x: x[1].last_accessed)

            # Remove oldest entries until under limit
            while self.stats.total_size_bytes > max_size_bytes and sorted_entries:
                key, entry = sorted_entries.pop(0)

                # Delete file
                if key.startswith("fig_"):
                    # Figure cache - need to find the actual file
                    pattern = f"*{entry.hash_key}*"
                    for f in (self.config.cache_dir / "figures").rglob(pattern):
                        f.unlink()
                elif "_" in key and not key.startswith("fig_"):
                    # Processed results
                    pattern = f"*{entry.hash_key}*"
                    for f in (self.config.cache_dir / "processed_results").glob(pattern):
                        f.unlink()
                else:
                    # Raw simulations
                    file_path = self.config.cache_dir / "raw_simulations" / f"{entry.hash_key}.h5"
                    if file_path.exists():
                        file_path.unlink()

                # Update stats
                self.stats.total_size_bytes -= entry.size_bytes
                self.stats.n_entries -= 1
                del self._cache_index[key]

            self._save_metadata()

    def _get_cache_files(self, key: str, entry: CacheKey) -> List[Path]:
        """Get files associated with a cache key."""
        if key.startswith("fig_"):
            pattern = f"*{entry.hash_key}*"
            return list((self.config.cache_dir / "figures").rglob(pattern))
        if "_" in key and not key.startswith("fig_"):
            pattern = f"*{entry.hash_key}*"
            return list((self.config.cache_dir / "processed_results").glob(pattern))
        file_path = self.config.cache_dir / "raw_simulations" / f"{entry.hash_key}.h5"
        return [file_path] if file_path.exists() else []

    def _validate_file(self, key: str, file_path: Path) -> bool:
        """Validate a single cache file."""
        try:
            if key.startswith("fig_"):
                with open(file_path, "rb") as f:
                    safe_load(f)
            elif "_" in key and not key.startswith("fig_"):
                pd.read_parquet(file_path)
            else:
                with h5py.File(file_path, "r") as f:
                    _ = f["paths"].shape
            return True
        except (OSError, KeyError, ValueError, pickle.PickleError):
            return False
    def validate_cache(self) -> Dict[str, Any]:
        """Validate cache integrity and consistency.

        Returns:
            Dictionary with validation results
        """
        results: Dict[str, Any] = {
            "valid_entries": 0,
            "missing_files": [],
            "orphaned_files": [],
            "corrupted_files": [],
            "total_issues": 0,
        }

        # Check all index entries have corresponding files
        for key, entry in self._cache_index.items():
            files = self._get_cache_files(key, entry)
            if not files:
                results["missing_files"].append(key)
            elif self._validate_file(key, files[0]):
                results["valid_entries"] += 1
            else:
                results["corrupted_files"].append(str(files[0]))

        # Check for orphaned files (files without index entries)
        all_cached_files = set()
        for subdir in ["raw_simulations", "processed_results", "figures"]:
            dir_path = self.config.cache_dir / subdir
            if dir_path.exists():
                for f in dir_path.rglob("*"):
                    if f.is_file() and not f.name.startswith("."):
                        all_cached_files.add(f)

        indexed_hashes = {entry.hash_key for entry in self._cache_index.values()}
        for file_path in all_cached_files:
            # Extract hash from filename
            file_hash = None
            for hash_str in indexed_hashes:
                if hash_str in str(file_path):
                    file_hash = hash_str
                    break
            if not file_hash:
                results["orphaned_files"].append(str(file_path))

        results["total_issues"] = (
            len(results["missing_files"])
            + len(results["orphaned_files"])
            + len(results["corrupted_files"])
        )

        return results
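
# Illustrative maintenance sketch (not part of the module): validate the cache
# and delete orphaned files. The unlink step is an assumption about how a
# caller might act on the report; validate_cache() itself only reports issues.
#
#     >>> cache = CacheManager()
#     >>> report = cache.validate_cache()
#     >>> for orphan in report["orphaned_files"]:
#     ...     Path(orphan).unlink()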