"""Parameter sweep utilities for systematic exploration of parameter space.
This module provides utilities for systematic parameter sweeps across the full
parameter space to identify optimal regions and validate robustness of
recommendations across different scenarios.
Features:
- Efficient grid search across parameter combinations
- Parallel execution for large sweeps using multiprocessing
- Result aggregation and storage with HDF5/Parquet support
- Scenario comparison tools for side-by-side analysis
- Optimal region identification using percentile-based methods
- Pre-defined scenarios for company sizes, loss scenarios, and market conditions
- Adaptive refinement near optima for efficient exploration
- Progress tracking and resumption capabilities
Example:
>>> from ergodic_insurance.parameter_sweep import ParameterSweeper, SweepConfig
>>> from ergodic_insurance.business_optimizer import BusinessOptimizer
>>>
>>> # Create optimizer
>>> optimizer = BusinessOptimizer(manufacturer)
>>>
>>> # Initialize sweeper
>>> sweeper = ParameterSweeper(optimizer)
>>>
>>> # Define parameter sweep
>>> config = SweepConfig(
... parameters={
... "initial_assets": [1e6, 10e6, 100e6],
... "base_operating_margin": [0.05, 0.08, 0.12],
... "loss_frequency": [3, 5, 8]
... },
... fixed_params={"time_horizon": 10},
... metrics_to_track=["optimal_roe", "ruin_probability"]
... )
>>>
>>> # Execute sweep
>>> results = sweeper.sweep(config)
>>>
>>> # Find optimal regions
>>> optimal, summary = sweeper.find_optimal_regions(
... results,
... objective="optimal_roe",
... constraints={"ruin_probability": (0, 0.01)}
... )
Author:
Alex Filiakov
Date:
2025-08-29
"""
from concurrent.futures import ProcessPoolExecutor, TimeoutError as FuturesTimeout, as_completed
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
from itertools import product
import json
import logging
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from tqdm import tqdm
from .business_optimizer import BusinessConstraints, BusinessOptimizer
from .manufacturer import WidgetManufacturer
logger = logging.getLogger(__name__)
@dataclass
class SweepConfig:
"""Configuration for parameter sweep.
Attributes:
parameters: Dictionary mapping parameter names to lists of values to sweep
fixed_params: Fixed parameters that don't vary across sweep
metrics_to_track: List of metric names to extract from results
n_workers: Number of parallel workers for execution
batch_size: Size of batches for parallel processing
adaptive_refinement: Whether to adaptively refine near optima
refinement_threshold: Percentile threshold for refinement (e.g., 90 for top 10%)
save_intermediate: Whether to save intermediate results
cache_dir: Directory for caching results
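
Example:
    A minimal, illustrative configuration (note that constructing a
    ``SweepConfig`` creates ``cache_dir`` on disk as a side effect):

    >>> config = SweepConfig(
    ...     parameters={"initial_assets": [1e6, 10e6], "loss_frequency": [3, 5]},
    ...     fixed_params={"time_horizon": 10},
    ...     n_workers=1,
    ... )
    >>> len(config.generate_grid())
    4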
"""
parameters: Dict[str, List[Any]]
fixed_params: Dict[str, Any] = field(default_factory=dict)
metrics_to_track: List[str] = field(
default_factory=lambda: [
"optimal_roe",
"ruin_probability",
"optimal_retention",
"optimal_limit",
"total_premium",
"sharpe_ratio",
]
)
n_workers: Optional[int] = None  # None resolves to cpu_count() - 1 in __post_init__
batch_size: int = 100
adaptive_refinement: bool = False
refinement_threshold: float = 90.0
save_intermediate: bool = True
cache_dir: str = "./cache/sweeps"
def __post_init__(self):
"""Validate configuration and set defaults."""
if not self.parameters:
raise ValueError("Parameters dictionary cannot be empty")
# Set n_workers based on CPU if not specified
if self.n_workers is None:
import multiprocessing
self.n_workers = max(1, multiprocessing.cpu_count() - 1)
# Ensure cache directory exists
Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
def generate_grid(self) -> List[Dict[str, Any]]:
"""Generate parameter grid for sweep.
Returns:
List of dictionaries, each containing a complete parameter configuration
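
Example:
    Illustrative: two parameters with two values each yield four
    configurations, with fixed parameters merged into every entry:

    >>> cfg = SweepConfig(parameters={"a": [1, 2], "b": [3, 4]}, fixed_params={"c": 0})
    >>> grid = cfg.generate_grid()
    >>> len(grid)
    4
    >>> grid[0]
    {'c': 0, 'a': 1, 'b': 3}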
"""
# Get parameter names and values
param_names = list(self.parameters.keys())
param_values = list(self.parameters.values())
# Generate all combinations
grid = []
for values in product(*param_values):
config = self.fixed_params.copy()
config.update(dict(zip(param_names, values)))
grid.append(config)
return grid
def estimate_runtime(self, seconds_per_run: float = 1.0) -> str:
"""Estimate total runtime for sweep.
Args:
seconds_per_run: Estimated seconds per single parameter configuration
Returns:
Human-readable runtime estimate
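
Example:
    Illustrative: four combinations at one second each on a single worker:

    >>> cfg = SweepConfig(parameters={"a": [1, 2], "b": [3, 4]}, n_workers=1)
    >>> cfg.estimate_runtime(seconds_per_run=1.0)
    '4s'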
"""
total_runs = 1
for values in self.parameters.values():
total_runs *= len(values)
workers = self.n_workers if self.n_workers is not None else 1
total_seconds = total_runs * seconds_per_run / workers
hours = int(total_seconds // 3600)
minutes = int((total_seconds % 3600) // 60)
seconds = int(total_seconds % 60)
if hours > 0:
return f"{hours}h {minutes}m {seconds}s"
if minutes > 0:
return f"{minutes}m {seconds}s"
return f"{seconds}s"
class ParameterSweeper:
"""Systematic parameter sweep utilities for insurance optimization.
This class provides methods for exploring the parameter space through
grid search, identifying optimal regions, and comparing scenarios.
Attributes:
optimizer: Business optimizer instance for running optimizations
cache_dir: Directory for storing cached results
results_cache: In-memory cache of optimization results
use_parallel: Whether to use parallel processing
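
Example:
    Typical usage (illustrative; assumes a configured ``optimizer``):

    >>> sweeper = ParameterSweeper(optimizer, cache_dir="./cache/sweeps")
    >>> scenarios = sweeper.create_scenarios()
    >>> results = sweeper.sweep(scenarios["time_horizons"])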
"""
def __init__(
self,
optimizer: Optional[BusinessOptimizer] = None,
cache_dir: str = "./cache/sweeps",
use_parallel: bool = True,
):
"""Initialize parameter sweeper.
Args:
optimizer: BusinessOptimizer instance for running optimizations
cache_dir: Directory for caching results
use_parallel: Whether to enable parallel processing
"""
self.optimizer = optimizer
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.results_cache: Dict[str, Dict[str, Any]] = {}
self.use_parallel = use_parallel
self.logger = logging.getLogger(self.__class__.__name__)
def sweep( # pylint: disable=too-many-branches
self, config: SweepConfig, progress_callback: Optional[Callable] = None
) -> pd.DataFrame:
"""Execute parameter sweep with parallel processing.
Args:
config: Sweep configuration
progress_callback: Optional callback for progress updates
Returns:
DataFrame containing sweep results with all parameter combinations and metrics
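
Example:
    Illustrative (assumes a configured ``sweeper`` and ``config``); the
    callback receives the completed fraction:

    >>> results = sweeper.sweep(
    ...     config,
    ...     progress_callback=lambda frac: print(f"{frac:.0%} complete"),
    ... )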
"""
# Generate parameter grid
param_grid = config.generate_grid()
total_runs = len(param_grid)
self.logger.info(
f"Starting sweep with {total_runs} parameter combinations "
f"using {config.n_workers} workers"
)
# Log runtime estimate
runtime_estimate = config.estimate_runtime()
self.logger.info(f"Estimated runtime: {runtime_estimate}")
# Check for cached results (HDF5, or the Parquet fallback written by _save_results)
sweep_hash = self._get_sweep_hash(config)
cache_file = self.cache_dir / f"sweep_{sweep_hash}.h5"
parquet_cache = self.cache_dir / f"sweep_{sweep_hash}.parquet"
if cache_file.exists() and not config.adaptive_refinement:
    self.logger.info(f"Loading cached results from {cache_file}")
    result = pd.read_hdf(cache_file, key="results")
    assert isinstance(result, pd.DataFrame), "Expected DataFrame from cache"
    return result
if parquet_cache.exists() and not config.adaptive_refinement:
    self.logger.info(f"Loading cached results from {parquet_cache}")
    return pd.read_parquet(parquet_cache)
# Prepare result storage
results: List[Dict[str, Any]] = []
if self.use_parallel and config.n_workers is not None and config.n_workers > 1:
# Execute in parallel batches
with ProcessPoolExecutor(max_workers=config.n_workers) as executor:
# Submit one job per configuration (batch_size only chunks the submission loop)
futures = []
for i in range(0, total_runs, config.batch_size):
batch = param_grid[i : i + config.batch_size]
for params in batch:
future = executor.submit(self._run_single, params, config.metrics_to_track)
futures.append(future)
# Collect results with progress bar
with tqdm(total=total_runs, desc="Parameter sweep") as pbar:
for future in as_completed(futures):
try:
single_result: Dict[str, Any] = future.result(
timeout=300
) # 5 minute timeout
results.append(single_result)
pbar.update(1)
if progress_callback:
progress_callback(len(results) / total_runs)
# Save intermediate results if requested
if config.save_intermediate and len(results) % 100 == 0:
self._save_intermediate_results(results, sweep_hash)
# concurrent.futures.TimeoutError is a distinct class before Python 3.11,
# so the builtin TimeoutError alone would not catch a future timeout there
except (FuturesTimeout, ValueError, RuntimeError) as e:
    self.logger.error(f"Error in parameter sweep: {e}")
    # Continue with other configurations
else:
# Sequential execution
for params in tqdm(param_grid, desc="Parameter sweep"):
try:
single_result = self._run_single(params, config.metrics_to_track)
results.append(single_result)
if progress_callback:
progress_callback(len(results) / total_runs)
except (ValueError, RuntimeError, AttributeError) as e:
self.logger.error(f"Error running configuration: {e}")
# Convert to DataFrame
df = pd.DataFrame(results)
# Apply adaptive refinement if requested
if config.adaptive_refinement:
df = self._apply_adaptive_refinement(df, config)
# Save final results
self._save_results(df, config)
return df
def _run_single(self, params: Dict[str, Any], metrics: List[str]) -> Dict[str, Any]:
"""Run single parameter combination.
Args:
params: Parameter configuration
metrics: Metrics to extract from results
Returns:
Dictionary containing parameters and resulting metrics
"""
# Check cache
cache_key = self._get_cache_key(params)
if cache_key in self.results_cache:
return self.results_cache[cache_key]
# Create manufacturer with parameters
manufacturer = self._create_manufacturer(params)
# BusinessOptimizer is already imported at module level; build a fresh
# optimizer around this run's manufacturer (self.optimizer is not reused
# because each configuration needs its own manufacturer state)
optimizer = BusinessOptimizer(manufacturer)
# Set up constraints
constraints = BusinessConstraints(
max_risk_tolerance=params.get("max_risk_tolerance", 0.01),
min_roe_threshold=params.get("min_roe_threshold", 0.10),
max_premium_budget=params.get("max_premium_budget", 0.02),
)
# Run optimization
try:
result = optimizer.maximize_roe_with_insurance(
constraints=constraints,
time_horizon=params.get("time_horizon", 10),
n_simulations=params.get("n_simulations", 1000),
)
# Extract metrics from the OptimalStrategy result; the getattr fallbacks
# supply heuristic placeholders for attributes a build may not expose
output = params.copy()
output.update(
{
"optimal_roe": result.expected_roe,
"baseline_roe": getattr(result, "baseline_roe", result.expected_roe * 0.8),
"roe_improvement": getattr(
result, "roe_improvement", result.expected_roe * 0.2
),
"ruin_probability": result.bankruptcy_risk,
"optimal_retention": getattr(result, "optimal_retention", 0.8),
"optimal_limit": result.coverage_limit,
"total_premium": result.coverage_limit * result.premium_rate,
"optimal_deductible": result.deductible,
"sharpe_ratio": getattr(result, "sharpe_ratio", 1.0),
"var_95": getattr(result, "var_95", -0.1),
"cvar_95": getattr(result, "cvar_95", -0.15),
}
)
except (ValueError, RuntimeError, AttributeError) as e:
self.logger.warning(f"Optimization failed for params {params}: {e}")
# Return NaN values for failed optimization
output = params.copy()
for metric in metrics:
output[metric] = np.nan
# Cache result
self.results_cache[cache_key] = output
return output
def _create_manufacturer(self, params: Dict[str, Any]) -> WidgetManufacturer:
"""Create manufacturer instance from parameters.
Args:
params: Parameter configuration
Returns:
Configured WidgetManufacturer instance
"""
# Local import of ManufacturerConfig (not imported at module level);
# WidgetManufacturer is already available from the module-level import
from .config import ManufacturerConfig
# Create config object with parameters - only use fields that exist in ManufacturerConfig
config = ManufacturerConfig(
initial_assets=params.get("initial_assets", 10e6),
asset_turnover_ratio=params.get("asset_turnover", 1.0),
base_operating_margin=params.get("base_operating_margin", 0.08),
tax_rate=params.get("tax_rate", 0.25),
retention_ratio=params.get("retention_ratio", 0.6),
)
manufacturer = WidgetManufacturer(config)
return manufacturer
def create_scenarios(self) -> Dict[str, SweepConfig]:
"""Create pre-defined scenario configurations.
Returns:
Dictionary of scenario names to SweepConfig objects
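
Example:
    Illustrative: run one of the pre-defined scenarios:

    >>> scenarios = sweeper.create_scenarios()
    >>> "company_sizes" in scenarios
    True
    >>> results = sweeper.sweep(scenarios["company_sizes"])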
"""
scenarios = {}
# Company size sweep
scenarios["company_sizes"] = SweepConfig(
parameters={
"initial_assets": [1e6, 10e6, 100e6],
"asset_turnover": [0.5, 1.0, 1.5],
"base_operating_margin": [0.05, 0.08, 0.12],
},
fixed_params={
"loss_frequency": 5.0,
"loss_severity_mu": 10.0,
"n_simulations": 10000,
"time_horizon": 10,
},
metrics_to_track=["optimal_roe", "ruin_probability", "optimal_retention"],
)
# Loss severity sweep
scenarios["loss_scenarios"] = SweepConfig(
parameters={
"loss_frequency": [3, 5, 8],
"loss_severity_mu": [8, 10, 12],
"loss_severity_sigma": [0.5, 1.0, 1.5],
},
fixed_params={"initial_assets": 10e6, "n_simulations": 10000, "time_horizon": 10},
metrics_to_track=["optimal_retention", "total_premium", "ruin_probability"],
)
# Market conditions
scenarios["market_conditions"] = SweepConfig(
parameters={
"premium_loading": [0.2, 0.5, 1.0], # Soft to hard market
"tax_rate": [0.20, 0.25, 0.30],
},
fixed_params={
"initial_assets": 10e6,
"loss_frequency": 5.0,
"n_simulations": 10000,
"time_horizon": 10,
},
metrics_to_track=["optimal_roe", "total_premium", "optimal_retention"],
)
# Time horizons
scenarios["time_horizons"] = SweepConfig(
parameters={"time_horizon": [1, 5, 10, 25], "initial_assets": [1e6, 10e6, 100e6]},
fixed_params={"loss_frequency": 5.0, "n_simulations": 10000},
metrics_to_track=["optimal_roe", "ruin_probability", "optimal_limit"],
)
# Simulation scales
scenarios["simulation_scales"] = SweepConfig(
parameters={"n_simulations": [1000, 10000, 100000], "time_horizon": [5, 10, 20]},
fixed_params={"initial_assets": 10e6, "loss_frequency": 5.0},
metrics_to_track=["optimal_roe", "ruin_probability", "sharpe_ratio"],
)
return scenarios
def find_optimal_regions(
self,
results: pd.DataFrame,
objective: str = "optimal_roe",
constraints: Optional[Dict[str, Tuple[float, float]]] = None,
top_percentile: float = 90,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Identify optimal parameter regions.
Args:
results: DataFrame of sweep results
objective: Objective metric to optimize
constraints: Dictionary mapping metric names to (min, max) constraint tuples
top_percentile: Percentile threshold for optimal region (e.g., 90 for top 10%)
Returns:
Tuple of (optimal results DataFrame, parameter statistics DataFrame)
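
Example:
    Illustrative: keep the top 5% of configurations by ROE, subject to a
    ruin probability of at most 1%:

    >>> optimal, summary = sweeper.find_optimal_regions(
    ...     results,
    ...     objective="optimal_roe",
    ...     constraints={"ruin_probability": (0.0, 0.01)},
    ...     top_percentile=95,
    ... )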
"""
# Apply constraints
filtered = results.copy()
if constraints:
for col, (min_val, max_val) in constraints.items():
if col in filtered.columns:
filtered = filtered[(filtered[col] >= min_val) & (filtered[col] <= max_val)]
else:
self.logger.warning(f"Constraint column '{col}' not found in results")
# Check if we have any valid results
valid_results = filtered.dropna(subset=[objective])
if valid_results.empty:
self.logger.warning("No valid results after applying constraints")
return pd.DataFrame(), pd.DataFrame()
# Find top performers
threshold = np.percentile(valid_results[objective], top_percentile)
optimal = valid_results[valid_results[objective] >= threshold]
# Identify parameter columns (exclude metrics)
metric_cols = [
"optimal_roe",
"baseline_roe",
"roe_improvement",
"ruin_probability",
"optimal_retention",
"optimal_limit",
"total_premium",
"optimal_deductible",
"sharpe_ratio",
"var_95",
"cvar_95",
]
param_cols = [col for col in optimal.columns if col not in metric_cols]
# Analyze optimal region characteristics
summary = pd.DataFrame(
{
"min": optimal[param_cols].min(),
"max": optimal[param_cols].max(),
"mean": optimal[param_cols].mean(),
"std": optimal[param_cols].std(),
"median": optimal[param_cols].median(),
}
)
# Metric-level statistics (computed but currently unused; the leading
# underscore marks it as such)
_metric_summary = pd.DataFrame(
{
"optimal_mean": optimal[objective].mean(),
"optimal_std": optimal[objective].std(),
"optimal_min": optimal[objective].min(),
"optimal_max": optimal[objective].max(),
"all_mean": valid_results[objective].mean(),
"improvement": (optimal[objective].mean() - valid_results[objective].mean())
/ valid_results[objective].mean()
* 100,
},
index=[objective],
)
self.logger.info(
f"Found {len(optimal)} optimal configurations out of {len(results)} total "
f"(top {100-top_percentile:.1f}%)"
)
self.logger.info(
f"Optimal {objective}: {optimal[objective].mean():.4f} ± {optimal[objective].std():.4f}"
)
return optimal, summary
def compare_scenarios(
self,
results: Dict[str, pd.DataFrame],
metrics: Optional[List[str]] = None,
normalize: bool = False,
) -> pd.DataFrame:
"""Compare results across multiple scenarios.
Args:
results: Dictionary mapping scenario names to result DataFrames
metrics: List of metrics to compare (default: all common metrics)
normalize: Whether to normalize metrics to [0, 1] range
Returns:
DataFrame with scenario comparison
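
Example:
    Illustrative: compare two sweeps (``base_df`` and ``stress_df`` are
    hypothetical result DataFrames):

    >>> comparison = sweeper.compare_scenarios(
    ...     {"base": base_df, "stress": stress_df},
    ...     metrics=["optimal_roe", "ruin_probability"],
    ...     normalize=True,
    ... )
    >>> comparison["optimal_roe_mean"]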
"""
if not results:
return pd.DataFrame()
# Determine metrics to compare
if metrics is None:
# Collect metrics available in any scenario (per-scenario presence is re-checked below)
all_cols: set[str] = set()
for df in results.values():
all_cols.update(df.columns)
metric_cols = [
"optimal_roe",
"ruin_probability",
"optimal_retention",
"optimal_limit",
"total_premium",
"sharpe_ratio",
]
metrics = [m for m in metric_cols if m in all_cols]
# Build comparison DataFrame
comparison_data = []
for scenario_name, df in results.items():
scenario_stats: Dict[str, Any] = {"scenario": scenario_name}
for metric in metrics:
if metric in df.columns:
valid_data = df[metric].dropna()
if not valid_data.empty:
scenario_stats[f"{metric}_mean"] = valid_data.mean()
scenario_stats[f"{metric}_std"] = valid_data.std()
scenario_stats[f"{metric}_min"] = valid_data.min()
scenario_stats[f"{metric}_max"] = valid_data.max()
comparison_data.append(scenario_stats)
comparison_df = pd.DataFrame(comparison_data)
# Normalize if requested
if normalize and len(comparison_df) > 1:
for col in comparison_df.columns:
if col != "scenario" and "_mean" in col:
min_val = comparison_df[col].min()
max_val = comparison_df[col].max()
if max_val > min_val:
comparison_df[f"{col}_normalized"] = (comparison_df[col] - min_val) / (
max_val - min_val
)
return comparison_df.set_index("scenario")
def _apply_adaptive_refinement(
self, initial_results: pd.DataFrame, config: SweepConfig
) -> pd.DataFrame:
"""Apply adaptive refinement near optimal regions.
Args:
initial_results: Initial sweep results
config: Sweep configuration
Returns:
Refined results DataFrame
"""
# Find promising regions
optimal, param_stats = self.find_optimal_regions(
initial_results, top_percentile=config.refinement_threshold
)
if optimal.empty:
return initial_results
# Generate refined grid around optimal regions
refined_params = {}
for param in config.parameters.keys():
    if param in param_stats.index:
        # Create refined range around optimal values
        param_min = param_stats.loc[param, "min"]
        param_max = param_stats.loc[param, "max"]
        # Generate more points in the optimal range
        if (
            isinstance(config.parameters[param][0], (int, float))
            and isinstance(param_min, (int, float))
            and isinstance(param_max, (int, float))
        ):
            # Numeric parameter: expand by 10% of the span on each side.
            # Additive margins behave correctly for negative values, where
            # multiplicative scaling would shrink the range instead.
            span = float(param_max) - float(param_min)
            margin = 0.1 * span if span > 0 else 0.1 * abs(float(param_min))
            refined_values = np.linspace(
                float(param_min) - margin,
                float(param_max) + margin,
                num=len(config.parameters[param]) * 2,  # double the resolution
            )
            refined_params[param] = list(refined_values)
        else:
            # Categorical or non-numeric parameter - keep original values
            refined_params[param] = config.parameters[param]
    else:
        # Parameter absent from the statistics - keep original values
        refined_params[param] = config.parameters[param]
# Create refined configuration
refined_config = SweepConfig(
parameters=refined_params,
fixed_params=config.fixed_params,
metrics_to_track=config.metrics_to_track,
n_workers=config.n_workers,
batch_size=config.batch_size,
adaptive_refinement=False, # Don't recurse
)
# Run refined sweep
self.logger.info("Running adaptive refinement sweep")
refined_results = self.sweep(refined_config)
# Combine results
combined = pd.concat([initial_results, refined_results], ignore_index=True)
# Remove duplicates based on parameter columns
param_cols = list(config.parameters.keys())
combined = combined.drop_duplicates(subset=param_cols, keep="last")
return combined
def _get_cache_key(self, params: Dict[str, Any]) -> str:
"""Generate cache key from parameters.
Args:
params: Parameter dictionary
Returns:
Cache key string
"""
# Sort parameters for consistent hashing
sorted_params = sorted(params.items())
param_str = json.dumps(sorted_params, sort_keys=True, default=str)
return hashlib.md5(param_str.encode()).hexdigest()
def _get_sweep_hash(self, config: SweepConfig) -> str:
"""Generate hash for sweep configuration.
Args:
config: Sweep configuration
Returns:
Hash string
"""
config_dict = {
"parameters": config.parameters,
"fixed_params": config.fixed_params,
"metrics": config.metrics_to_track,
}
config_str = json.dumps(config_dict, sort_keys=True, default=str)
return hashlib.md5(config_str.encode()).hexdigest()[:8]
def _save_results(self, df: pd.DataFrame, config: SweepConfig) -> None:
"""Save sweep results to HDF5 file.
Args:
df: Results DataFrame
config: Sweep configuration
"""
sweep_hash = self._get_sweep_hash(config)
# Save to HDF5 if available, otherwise fall back to Parquet
try:
    saved_file = self.cache_dir / f"sweep_{sweep_hash}.h5"
    df.to_hdf(saved_file, key="results", mode="w", complevel=5)
except ImportError:
    # HDF5 support (the tables package) is not installed
    saved_file = self.cache_dir / f"sweep_{sweep_hash}.parquet"
    df.to_parquet(saved_file, compression="snappy")
# Always save metadata
metadata = {
    "timestamp": datetime.now().isoformat(),
    "n_results": len(df),
    "parameters": list(config.parameters.keys()),
    "metrics": config.metrics_to_track,
    "sweep_hash": sweep_hash,
}
meta_file = self.cache_dir / f"sweep_{sweep_hash}_meta.json"
with open(meta_file, "w") as f:
    json.dump(metadata, f, indent=2)
self.logger.info(f"Saved results to {saved_file}")
def _save_intermediate_results(self, results: List[Dict], sweep_hash: str) -> None:
"""Save intermediate results during sweep.
Args:
results: List of result dictionaries
sweep_hash: Sweep configuration hash
"""
if not results:
return
# Convert to DataFrame
df = pd.DataFrame(results)
# Save to temporary file
try:
temp_file = self.cache_dir / f"sweep_{sweep_hash}_temp.h5"
df.to_hdf(temp_file, key="results", mode="w", complevel=5)
except ImportError:
# Fall back to parquet if HDF5 not available
temp_file = self.cache_dir / f"sweep_{sweep_hash}_temp.parquet"
df.to_parquet(temp_file, compression="snappy")
self.logger.debug(f"Saved {len(results)} intermediate results")
def load_results( # pylint: disable=too-many-return-statements
self, sweep_hash: str
) -> Optional[pd.DataFrame]:
"""Load cached sweep results.
Args:
sweep_hash: Sweep configuration hash
Returns:
Results DataFrame if found, None otherwise
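
Example:
    Illustrative (the hash value is hypothetical; real hashes are recorded
    in each sweep's ``*_meta.json`` file):

    >>> df = sweeper.load_results("a1b2c3d4")
    >>> if df is None:
    ...     print("no cached results for this sweep")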
"""
# Try HDF5 first
h5_file = self.cache_dir / f"sweep_{sweep_hash}.h5"
if h5_file.exists():
try:
result = pd.read_hdf(h5_file, key="results")
if isinstance(result, pd.DataFrame):
return result
return None
except ImportError:
pass
# Try parquet
parquet_file = self.cache_dir / f"sweep_{sweep_hash}.parquet"
if parquet_file.exists():
return pd.read_parquet(parquet_file)
# Check for temporary files
temp_h5_file = self.cache_dir / f"sweep_{sweep_hash}_temp.h5"
if temp_h5_file.exists():
try:
self.logger.info("Loading partial results from interrupted sweep")
result = pd.read_hdf(temp_h5_file, key="results")
if isinstance(result, pd.DataFrame):
return result
return None
except ImportError:
pass
temp_parquet_file = self.cache_dir / f"sweep_{sweep_hash}_temp.parquet"
if temp_parquet_file.exists():
self.logger.info("Loading partial results from interrupted sweep")
return pd.read_parquet(temp_parquet_file)
return None
def export_results(
self, results: pd.DataFrame, output_file: str, file_format: str = "parquet"
) -> None:
"""Export results to specified format.
Args:
results: Results DataFrame
output_file: Output file path
file_format: Export format ('parquet', 'csv', 'excel', 'hdf5')
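
Example:
    Illustrative:

    >>> sweeper.export_results(results, "sweep_results.csv", file_format="csv")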
"""
output_path = Path(output_file)
if file_format == "parquet":
results.to_parquet(output_path, compression="snappy")
elif file_format == "csv":
results.to_csv(output_path, index=False)
elif file_format == "excel":
results.to_excel(output_path, index=False)
elif file_format == "hdf5":
try:
results.to_hdf(output_path, key="results", mode="w", complevel=5)
except ImportError:
self.logger.warning(
"HDF5 support not available (tables package missing). Using parquet instead."
)
output_path = output_path.with_suffix(".parquet")
results.to_parquet(output_path, compression="snappy")
else:
raise ValueError(f"Unsupported format: {file_format}")
self.logger.info(f"Exported results to {output_path} ({file_format})")