# Source code for ergodic_insurance.pareto_frontier

"""Pareto frontier analysis for multi-objective optimization.

This module provides comprehensive tools for generating, analyzing, and visualizing
Pareto frontiers in multi-objective optimization problems, particularly focused on
insurance optimization trade-offs between ROE, risk, and costs.
"""

from dataclasses import dataclass, field
from enum import Enum
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from scipy.optimize import differential_evolution, minimize

from .gpu_backend import GPUConfig, get_array_module, is_gpu_available, to_numpy

logger = logging.getLogger(__name__)


class ObjectiveType(Enum):
    """Direction of optimization for a single objective.

    ``MAXIMIZE`` means larger values are better (e.g. ROE); ``MINIMIZE``
    means smaller values are better (e.g. risk, cost).
    """

    MAXIMIZE = "maximize"
    MINIMIZE = "minimize"
@dataclass
class Objective:
    """Definition of an optimization objective.

    Attributes:
        name: Name of the objective (e.g., 'ROE', 'risk', 'cost')
        type: Whether to maximize or minimize this objective
        weight: Weight for weighted sum method (0-1)
        normalize: Whether to normalize this objective
        bounds: Optional bounds for this objective as (min, max)
    """

    name: str
    type: ObjectiveType
    weight: float = 1.0
    normalize: bool = True
    bounds: Optional[Tuple[float, float]] = None
@dataclass
class ParetoPoint:
    """A point on the Pareto frontier.

    Attributes:
        objectives: Dictionary of objective values
        decision_variables: The decision variables that produce these objectives
        is_dominated: Whether this point is dominated by another
        crowding_distance: Crowding distance metric for this point
        trade_offs: Trade-off ratios with neighboring points
    """

    objectives: Dict[str, float]
    decision_variables: np.ndarray
    is_dominated: bool = False
    crowding_distance: float = 0.0
    trade_offs: Dict[str, float] = field(default_factory=dict)

    def dominates(self, other: "ParetoPoint", objectives: List[Objective]) -> bool:
        """Check if this point dominates another point.

        Pareto dominance: this point is no worse than ``other`` on every
        objective and strictly better on at least one of them.

        Args:
            other: Another Pareto point to compare
            objectives: List of objectives to consider

        Returns:
            True if this point dominates the other
        """
        strictly_better_somewhere = False
        for obj in objectives:
            # Orient each objective so that larger is always better,
            # then a single pair of comparisons suffices.
            sign = 1.0 if obj.type == ObjectiveType.MAXIMIZE else -1.0
            mine = sign * self.objectives[obj.name]
            theirs = sign * other.objectives[obj.name]
            if mine < theirs:
                # Worse on at least one objective -> cannot dominate.
                return False
            if mine > theirs:
                strictly_better_somewhere = True
        return strictly_better_somewhere
