"""High-performance caching system for expensive Monte Carlo computations.
This module implements a robust caching system that stores Monte Carlo simulation
results and processed data using efficient storage formats (HDF5, Parquet) with
hash-based invalidation and memory-mapped loading for optimal performance.
The caching system achieves:
- <1 second load time for 10,000 paths × 1,000 years
- Automatic cache invalidation on parameter changes
- Memory-efficient loading with memory mapping
- Pluggable storage backends (local filesystem implemented; cloud backends planned)
Example:
>>> from ergodic_insurance.reporting import CacheManager
>>> cache = CacheManager()
>>> # Cache an expensive simulation
>>> params = {'n_sims': 10000, 'n_years': 1000}
>>> key = cache.cache_simulation_paths(
... params=params,
... paths=simulation_data,
... metadata={'seed': 42}
... )
>>> # Load from cache (100x faster)
>>> cached_paths = cache.load_simulation_paths(params)
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import hashlib
import json
from pathlib import Path
import pickle
import shutil
import time
from typing import Any, Callable, Dict, List, Optional
import warnings
import h5py
import numpy as np
import pandas as pd
from ..safe_pickle import safe_dump, safe_load
class StorageBackend(Enum):
"""Supported storage backend types."""
LOCAL = "local"
S3 = "s3"
AZURE = "azure"
GCS = "gcs"
@dataclass
class CacheConfig:
"""Configuration for the cache manager.
Attributes:
cache_dir: Root directory for cache storage
max_cache_size_gb: Maximum cache size in gigabytes
ttl_hours: Time-to-live for cache entries in hours
compression: Compression algorithm for HDF5 ('gzip', 'lzf', None)
compression_level: Compression level (1-9 for gzip)
enable_memory_mapping: Use memory mapping for large files
backend: Storage backend type
backend_config: Backend-specific configuration
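Example:
Illustrative construction; the directory, size limit, and compression choice are placeholders:
>>> config = CacheConfig(
... cache_dir="./cache",
... max_cache_size_gb=5.0,
... compression="lzf",
... )
>>> cache = CacheManager(config=config)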
"""
cache_dir: Path = Path("./cache")
max_cache_size_gb: float = 10.0
ttl_hours: Optional[int] = None
compression: Optional[str] = "gzip"
compression_level: int = 4
enable_memory_mapping: bool = True
backend: StorageBackend = StorageBackend.LOCAL
backend_config: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
"""Validate and initialize configuration."""
self.cache_dir = Path(self.cache_dir)
if self.compression and self.compression not in ["gzip", "lzf"]:
raise ValueError(f"Invalid compression: {self.compression}")
if self.compression and (self.compression_level < 1 or self.compression_level > 9):
raise ValueError(f"Compression level must be 1-9, got {self.compression_level}")
@dataclass
class CacheStats:
"""Statistics about cache usage and performance.
Attributes:
total_size_bytes: Total size of cached data
n_entries: Number of cache entries
n_hits: Number of cache hits
n_misses: Number of cache misses
hit_rate: Cache hit rate (0-1)
avg_load_time_ms: Average load time in milliseconds
avg_save_time_ms: Average save time in milliseconds
oldest_entry: Timestamp of oldest cache entry
newest_entry: Timestamp of newest cache entry
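Example:
Illustrative hit-rate update with made-up counts:
>>> stats = CacheStats(n_hits=3, n_misses=1)
>>> stats.update_hit_rate()
>>> stats.hit_rate
0.75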
"""
total_size_bytes: int = 0
n_entries: int = 0
n_hits: int = 0
n_misses: int = 0
hit_rate: float = 0.0
avg_load_time_ms: float = 0.0
avg_save_time_ms: float = 0.0
oldest_entry: Optional[datetime] = None
newest_entry: Optional[datetime] = None
def update_hit_rate(self):
"""Update the cache hit rate."""
total = self.n_hits + self.n_misses
self.hit_rate = self.n_hits / total if total > 0 else 0.0
@dataclass
class CacheKey:
"""Cache key with metadata for cache entries.
Attributes:
hash_key: SHA256 hash of parameters
params: Original parameters dictionary
timestamp: Creation timestamp
size_bytes: Size of cached data
access_count: Number of times accessed
last_accessed: Last access timestamp
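Example:
Illustrative serialization round trip; the hash and params values are placeholders:
>>> key = CacheKey(hash_key="abc123", params={"n_sims": 100})
>>> restored = CacheKey.from_dict(key.to_dict())
>>> restored.hash_key
'abc123'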
"""
hash_key: str
params: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
size_bytes: int = 0
access_count: int = 0
last_accessed: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"hash_key": self.hash_key,
"params": self.params,
"timestamp": self.timestamp.isoformat(),
"size_bytes": self.size_bytes,
"access_count": self.access_count,
"last_accessed": self.last_accessed.isoformat(),
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "CacheKey":
"""Create from dictionary."""
return cls(
hash_key=data["hash_key"],
params=data["params"],
timestamp=datetime.fromisoformat(data["timestamp"]),
size_bytes=data["size_bytes"],
access_count=data["access_count"],
last_accessed=datetime.fromisoformat(data["last_accessed"]),
)
class BaseStorageBackend(ABC):
"""Abstract base class for storage backends."""
@abstractmethod
def exists(self, path: Path) -> bool:
"""Check if path exists."""
@abstractmethod
def save(self, path: Path, data: Any, file_format: str = "pickle") -> int:
"""Save data to path, return size in bytes."""
@abstractmethod
def load(self, path: Path, file_format: str = "pickle") -> Any:
"""Load data from path."""
@abstractmethod
def delete(self, path: Path) -> bool:
"""Delete data at path."""
@abstractmethod
def list_files(self, pattern: str = "*") -> List[Path]:
"""List files matching pattern."""
@abstractmethod
def get_size(self, path: Path) -> int:
"""Get size of file in bytes."""
class LocalStorageBackend(BaseStorageBackend):
"""Local filesystem storage backend."""
def __init__(self, root_dir: Path):
"""Initialize local storage backend.
Args:
root_dir: Root directory for storage
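Example:
Illustrative JSON round trip; the directory and file name are placeholders:
>>> backend = LocalStorageBackend(Path("./cache"))
>>> n_bytes = backend.save(Path("example.json"), {"a": 1}, file_format="json")
>>> backend.load(Path("example.json"), file_format="json")
{'a': 1}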
"""
self.root_dir = Path(root_dir)
self.root_dir.mkdir(parents=True, exist_ok=True)
def exists(self, path: Path) -> bool:
"""Check if path exists."""
full_path = self.root_dir / path
return full_path.exists()
def save(self, path: Path, data: Any, file_format: str = "pickle") -> int:
"""Save data to path."""
full_path = self.root_dir / path
full_path.parent.mkdir(parents=True, exist_ok=True)
if file_format == "pickle":
with open(full_path, "wb") as f:
safe_dump(data, f)
elif file_format == "json":
with open(full_path, "w") as f:
json.dump(data, f)
elif file_format == "parquet":
if isinstance(data, pd.DataFrame):
data.to_parquet(full_path, compression="snappy")
else:
raise ValueError("Parquet format requires pandas DataFrame")
else:
raise ValueError(f"Unsupported format: {file_format}")
return full_path.stat().st_size
def load(self, path: Path, file_format: str = "pickle") -> Any:
"""Load data from path."""
full_path = self.root_dir / path
if file_format == "pickle":
with open(full_path, "rb") as f:
return safe_load(f)
elif file_format == "json":
with open(full_path, "r") as f:
return json.load(f)
elif file_format == "parquet":
return pd.read_parquet(full_path)
else:
raise ValueError(f"Unsupported format: {file_format}")
def delete(self, path: Path) -> bool:
"""Delete data at path."""
full_path = self.root_dir / path
if full_path.exists():
if full_path.is_dir():
shutil.rmtree(full_path)
else:
full_path.unlink()
return True
return False
def list_files(self, pattern: str = "*") -> List[Path]:
"""List files matching pattern."""
return list(self.root_dir.glob(pattern))
def get_size(self, path: Path) -> int:
"""Get size of file in bytes."""
full_path = self.root_dir / path
if full_path.exists():
if full_path.is_file():
return full_path.stat().st_size
return sum(f.stat().st_size for f in full_path.rglob("*") if f.is_file())
return 0
class CacheManager:
"""High-performance cache manager for Monte Carlo simulations.
This class provides efficient caching of simulation results with automatic
invalidation, memory-mapped loading, and configurable storage backends.
Attributes:
config: Cache configuration
stats: Cache statistics
backend: Storage backend instance
Example:
>>> cache = CacheManager(config=CacheConfig(cache_dir="./cache"))
>>> # Cache simulation results
>>> cache.cache_simulation_paths(
... params={'n_sims': 10000},
... paths=np.random.randn(10000, 100)
... )
>>> # Load from cache
>>> paths = cache.load_simulation_paths({'n_sims': 10000})
"""
def __init__(self, config: Optional[CacheConfig] = None):
"""Initialize cache manager.
Args:
config: Cache configuration, uses defaults if None
"""
self.config = config or CacheConfig()
self.stats = CacheStats()
# Initialize storage backend
if self.config.backend == StorageBackend.LOCAL:
self.backend = LocalStorageBackend(self.config.cache_dir)
else:
raise NotImplementedError(f"Backend {self.config.backend} not implemented")
# Create cache directory structure
self._init_cache_structure()
# Initialize cache index
self._cache_index: Dict[str, CacheKey] = {}
# Load existing cache metadata
self._load_metadata()
# Track timing for statistics
self._load_times: List[float] = []
self._save_times: List[float] = []
def _init_cache_structure(self):
"""Initialize cache directory structure."""
subdirs = [
"raw_simulations",
"processed_results",
"figures/executive",
"figures/technical",
"metadata",
]
for subdir in subdirs:
dir_path = self.config.cache_dir / subdir
dir_path.mkdir(parents=True, exist_ok=True)
def _generate_cache_key(self, params: Dict[str, Any]) -> str:
"""Generate hash-based cache key from parameters.
Args:
params: Dictionary of parameters
Returns:
SHA256 hash string
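Example:
Illustrative check that parameter order does not change the key:
>>> cache = CacheManager()
>>> a = cache._generate_cache_key({"n_sims": 100, "seed": 1})
>>> b = cache._generate_cache_key({"seed": 1, "n_sims": 100})
>>> a == b
True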
"""
# Sort keys for consistent hashing
sorted_params = json.dumps(params, sort_keys=True, default=str)
return hashlib.sha256(sorted_params.encode()).hexdigest()
def _load_metadata(self):
"""Load cache metadata from disk."""
metadata_file = self.config.cache_dir / "metadata" / "cache_index.json"
if metadata_file.exists():
try:
with open(metadata_file, "r") as f:
data = json.load(f)
self.stats = CacheStats(**data.get("stats", {}))
self._cache_index = {
k: CacheKey.from_dict(v) for k, v in data.get("index", {}).items()
}
except (json.JSONDecodeError, KeyError, ValueError) as e:
warnings.warn(f"Failed to load cache metadata: {e}")
self._cache_index = {}
else:
self._cache_index = {}
def _save_metadata(self):
"""Save cache metadata to disk."""
metadata_file = self.config.cache_dir / "metadata" / "cache_index.json"
try:
data = {
"stats": {
"total_size_bytes": self.stats.total_size_bytes,
"n_entries": self.stats.n_entries,
"n_hits": self.stats.n_hits,
"n_misses": self.stats.n_misses,
"hit_rate": self.stats.hit_rate,
"avg_load_time_ms": self.stats.avg_load_time_ms,
"avg_save_time_ms": self.stats.avg_save_time_ms,
},
"index": {k: v.to_dict() for k, v in self._cache_index.items()},
}
with open(metadata_file, "w") as f:
json.dump(data, f, indent=2, default=str)
except (OSError, TypeError) as e:
warnings.warn(f"Failed to save cache metadata: {e}")
def cache_simulation_paths(
self, params: Dict[str, Any], paths: np.ndarray, metadata: Optional[Dict[str, Any]] = None
) -> str:
"""Cache Monte Carlo simulation paths to HDF5.
Stores large simulation arrays efficiently using HDF5 with optional
compression. Designed for arrays on the order of 10,000 paths × 1,000 years.
Args:
params: Simulation parameters (used for cache key)
paths: Numpy array of simulation paths (n_simulations, n_years)
metadata: Optional metadata to store with paths
Returns:
Cache key for retrieval
Example:
>>> paths = np.random.randn(10000, 1000)
>>> key = cache.cache_simulation_paths(
... params={'n_sims': 10000, 'seed': 42},
... paths=paths,
... metadata={'generator': 'numpy'}
... )
"""
start_time = time.time()
# Generate cache key
cache_key = self._generate_cache_key(params)
file_path = Path("raw_simulations") / f"{cache_key}.h5"
full_path = self.config.cache_dir / file_path
# Save to HDF5
with h5py.File(full_path, "w") as f:
# Create dataset with compression
if self.config.compression:
if self.config.compression == "lzf":
# LZF doesn't support compression options
f.create_dataset("paths", data=paths, compression="lzf")
else:
# GZIP supports compression level
f.create_dataset(
"paths",
data=paths,
compression=self.config.compression,
compression_opts=self.config.compression_level,
)
else:
f.create_dataset("paths", data=paths)
# Store metadata
if metadata:
meta_group = f.create_group("metadata")
for key, value in metadata.items():
if isinstance(value, (int, float, str, bool)):
meta_group.attrs[key] = value
else:
meta_group.attrs[key] = json.dumps(value, default=str)
# Store parameters
f.attrs["params"] = json.dumps(params, default=str)
f.attrs["timestamp"] = datetime.now().isoformat()
# Update cache index
size_bytes = full_path.stat().st_size
self._cache_index[cache_key] = CacheKey(
hash_key=cache_key, params=params, size_bytes=size_bytes
)
# Update statistics
save_time = (time.time() - start_time) * 1000
self._save_times.append(save_time)
self.stats.avg_save_time_ms = float(np.mean(self._save_times[-100:]))
self.stats.n_entries = len(self._cache_index)
self.stats.total_size_bytes += size_bytes
# Save metadata
self._save_metadata()
# Check cache size limit
self._enforce_size_limit()
return cache_key
def load_simulation_paths(
self, params: Dict[str, Any], memory_map: Optional[bool] = None
) -> Optional[np.ndarray]:
"""Load simulation paths from cache.
Retrieves cached simulation data with optional memory mapping for
efficient loading of large arrays.
Args:
params: Simulation parameters (must match cached params)
memory_map: Use memory mapping for large files (None=auto)
Returns:
Numpy array of paths or None if not cached
Example:
>>> paths = cache.load_simulation_paths(
... params={'n_sims': 10000, 'seed': 42}
... )
>>> if paths is not None:
... print(f"Loaded {paths.shape} from cache")
"""
start_time = time.time()
# Generate cache key
cache_key = self._generate_cache_key(params)
# Check if cached
if cache_key not in self._cache_index:
self.stats.n_misses += 1
self.stats.update_hit_rate()
return None
file_path = Path("raw_simulations") / f"{cache_key}.h5"
full_path = self.config.cache_dir / file_path
if not full_path.exists():
# Cache index out of sync
del self._cache_index[cache_key]
self.stats.n_misses += 1
self.stats.update_hit_rate()
return None
# Check TTL if configured
if self.config.ttl_hours:
cache_entry = self._cache_index[cache_key]
age_hours = (datetime.now() - cache_entry.timestamp).total_seconds() / 3600
if age_hours > self.config.ttl_hours:
# Cache expired
self.invalidate_cache(params)
self.stats.n_misses += 1
self.stats.update_hit_rate()
return None
# Load from HDF5
try:
# Determine memory mapping
if memory_map is None:
memory_map = self.config.enable_memory_mapping
if memory_map:
# Open the file and materialize the dataset before closing it;
# h5py datasets cannot be read after the file handle is closed
f = h5py.File(full_path, "r")
paths = np.array(f["paths"])
f.close()
else:
# Load entire array into memory
with h5py.File(full_path, "r") as f:
paths = f["paths"][:]
# Update cache entry
cache_entry = self._cache_index[cache_key]
cache_entry.access_count += 1
cache_entry.last_accessed = datetime.now()
# Update statistics
load_time = (time.time() - start_time) * 1000
self._load_times.append(load_time)
self.stats.avg_load_time_ms = float(np.mean(self._load_times[-100:]))
self.stats.n_hits += 1
self.stats.update_hit_rate()
return paths # type: ignore[no-any-return]
except (OSError, KeyError, ValueError) as e:
warnings.warn(f"Failed to load cached data: {e}")
self.stats.n_misses += 1
self.stats.update_hit_rate()
return None
def cache_processed_results(
self, params: Dict[str, Any], results: pd.DataFrame, result_type: str = "generic"
) -> str:
"""Cache processed results as Parquet.
Stores tabular results efficiently using Parquet format with columnar
compression for fast queries.
Args:
params: Processing parameters (used for cache key)
results: Pandas DataFrame with results
result_type: Type of results (e.g., 'efficient_frontier')
Returns:
Cache key for retrieval
Example:
>>> df = pd.DataFrame({'limit': [1e6, 2e6], 'premium': [1e4, 2e4]})
>>> key = cache.cache_processed_results(
... params={'optimization': 'pareto'},
... results=df,
... result_type='efficient_frontier'
... )
"""
start_time = time.time()
# Generate cache key
cache_key = self._generate_cache_key(params)
file_path = Path("processed_results") / f"{result_type}_{cache_key}.parquet"
full_path = self.config.cache_dir / file_path
# Save to Parquet
results.to_parquet(full_path, compression="snappy", index=True)
# Update cache index
size_bytes = full_path.stat().st_size
self._cache_index[f"{result_type}_{cache_key}"] = CacheKey(
hash_key=cache_key, params=params, size_bytes=size_bytes
)
# Update statistics
save_time = (time.time() - start_time) * 1000
self._save_times.append(save_time)
self.stats.avg_save_time_ms = float(np.mean(self._save_times[-100:]))
self.stats.n_entries = len(self._cache_index)
self.stats.total_size_bytes += size_bytes
# Save metadata
self._save_metadata()
return cache_key
def load_processed_results(
self, params: Dict[str, Any], result_type: str = "generic"
) -> Optional[pd.DataFrame]:
"""Load processed results from cache.
Retrieves cached tabular results from Parquet storage.
Args:
params: Processing parameters (must match cached params)
result_type: Type of results to load
Returns:
DataFrame with results or None if not cached
Example:
>>> df = cache.load_processed_results(
... params={'optimization': 'pareto'},
... result_type='efficient_frontier'
... )
"""
start_time = time.time()
# Generate cache key
cache_key = self._generate_cache_key(params)
full_key = f"{result_type}_{cache_key}"
# Check if cached
if full_key not in self._cache_index:
self.stats.n_misses += 1
self.stats.update_hit_rate()
return None
file_path = Path("processed_results") / f"{result_type}_{cache_key}.parquet"
full_path = self.config.cache_dir / file_path
if not full_path.exists():
del self._cache_index[full_key]
self.stats.n_misses += 1
self.stats.update_hit_rate()
return None
try:
# Load from Parquet
results = pd.read_parquet(full_path)
# Update cache entry
cache_entry = self._cache_index[full_key]
cache_entry.access_count += 1
cache_entry.last_accessed = datetime.now()
# Update statistics
load_time = (time.time() - start_time) * 1000
self._load_times.append(load_time)
self.stats.avg_load_time_ms = float(np.mean(self._load_times[-100:]))
self.stats.n_hits += 1
self.stats.update_hit_rate()
return results
except (OSError, KeyError, ValueError) as e:
warnings.warn(f"Failed to load cached results: {e}")
self.stats.n_misses += 1
self.stats.update_hit_rate()
return None
def invalidate_cache(self, params: Optional[Dict[str, Any]] = None):
"""Invalidate cache entries.
Args:
params: If provided, only invalidate entries matching these params.
If None, invalidate all cache.
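Example:
Illustrative usage; the parameter values are placeholders:
>>> cache.invalidate_cache({'n_sims': 10000, 'seed': 42})  # one entry
>>> cache.invalidate_cache()  # everything (clear_cache will ask for confirmation)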
"""
if params is None:
# Clear all cache
self.clear_cache()
else:
# Invalidate specific entries
cache_key = self._generate_cache_key(params)
# Find and remove all entries with this key
keys_to_remove = [k for k in self._cache_index.keys() if cache_key in k]
for key in keys_to_remove:
# Determine file path based on key prefix
if key.startswith("fig_"):
# Figure cache
_parts = key.split("_", 2)
file_path = Path("figures") / "*" / f"*{cache_key}*"
elif "_" in key and not key.startswith("fig_"):
# Processed results
file_path = Path("processed_results") / f"*{cache_key}*"
else:
# Raw simulations
file_path = Path("raw_simulations") / f"{cache_key}.h5"
# Delete files
for f in self.config.cache_dir.glob(str(file_path)):
f.unlink()
# Update stats
if key in self._cache_index:
self.stats.total_size_bytes -= self._cache_index[key].size_bytes
del self._cache_index[key]
self.stats.n_entries = len(self._cache_index)
self._save_metadata()
def clear_cache(self, confirm: bool = True):
"""Clear all cached data.
Args:
confirm: Require confirmation before clearing
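Example:
Illustrative non-interactive call that skips the confirmation prompt:
>>> cache.clear_cache(confirm=False)
Cache cleared successfully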
"""
if confirm:
response = input(f"Clear {self.stats.total_size_bytes / 1e9:.2f} GB of cache? (y/n): ")
if response.lower() != "y":
print("Cache clear cancelled")
return
# Remove all cached files
for subdir in ["raw_simulations", "processed_results", "figures"]:
dir_path = self.config.cache_dir / subdir
if dir_path.exists():
shutil.rmtree(dir_path)
dir_path.mkdir(parents=True, exist_ok=True)
# Reset cache index and stats
self._cache_index = {}
self.stats = CacheStats()
self._save_metadata()
print("Cache cleared successfully")
def get_cache_stats(self) -> CacheStats:
"""Get current cache statistics.
Returns:
CacheStats object with usage information
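Example:
Illustrative usage:
>>> stats = cache.get_cache_stats()
>>> print(f"{stats.n_entries} entries, {stats.total_size_bytes / 1e9:.2f} GB")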
"""
# Update total size
total_size = 0
for subdir in ["raw_simulations", "processed_results", "figures"]:
dir_path = self.config.cache_dir / subdir
if dir_path.exists():
for f in dir_path.rglob("*"):
if f.is_file():
total_size += f.stat().st_size
self.stats.total_size_bytes = total_size
self.stats.n_entries = len(self._cache_index)
# Find oldest and newest entries
if self._cache_index:
timestamps = [entry.timestamp for entry in self._cache_index.values()]
self.stats.oldest_entry = min(timestamps)
self.stats.newest_entry = max(timestamps)
return self.stats
def warm_cache(
self,
scenarios: List[Dict[str, Any]],
compute_func: Callable[[Dict[str, Any]], Any],
result_type: str = "simulation",
) -> int:
"""Pre-compute and cache results for common scenarios.
Args:
scenarios: List of parameter dictionaries to compute
compute_func: Function that takes params and returns results
result_type: Type of results being cached
Returns:
Number of scenarios cached
Example:
>>> scenarios = [
... {'n_sims': 1000, 'seed': i} for i in range(10)
... ]
>>> n_cached = cache.warm_cache(
... scenarios,
... lambda p: np.random.randn(p['n_sims'], 100)
... )
"""
n_cached = 0
for params in scenarios:
# Check if already cached
cache_key = self._generate_cache_key(params)
if result_type == "simulation":
if cache_key in self._cache_index:
continue
# Compute and cache
try:
results = compute_func(params)
self.cache_simulation_paths(params, results)
n_cached += 1
except (OSError, ValueError, TypeError) as e:
warnings.warn(f"Failed to warm cache for {params}: {e}")
print(f"Warmed cache with {n_cached} new entries")
return n_cached
def _enforce_size_limit(self):
"""Enforce maximum cache size limit using LRU eviction."""
if self.config.max_cache_size_gb <= 0:
return
max_size_bytes = self.config.max_cache_size_gb * 1e9
if self.stats.total_size_bytes > max_size_bytes:
# Sort entries by last accessed time (LRU)
sorted_entries = sorted(self._cache_index.items(), key=lambda x: x[1].last_accessed)
# Remove oldest entries until under limit
while self.stats.total_size_bytes > max_size_bytes and sorted_entries:
key, entry = sorted_entries.pop(0)
# Delete file
if key.startswith("fig_"):
# Figure cache - need to find the actual file
pattern = f"*{entry.hash_key}*"
for f in (self.config.cache_dir / "figures").rglob(pattern):
f.unlink()
elif "_" in key and not key.startswith("fig_"):
# Processed results
pattern = f"*{entry.hash_key}*"
for f in (self.config.cache_dir / "processed_results").glob(pattern):
f.unlink()
else:
# Raw simulations
file_path = self.config.cache_dir / "raw_simulations" / f"{entry.hash_key}.h5"
if file_path.exists():
file_path.unlink()
# Update stats
self.stats.total_size_bytes -= entry.size_bytes
self.stats.n_entries -= 1
del self._cache_index[key]
self._save_metadata()
def _get_cache_files(self, key: str, entry: CacheKey) -> List[Path]:
"""Get files associated with a cache key."""
if key.startswith("fig_"):
pattern = f"*{entry.hash_key}*"
return list((self.config.cache_dir / "figures").rglob(pattern))
if "_" in key and not key.startswith("fig_"):
pattern = f"*{entry.hash_key}*"
return list((self.config.cache_dir / "processed_results").glob(pattern))
file_path = self.config.cache_dir / "raw_simulations" / f"{entry.hash_key}.h5"
return [file_path] if file_path.exists() else []
def _validate_file(self, key: str, file_path: Path) -> bool:
"""Validate a single cache file."""
try:
if key.startswith("fig_"):
with open(file_path, "rb") as f:
safe_load(f)
elif "_" in key and not key.startswith("fig_"):
pd.read_parquet(file_path)
else:
with h5py.File(file_path, "r") as f:
_ = f["paths"].shape
return True
except (OSError, KeyError, ValueError, pickle.PickleError):
return False
def validate_cache(self) -> Dict[str, Any]:
"""Validate cache integrity and consistency.
Returns:
Dictionary with validation results
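Example:
Illustrative integrity check:
>>> report = cache.validate_cache()
>>> if report["total_issues"]:
... print(report["missing_files"], report["corrupted_files"])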
"""
results: Dict[str, Any] = {
"valid_entries": 0,
"missing_files": [],
"orphaned_files": [],
"corrupted_files": [],
"total_issues": 0,
}
# Check all index entries have corresponding files
for key, entry in self._cache_index.items():
files = self._get_cache_files(key, entry)
if not files:
results["missing_files"].append(key)
elif self._validate_file(key, files[0]):
results["valid_entries"] += 1
else:
results["corrupted_files"].append(str(files[0]))
# Check for orphaned files (files without index entries)
all_cached_files = set()
for subdir in ["raw_simulations", "processed_results", "figures"]:
dir_path = self.config.cache_dir / subdir
if dir_path.exists():
for f in dir_path.rglob("*"):
if f.is_file() and not f.name.startswith("."):
all_cached_files.add(f)
indexed_hashes = {entry.hash_key for entry in self._cache_index.values()}
for file_path in all_cached_files:
# Extract hash from filename
file_hash = None
for hash_str in indexed_hashes:
if hash_str in str(file_path):
file_hash = hash_str
break
if not file_hash:
results["orphaned_files"].append(str(file_path))
results["total_issues"] = (
len(results["missing_files"])
+ len(results["orphaned_files"])
+ len(results["corrupted_files"])
)
return results