class ParetoFrontier:
    """Generator and analyzer for Pareto frontiers.

    This class provides methods for generating Pareto frontiers using various
    algorithms and analyzing the resulting trade-offs.
    """

    def __init__(
        self,
        objectives: List[Objective],
        objective_function: Callable,
        bounds: List[Tuple[float, float]],
        constraints: Optional[List[Dict[str, Any]]] = None,
        seed: Optional[int] = None,
        gpu_config: Optional[GPUConfig] = None,
    ):
        """Initialize Pareto frontier generator.

        Args:
            objectives: List of objectives to optimize
            objective_function: Function that returns objective values given decision variables
            bounds: Bounds for decision variables
            constraints: Optional constraints for optimization
            seed: Optional random seed for reproducibility
            gpu_config: Optional GPU configuration for accelerated operations

        Raises:
            ValueError: If the objective definitions fail validation.
        """
        self.objectives = objectives
        self.objective_function = objective_function
        self.bounds = bounds
        self.constraints = constraints or []
        # Populated by the generate_* methods; starts empty.
        self.frontier_points: List[ParetoPoint] = []
        self._rng = np.random.default_rng(seed)
        self.gpu_config = gpu_config
        # GPU path is taken only when a config was given, it is enabled,
        # and a device is actually present.
        self._use_gpu = gpu_config is not None and gpu_config.enabled and is_gpu_available()

        self._validate_objectives()

    def _validate_objectives(self) -> None:
        """Validate objective definitions.

        Raises:
            ValueError: If no objectives are defined, objective names are
                not unique, or any weight lies outside [0, 1].
        """
        if not self.objectives:
            raise ValueError("At least one objective must be defined")

        names = [obj.name for obj in self.objectives]
        if len(names) != len(set(names)):
            raise ValueError("Objective names must be unique")

        for obj in self.objectives:
            if not 0 <= obj.weight <= 1:
                raise ValueError(f"Objective weight must be in [0, 1], got {obj.weight}")
    def generate_weighted_sum(self, n_points: int = 50, method: str = "SLSQP") -> List[ParetoPoint]:
        """Generate Pareto frontier using weighted sum method.

        Args:
            n_points: Number of points to generate on the frontier
            method: Optimization method to use

        Returns:
            List of Pareto points forming the frontier
        """
        points = []

        if len(self.objectives) == 2:
            # For bi-objective, vary weights systematically
            weights = np.linspace(0, 1, n_points)
            for w1 in weights:
                w2 = 1 - w1
                point = self._optimize_weighted_sum([w1, w2], method)
                if point is not None:
                    points.append(point)
        else:
            # For many objectives, use random weights (Dirichlet samples sum to 1)
            for _ in range(n_points):
                weights = self._rng.dirichlet(np.ones(len(self.objectives)))
                point = self._optimize_weighted_sum(weights.tolist(), method)
                if point is not None:
                    points.append(point)

        # Filter dominated points, then annotate the surviving frontier.
        self.frontier_points = self._filter_dominated_points(points)
        self._calculate_crowding_distances()
        self._calculate_trade_offs()

        return self.frontier_points
    def generate_epsilon_constraint(
        self, n_points: int = 50, method: str = "SLSQP"
    ) -> List[ParetoPoint]:
        """Generate Pareto frontier using epsilon-constraint method.

        The first objective is optimized while all remaining objectives are
        turned into inequality constraints with varying epsilon levels.

        Args:
            n_points: Number of points to generate
            method: Optimization method to use

        Returns:
            List of Pareto points forming the frontier

        Raises:
            ValueError: If fewer than 2 objectives are defined.
        """
        if len(self.objectives) < 2:
            raise ValueError("Epsilon-constraint requires at least 2 objectives")

        points = []
        primary_obj = self.objectives[0]
        constraint_objs = self.objectives[1:]

        # Find bounds for constraint objectives
        epsilon_ranges = []
        for obj in constraint_objs:
            # Optimize for this objective alone to find its range
            temp_weights = [0.0] * len(self.objectives)
            idx = self.objectives.index(obj)
            temp_weights[idx] = 1.0
            point = self._optimize_weighted_sum(temp_weights, method)
            if point:
                # NOTE(review): the upper end of the sweep is 1.5x the
                # single-objective optimum, which assumes positive objective
                # values — confirm for objectives that can be negative.
                epsilon_ranges.append(
                    (point.objectives[obj.name], point.objectives[obj.name] * 1.5)
                )
            else:
                # Fallback range if the anchor optimization failed.
                epsilon_ranges.append((0, 1))

        # Generate grid of epsilon values
        if len(constraint_objs) == 1:
            epsilons = np.linspace(epsilon_ranges[0][0], epsilon_ranges[0][1], n_points)
            for eps in epsilons:
                point = self._optimize_epsilon_constraint(
                    primary_obj, {constraint_objs[0].name: eps}, method
                )
                if point is not None:
                    points.append(point)
        else:
            # For multiple constraints, use random sampling
            for _ in range(n_points):
                epsilon_dict = {}
                for i, obj in enumerate(constraint_objs):
                    epsilon_dict[obj.name] = self._rng.uniform(
                        epsilon_ranges[i][0], epsilon_ranges[i][1]
                    )
                point = self._optimize_epsilon_constraint(primary_obj, epsilon_dict, method)
                if point is not None:
                    points.append(point)

        # Filter dominated points, then annotate the surviving frontier.
        self.frontier_points = self._filter_dominated_points(points)
        self._calculate_crowding_distances()
        self._calculate_trade_offs()

        return self.frontier_points
    def generate_evolutionary(
        self, n_generations: int = 100, population_size: int = 50
    ) -> List[ParetoPoint]:
        """Generate Pareto frontier using evolutionary algorithm.

        Runs ``scipy.optimize.differential_evolution`` once per population
        member, each with a random Dirichlet weight vector, and keeps the
        non-dominated results.

        Args:
            n_generations: Number of generations for evolution
            population_size: Size of population in each generation

        Returns:
            List of Pareto points forming the frontier
        """

        def multi_objective_wrapper(x):
            obj_vals = self.objective_function(x)
            # Convert to minimization problem
            result = []
            for obj in self.objectives:
                val = obj_vals[obj.name]
                if obj.type == ObjectiveType.MAXIMIZE:
                    result.append(-val)  # Negate for maximization
                else:
                    result.append(val)
            return result

        # Use differential evolution with multiple runs
        points = []
        for _ in range(population_size):
            # Random weights for this run
            weights = self._rng.dirichlet(np.ones(len(self.objectives)))

            # Bind weights via default arg so each closure keeps its own copy.
            def weighted_objective(x, w=weights):
                obj_vals = multi_objective_wrapper(x)
                return np.dot(w, obj_vals)

            result = differential_evolution(
                weighted_objective,
                self.bounds,
                maxiter=n_generations,
                popsize=15,
                # Derive a per-run seed from the instance RNG for reproducibility.
                seed=int(self._rng.integers(0, 10000)),
            )

            if result.success:
                obj_vals = self.objective_function(result.x)
                point = ParetoPoint(
                    objectives=obj_vals,
                    decision_variables=result.x,
                )
                points.append(point)

        # Filter dominated points, then annotate the surviving frontier.
        self.frontier_points = self._filter_dominated_points(points)
        self._calculate_crowding_distances()
        self._calculate_trade_offs()

        return self.frontier_points
    def _optimize_weighted_sum(self, weights: List[float], method: str) -> Optional[ParetoPoint]:
        """Optimize using weighted sum of objectives.

        Args:
            weights: Weights for each objective
            method: Optimization method

        Returns:
            Pareto point if optimization successful, None otherwise
        """

        def weighted_objective(x):
            obj_vals = self.objective_function(x)
            weighted_sum = 0
            for obj, weight in zip(self.objectives, weights):
                val = obj_vals[obj.name]
                # Normalize if requested
                if obj.normalize and obj.bounds:
                    val = (val - obj.bounds[0]) / (obj.bounds[1] - obj.bounds[0])
                # Convert to minimization
                if obj.type == ObjectiveType.MAXIMIZE:
                    val = -val
                weighted_sum += weight * val
            return weighted_sum

        # Initial guess: midpoint of each decision-variable range.
        x0 = np.array([(b[0] + b[1]) / 2 for b in self.bounds])

        result = minimize(
            weighted_objective,
            x0,
            method=method,
            bounds=self.bounds,
            constraints=self.constraints,
        )

        if result.success:
            # Re-evaluate at the optimum to record the raw objective values.
            obj_vals = self.objective_function(result.x)
            return ParetoPoint(
                objectives=obj_vals,
                decision_variables=result.x,
            )
        return None

    def _optimize_epsilon_constraint(
        self, primary_obj: Objective, epsilon_constraints: Dict[str, float], method: str
    ) -> Optional[ParetoPoint]:
        """Optimize using epsilon-constraint method.

        Args:
            primary_obj: Primary objective to optimize
            epsilon_constraints: Constraints on other objectives
            method: Optimization method

        Returns:
            Pareto point if optimization successful, None otherwise
        """

        def primary_objective(x):
            obj_vals = self.objective_function(x)
            val = obj_vals[primary_obj.name]
            if primary_obj.type == ObjectiveType.MAXIMIZE:
                return -val
            return val

        # Add epsilon constraints (scipy "ineq" convention: fun(x) >= 0).
        additional_constraints = []
        for obj_name, epsilon_val in epsilon_constraints.items():
            obj = next(o for o in self.objectives if o.name == obj_name)

            # Factory + default args bind name/epsilon per constraint,
            # avoiding the late-binding closure pitfall in the loop.
            def make_constraint(name, eps, obj_type):
                if obj_type == ObjectiveType.MAXIMIZE:
                    # Require objective >= epsilon.
                    return {
                        "type": "ineq",
                        "fun": lambda x, n=name, e=eps: self.objective_function(x)[n] - e,
                    }
                # Require objective <= epsilon.
                return {
                    "type": "ineq",
                    "fun": lambda x, n=name, e=eps: e - self.objective_function(x)[n],
                }

            additional_constraints.append(make_constraint(obj_name, epsilon_val, obj.type))

        all_constraints = self.constraints + additional_constraints

        # Initial guess: midpoint of each decision-variable range.
        x0 = np.array([(b[0] + b[1]) / 2 for b in self.bounds])

        result = minimize(
            primary_objective,
            x0,
            method=method,
            bounds=self.bounds,
            constraints=all_constraints,
        )

        if result.success:
            obj_vals = self.objective_function(result.x)
            return ParetoPoint(
                objectives=obj_vals,
                decision_variables=result.x,
            )
        return None

    def _filter_dominated_points(self, points: List[ParetoPoint]) -> List[ParetoPoint]:
        """Filter out dominated points to get true Pareto frontier.

        Uses vectorized array operations (GPU-accelerated when available)
        instead of nested Python loops.

        Args:
            points: List of candidate points

        Returns:
            List of non-dominated points

        Since:
            Updated for GPU in Version 0.11.0 (Issue #966)
        """
        if not points:
            return []

        xp = get_array_module(gpu=self._use_gpu)

        obj_names = [obj.name for obj in self.objectives]
        obj_matrix = xp.asarray([[p.objectives[name] for name in obj_names] for p in points])
        # Flip minimized objectives so "larger is better" holds on every axis.
        signs = xp.asarray(
            [1.0 if obj.type == ObjectiveType.MAXIMIZE else -1.0 for obj in self.objectives]
        )
        normalized = obj_matrix * signs

        n = len(points)
        is_dominated = xp.zeros(n, dtype=bool)

        for i in range(n):
            if bool(is_dominated[i]):
                # Safe to skip: anything a dominated point dominates is also
                # dominated by its own dominator (dominance is transitive).
                continue
            # Point i dominates j iff >= on all objectives and > on at least one.
            geq = xp.all(normalized[i] >= normalized, axis=1)
            gt = xp.any(normalized[i] > normalized, axis=1)
            dominates = geq & gt
            dominates[i] = False  # a point never dominates itself
            is_dominated = is_dominated | dominates

        is_dominated_np = to_numpy(is_dominated)
        for i, point in enumerate(points):
            point.is_dominated = bool(is_dominated_np[i])

        return [p for p in points if not p.is_dominated]

    def _calculate_crowding_distances(self) -> None:
        """Calculate crowding distances for frontier points."""
        if len(self.frontier_points) <= 2:
            # Too few points for interior distances; all are boundary points.
            for point in self.frontier_points:
                point.crowding_distance = float("inf")
            return

        n_points = len(self.frontier_points)
        distances = np.zeros(n_points)

        for obj in self.objectives:
            # Sort points by this objective (default binds obj per iteration).
            sorted_points = sorted(
                enumerate(self.frontier_points),
                key=lambda x, o=obj: x[1].objectives[o.name],  # type: ignore[misc]
            )

            # Boundary points get infinite distance
            distances[sorted_points[0][0]] = float("inf")
            distances[sorted_points[-1][0]] = float("inf")

            # Calculate distances for interior points
            obj_range = (
                sorted_points[-1][1].objectives[obj.name]
                - sorted_points[0][1].objectives[obj.name]
            )
            if obj_range > 0:
                for i in range(1, n_points - 1):
                    prev_val = sorted_points[i - 1][1].objectives[obj.name]
                    next_val = sorted_points[i + 1][1].objectives[obj.name]
                    distances[sorted_points[i][0]] += (next_val - prev_val) / obj_range

        # Assign crowding distances
        for i, point in enumerate(self.frontier_points):
            point.crowding_distance = distances[i]

    def _calculate_trade_offs(self) -> None:
        """Calculate trade-off ratios between neighboring points."""
        if len(self.frontier_points) < 2:
            return

        # For 2D frontiers, calculate direct trade-offs
        if len(self.objectives) == 2:
            sorted_points = sorted(
                self.frontier_points,
                key=lambda p: p.objectives[self.objectives[0].name],
            )

            for i in range(len(sorted_points) - 1):
                p1, p2 = sorted_points[i], sorted_points[i + 1]

                obj1_diff = (
                    p2.objectives[self.objectives[0].name]
                    - p1.objectives[self.objectives[0].name]
                )
                obj2_diff = (
                    p2.objectives[self.objectives[1].name]
                    - p1.objectives[self.objectives[1].name]
                )

                # Guard against division by (near-)zero spacing on objective 1.
                if abs(obj1_diff) > 1e-10:
                    trade_off = obj2_diff / obj1_diff
                    p1.trade_offs[f"{self.objectives[1].name}_per_{self.objectives[0].name}"] = (
                        trade_off
                    )
[docs] def calculate_hypervolume(self, reference_point: Optional[Dict[str, float]] = None) -> float: """Calculate hypervolume indicator for the Pareto frontier. Args: reference_point: Reference point for hypervolume calculation Returns: Hypervolume value """ if not self.frontier_points: return 0.0 # Set reference point if not provided if reference_point is None: reference_point = {} for obj in self.objectives: values = [p.objectives[obj.name] for p in self.frontier_points] if obj.type == ObjectiveType.MAXIMIZE: reference_point[obj.name] = min(values) * 0.9 else: reference_point[obj.name] = max(values) * 1.1 # For 2D, use simple rectangle sum if len(self.objectives) == 2: return self._calculate_2d_hypervolume(reference_point) # For higher dimensions, use Monte Carlo approximation return self._calculate_nd_hypervolume_monte_carlo(reference_point)
def _calculate_2d_hypervolume(self, reference_point: Dict[str, float]) -> float: """Calculate hypervolume for 2D frontier. Args: reference_point: Reference point Returns: Hypervolume value """ obj_names = [obj.name for obj in self.objectives] # Sort points by first objective sorted_points = sorted( self.frontier_points, key=lambda p: p.objectives[obj_names[0]], ) hypervolume = 0.0 prev_obj2 = reference_point[obj_names[1]] for point in sorted_points: obj1 = point.objectives[obj_names[0]] obj2 = point.objectives[obj_names[1]] # Calculate contribution if self.objectives[0].type == ObjectiveType.MAXIMIZE: width = obj1 - reference_point[obj_names[0]] else: width = reference_point[obj_names[0]] - obj1 if self.objectives[1].type == ObjectiveType.MAXIMIZE: height = obj2 - prev_obj2 else: height = prev_obj2 - obj2 if width > 0 and height > 0: hypervolume += width * height prev_obj2 = obj2 return hypervolume def _calculate_nd_hypervolume_monte_carlo( self, reference_point: Dict[str, float], n_samples: int = 10000 ) -> float: """Calculate hypervolume using Monte Carlo approximation for n-D. Uses GPU-accelerated sampling and dominance checking when available. 
Args: reference_point: Reference point n_samples: Number of Monte Carlo samples Returns: Approximate hypervolume value Since: Updated for GPU in Version 0.11.0 (Issue #966) """ xp = get_array_module(gpu=self._use_gpu) obj_names = [obj.name for obj in self.objectives] n_dims = len(obj_names) lower = np.empty(n_dims) upper = np.empty(n_dims) for i, obj in enumerate(self.objectives): values = [p.objectives[obj.name] for p in self.frontier_points] if obj.type == ObjectiveType.MAXIMIZE: lower[i] = reference_point[obj.name] upper[i] = max(values) else: lower[i] = min(values) upper[i] = reference_point[obj.name] lower_gpu = xp.asarray(lower) upper_gpu = xp.asarray(upper) # Generate random samples on device if self._use_gpu: random_samples = xp.random.uniform(lower_gpu, upper_gpu, size=(n_samples, n_dims)) else: random_samples = self._rng.uniform(lower, upper, size=(n_samples, n_dims)) random_samples = xp.asarray(random_samples) pareto_matrix = xp.asarray( [[p.objectives[name] for name in obj_names] for p in self.frontier_points] ) maximize_mask = xp.asarray([obj.type == ObjectiveType.MAXIMIZE for obj in self.objectives]) already_counted = xp.zeros(n_samples, dtype=bool) dominated_count = 0 for p_row in pareto_matrix: comparison = xp.where(maximize_mask, p_row >= random_samples, p_row <= random_samples) dominated = xp.all(comparison, axis=1) new_dominated = dominated & ~already_counted dominated_count += int(xp.sum(new_dominated)) already_counted = already_counted | dominated if bool(xp.all(already_counted)): break total_volume = float(xp.prod(upper_gpu - lower_gpu)) return float((dominated_count / n_samples) * total_volume)
    def get_knee_points(
        self, n_knees: int = 1, method: str = "perpendicular_distance"
    ) -> List[ParetoPoint]:
        """Find knee points on the Pareto frontier.

        Knee points represent points of maximum curvature on the frontier,
        where small improvements in one objective require large sacrifices in
        others (the diminishing-returns inflection point).

        Three methods are supported:

        - ``"perpendicular_distance"`` (default): Finds the point(s) with
          maximum perpendicular distance from the line (2-D) or hyperplane
          (n-D) connecting the extreme points of the frontier in normalized
          objective space (Das, 1999).
        - ``"angle"``: Finds the point(s) where adjacent frontier segments
          form the sharpest angle, indicating maximum curvature
          (Branke et al., 2004). Most reliable for bi-objective frontiers.
        - ``"topsis"``: Finds the point(s) closest to the ideal (utopia)
          point in normalized space (Hwang & Yoon, 1981). This was the
          default in earlier versions.

        Args:
            n_knees: Number of knee points to identify.
            method: Knee detection method. One of ``"perpendicular_distance"``,
                ``"angle"``, or ``"topsis"``.

        Returns:
            List of knee points.

        Raises:
            ValueError: If *method* is not a recognised method name.

        References:
            Das, I. (1999). On characterizing the 'knee' of the Pareto curve
            based on Normal-Boundary Intersection. *Structural Optimization*,
            18(2-3), 107-115.

            Branke, J., Deb, K., Dierolf, H., & Osswald, M. (2004). Finding
            knees in multi-objective optimization. *PPSN VIII*, Springer,
            722-731.

            Hwang, C.L., & Yoon, K. (1981). *Multiple Attribute Decision
            Making*. Springer.
        """
        valid_methods = ("perpendicular_distance", "angle", "topsis")
        if method not in valid_methods:
            raise ValueError(f"method must be one of {valid_methods}, got '{method}'")

        if not self.frontier_points:
            return []
        if len(self.frontier_points) <= n_knees:
            # Fewer candidates than requested knees: return them all.
            return self.frontier_points.copy()

        # Convert frontier to objective matrix
        obj_matrix = np.array(
            [[p.objectives[obj.name] for obj in self.objectives] for p in self.frontier_points]
        )

        # Normalize objectives so that higher = better in all dimensions
        # (the 1e-10 guards against zero range on a degenerate objective).
        normalized = np.zeros_like(obj_matrix)
        for i, obj in enumerate(self.objectives):
            col = obj_matrix[:, i]
            if obj.type == ObjectiveType.MAXIMIZE:
                normalized[:, i] = (col - col.min()) / (col.max() - col.min() + 1e-10)
            else:
                normalized[:, i] = (col.max() - col) / (col.max() - col.min() + 1e-10)

        if method == "topsis":
            return self._knee_topsis(normalized, n_knees)
        elif method == "perpendicular_distance":
            return self._knee_perpendicular_distance(normalized, n_knees)
        else:  # method == "angle"
            return self._knee_angle(normalized, n_knees)
    # ------------------------------------------------------------------
    # Private helpers for get_knee_points
    # ------------------------------------------------------------------

    def _knee_topsis(self, normalized: np.ndarray, n_knees: int) -> List[ParetoPoint]:
        """Select points closest to the ideal point (TOPSIS)."""
        # Ideal point: best (=1 after normalization) value on every objective.
        ideal = normalized.max(axis=0)
        distances = np.linalg.norm(ideal - normalized, axis=1)
        knee_indices = np.argsort(distances)[:n_knees]
        return [self.frontier_points[i] for i in knee_indices]

    def _knee_perpendicular_distance(
        self, normalized: np.ndarray, n_knees: int
    ) -> List[ParetoPoint]:
        """Select points farthest from the extreme-to-extreme line/hyperplane.

        For 2 objectives the extreme points define a line; for ≥3 objectives a
        hyperplane is fitted through the per-objective extreme points via SVD.
        """
        n_points, n_obj = normalized.shape

        # Identify extreme points (best point for each objective)
        extreme_indices: List[int] = []
        for i in range(n_obj):
            extreme_indices.append(int(np.argmax(normalized[:, i])))
        # Deduplicate while preserving order
        unique_extremes = list(dict.fromkeys(extreme_indices))
        extreme_points = normalized[unique_extremes]

        if len(unique_extremes) < 2:
            # Degenerate: one point dominates on every objective
            return self.frontier_points[:n_knees]

        if len(unique_extremes) == 2 or n_obj == 2:
            # Distance from the line through two anchor points
            A = extreme_points[0]
            B = extreme_points[1]
            AB = B - A
            AB_sq = float(np.dot(AB, AB))
            if AB_sq < 1e-20:
                # Anchors coincide: no line to measure against.
                return self.frontier_points[:n_knees]
            AP = normalized - A  # (n_points, n_obj)
            t = AP @ AB / AB_sq  # projection parameter per point
            proj = A + np.outer(t, AB)
            distances = np.linalg.norm(normalized - proj, axis=1)
        else:
            # ≥3 unique extremes in ≥3-D space: fit a hyperplane via SVD.
            # The last right-singular vector of the centered anchors is the
            # direction of least variance, i.e. the hyperplane normal.
            centroid = extreme_points.mean(axis=0)
            centered = extreme_points - centroid
            _, _, Vt = np.linalg.svd(centered, full_matrices=True)
            normal = Vt[-1]
            normal = normal / (np.linalg.norm(normal) + 1e-20)
            distances = np.abs((normalized - centroid) @ normal)

        # Farthest from the anchor line/plane = strongest knee.
        knee_indices = np.argsort(distances)[::-1][:n_knees]
        return [self.frontier_points[i] for i in knee_indices]

    def _knee_angle(self, normalized: np.ndarray, n_knees: int) -> List[ParetoPoint]:
        """Select points where adjacent segments form the sharpest angle.

        Points are sorted along the first objective before computing
        inter-segment angles. Most reliable for bi-objective frontiers.
        """
        n_points = normalized.shape[0]

        # Sort by first objective
        sort_idx = np.argsort(normalized[:, 0])
        sorted_norm = normalized[sort_idx]

        if n_points < 3:
            # Not enough interior points for angle computation
            return [self.frontier_points[sort_idx[0]]]

        # Compute angle at each interior point; endpoints get inf
        angles = np.full(n_points, np.inf)
        for j in range(1, n_points - 1):
            v_prev = sorted_norm[j - 1] - sorted_norm[j]
            v_next = sorted_norm[j + 1] - sorted_norm[j]
            denom = np.linalg.norm(v_prev) * np.linalg.norm(v_next)
            if denom < 1e-20:
                # Coincident neighbours: angle undefined, leave as inf.
                continue
            cos_angle = np.clip(np.dot(v_prev, v_next) / denom, -1.0, 1.0)
            angles[j] = float(np.arccos(cos_angle))

        # Smallest angle = sharpest bend = knee
        knee_sorted = np.argsort(angles)[:n_knees]
        knee_indices = sort_idx[knee_sorted]
        return [self.frontier_points[i] for i in knee_indices]
[docs] def to_dataframe(self) -> pd.DataFrame: """Convert frontier points to pandas DataFrame. Returns: DataFrame with objectives and decision variables """ if not self.frontier_points: return pd.DataFrame() data = [] for point in self.frontier_points: row = point.objectives.copy() row["crowding_distance"] = point.crowding_distance row["is_dominated"] = point.is_dominated # Add decision variables for i, val in enumerate(point.decision_variables): row[f"decision_var_{i}"] = val data.append(row) return pd.DataFrame(data)