"""Pareto frontier analysis for multi-objective optimization.
This module provides comprehensive tools for generating, analyzing, and visualizing
Pareto frontiers in multi-objective optimization problems, particularly focused on
insurance optimization trade-offs between ROE, risk, and costs.
"""
from dataclasses import dataclass, field
from enum import Enum
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from scipy.optimize import differential_evolution, minimize
from .gpu_backend import GPUConfig, get_array_module, is_gpu_available, to_numpy
logger = logging.getLogger(__name__)
[docs]
class ObjectiveType(Enum):
    """Types of objectives in multi-objective optimization.

    Used throughout this module to decide whether larger or smaller values
    of an objective are preferred (e.g. maximize ROE, minimize risk).
    """

    MAXIMIZE = "maximize"  # larger objective values are better
    MINIMIZE = "minimize"  # smaller objective values are better
[docs]
@dataclass
class Objective:
    """Definition of an optimization objective.

    Attributes:
        name: Name of the objective (e.g., 'ROE', 'risk', 'cost')
        type: Whether to maximize or minimize this objective
        weight: Weight for weighted sum method; must lie in [0, 1]
            (enforced by ParetoFrontier._validate_objectives)
        normalize: Whether to normalize this objective using ``bounds``
            before weighting
        bounds: Optional (min, max) range used for normalization
    """

    name: str
    type: ObjectiveType
    weight: float = 1.0
    normalize: bool = True
    bounds: Optional[Tuple[float, float]] = None
[docs]
@dataclass
class ParetoPoint:
    """A single candidate solution on the Pareto frontier.

    Attributes:
        objectives: Mapping from objective name to achieved value
        decision_variables: Decision vector that produced these values
        is_dominated: True when another point dominates this one
        crowding_distance: Crowding-distance (diversity) metric
        trade_offs: Trade-off ratios with neighboring frontier points
    """

    objectives: Dict[str, float]
    decision_variables: np.ndarray
    is_dominated: bool = False
    crowding_distance: float = 0.0
    trade_offs: Dict[str, float] = field(default_factory=dict)

    def dominates(self, other: "ParetoPoint", objectives: List[Objective]) -> bool:
        """Return True when this point Pareto-dominates ``other``.

        Domination requires being at least as good on every objective and
        strictly better on at least one.

        Args:
            other: Point to compare against
            objectives: Objectives (with optimization direction) to evaluate

        Returns:
            True if this point dominates ``other``
        """
        strictly_better = False
        for obj in objectives:
            mine = self.objectives[obj.name]
            theirs = other.objectives[obj.name]
            # Flip signs for minimized objectives so "larger is better"
            # holds uniformly below.
            if obj.type == ObjectiveType.MINIMIZE:
                mine, theirs = -mine, -theirs
            if mine < theirs:
                return False  # strictly worse here: cannot dominate
            if mine > theirs:
                strictly_better = True
        return strictly_better
[docs]
class ParetoFrontier:
"""Generator and analyzer for Pareto frontiers.
This class provides methods for generating Pareto frontiers using various
algorithms and analyzing the resulting trade-offs.
"""
def __init__(
self,
objectives: List[Objective],
objective_function: Callable,
bounds: List[Tuple[float, float]],
constraints: Optional[List[Dict[str, Any]]] = None,
seed: Optional[int] = None,
gpu_config: Optional[GPUConfig] = None,
):
"""Initialize Pareto frontier generator.
Args:
objectives: List of objectives to optimize
objective_function: Function that returns objective values given decision variables
bounds: Bounds for decision variables
constraints: Optional constraints for optimization
seed: Optional random seed for reproducibility
gpu_config: Optional GPU configuration for accelerated operations
"""
self.objectives = objectives
self.objective_function = objective_function
self.bounds = bounds
self.constraints = constraints or []
self.frontier_points: List[ParetoPoint] = []
self._rng = np.random.default_rng(seed)
self.gpu_config = gpu_config
self._use_gpu = gpu_config is not None and gpu_config.enabled and is_gpu_available()
self._validate_objectives()
def _validate_objectives(self) -> None:
"""Validate objective definitions."""
if not self.objectives:
raise ValueError("At least one objective must be defined")
names = [obj.name for obj in self.objectives]
if len(names) != len(set(names)):
raise ValueError("Objective names must be unique")
for obj in self.objectives:
if not 0 <= obj.weight <= 1:
raise ValueError(f"Objective weight must be in [0, 1], got {obj.weight}")
[docs]
def generate_weighted_sum(self, n_points: int = 50, method: str = "SLSQP") -> List[ParetoPoint]:
    """Generate the Pareto frontier via weighted-sum scalarization.

    For two objectives the weight is swept systematically over [0, 1];
    for three or more, weight vectors are drawn from a Dirichlet
    distribution so they sum to one.

    Args:
        n_points: Number of scalarized problems to solve
        method: scipy.optimize method name

    Returns:
        Non-dominated points forming the frontier
    """
    candidates: List[ParetoPoint] = []
    if len(self.objectives) == 2:
        # Bi-objective: sweep w and pair it with its complement 1 - w.
        for w in np.linspace(0, 1, n_points):
            candidate = self._optimize_weighted_sum([w, 1 - w], method)
            if candidate is not None:
                candidates.append(candidate)
    else:
        # Many objectives: random weight vectors on the simplex.
        for _ in range(n_points):
            w_vec = self._rng.dirichlet(np.ones(len(self.objectives)))
            candidate = self._optimize_weighted_sum(w_vec.tolist(), method)
            if candidate is not None:
                candidates.append(candidate)
    # Keep only non-dominated points, then annotate diversity/trade-offs.
    self.frontier_points = self._filter_dominated_points(candidates)
    self._calculate_crowding_distances()
    self._calculate_trade_offs()
    return self.frontier_points
[docs]
def generate_epsilon_constraint(
    self, n_points: int = 50, method: str = "SLSQP"
) -> List[ParetoPoint]:
    """Generate Pareto frontier using epsilon-constraint method.

    The first objective is optimized while every remaining objective is
    converted into an inequality constraint bounded by an epsilon value;
    sweeping (or sampling) the epsilon values traces out the frontier.

    Args:
        n_points: Number of points to generate
        method: Optimization method to use

    Returns:
        List of Pareto points forming the frontier

    Raises:
        ValueError: If fewer than two objectives are defined.
    """
    if len(self.objectives) < 2:
        raise ValueError("Epsilon-constraint requires at least 2 objectives")
    points = []
    primary_obj = self.objectives[0]
    constraint_objs = self.objectives[1:]
    # Find bounds for constraint objectives
    epsilon_ranges = []
    for obj in constraint_objs:
        # Optimize for this objective alone to find its range
        temp_weights = [0.0] * len(self.objectives)
        idx = self.objectives.index(obj)
        temp_weights[idx] = 1.0
        point = self._optimize_weighted_sum(temp_weights, method)
        if point:
            # NOTE(review): the upper end is a heuristic 1.5x of the
            # single-objective optimum; this presumes positive objective
            # values — for a negative optimum the range is inverted.
            # TODO confirm intended behavior.
            epsilon_ranges.append(
                (point.objectives[obj.name], point.objectives[obj.name] * 1.5)
            )
        else:
            # Fallback range when the single-objective solve fails.
            epsilon_ranges.append((0, 1))
    # Generate grid of epsilon values
    if len(constraint_objs) == 1:
        # Single constraint objective: sweep epsilon on an even grid.
        epsilons = np.linspace(epsilon_ranges[0][0], epsilon_ranges[0][1], n_points)
        for eps in epsilons:
            point = self._optimize_epsilon_constraint(
                primary_obj, {constraint_objs[0].name: eps}, method
            )
            if point is not None:
                points.append(point)
    else:
        # For multiple constraints, use random sampling of epsilon vectors.
        for _ in range(n_points):
            epsilon_dict = {}
            for i, obj in enumerate(constraint_objs):
                epsilon_dict[obj.name] = self._rng.uniform(
                    epsilon_ranges[i][0], epsilon_ranges[i][1]
                )
            point = self._optimize_epsilon_constraint(primary_obj, epsilon_dict, method)
            if point is not None:
                points.append(point)
    # Filter dominated points, then annotate crowding and trade-offs.
    self.frontier_points = self._filter_dominated_points(points)
    self._calculate_crowding_distances()
    self._calculate_trade_offs()
    return self.frontier_points
[docs]
def generate_evolutionary(
    self, n_generations: int = 100, population_size: int = 50
) -> List[ParetoPoint]:
    """Generate the frontier with repeated differential-evolution runs.

    Each run scalarizes the objectives with a random Dirichlet weight
    vector and solves the resulting single-objective problem; dominated
    solutions are filtered out at the end.

    Args:
        n_generations: Maximum generations per evolutionary run
        population_size: Number of independent scalarized runs

    Returns:
        Non-dominated points forming the frontier
    """

    def as_minimization(x):
        # Evaluate objectives, negating maximized ones so every component
        # is a minimization target.
        values = self.objective_function(x)
        signed = []
        for obj in self.objectives:
            v = values[obj.name]
            signed.append(-v if obj.type == ObjectiveType.MAXIMIZE else v)
        return signed

    candidates = []
    for _ in range(population_size):
        # Fresh random scalarization weights for this run.
        run_weights = self._rng.dirichlet(np.ones(len(self.objectives)))

        def scalarized(x, w=run_weights):
            return np.dot(w, as_minimization(x))

        outcome = differential_evolution(
            scalarized,
            self.bounds,
            maxiter=n_generations,
            popsize=15,
            seed=int(self._rng.integers(0, 10000)),
        )
        if outcome.success:
            candidates.append(
                ParetoPoint(
                    objectives=self.objective_function(outcome.x),
                    decision_variables=outcome.x,
                )
            )
    # Keep only non-dominated points, then annotate diversity/trade-offs.
    self.frontier_points = self._filter_dominated_points(candidates)
    self._calculate_crowding_distances()
    self._calculate_trade_offs()
    return self.frontier_points
def _optimize_weighted_sum(self, weights: List[float], method: str) -> Optional[ParetoPoint]:
    """Solve one weighted-sum scalarization of the problem.

    Args:
        weights: One weight per objective, aligned with ``self.objectives``
        method: scipy.optimize method name

    Returns:
        The resulting Pareto point, or None when the solver fails
    """

    def scalar_cost(x):
        # Weighted sum of (optionally normalized, sign-adjusted) values.
        values = self.objective_function(x)
        total = 0
        for obj, w in zip(self.objectives, weights):
            v = values[obj.name]
            # Rescale to [0, 1] when bounds are provided and requested.
            if obj.normalize and obj.bounds:
                lo, hi = obj.bounds
                v = (v - lo) / (hi - lo)
            # Negate maximized objectives so the solver always minimizes.
            if obj.type == ObjectiveType.MAXIMIZE:
                v = -v
            total += w * v
        return total

    # Start from the midpoint of every decision-variable range.
    midpoint = np.array([(lo + hi) / 2 for lo, hi in self.bounds])
    outcome = minimize(
        scalar_cost,
        midpoint,
        method=method,
        bounds=self.bounds,
        constraints=self.constraints,
    )
    if not outcome.success:
        return None
    return ParetoPoint(
        objectives=self.objective_function(outcome.x),
        decision_variables=outcome.x,
    )
def _optimize_epsilon_constraint(
    self, primary_obj: Objective, epsilon_constraints: Dict[str, float], method: str
) -> Optional[ParetoPoint]:
    """Optimize using epsilon-constraint method.

    Minimizes (or maximizes, via negation) the primary objective subject
    to inequality constraints bounding every other objective by its
    epsilon value.

    Args:
        primary_obj: Primary objective to optimize
        epsilon_constraints: Mapping of objective name -> epsilon bound
        method: Optimization method

    Returns:
        Pareto point if optimization successful, None otherwise
    """

    def primary_objective(x):
        # Negate maximized objectives so scipy always minimizes.
        obj_vals = self.objective_function(x)
        val = obj_vals[primary_obj.name]
        if primary_obj.type == ObjectiveType.MAXIMIZE:
            return -val
        return val

    # Add epsilon constraints
    additional_constraints = []
    for obj_name, epsilon_val in epsilon_constraints.items():
        obj = next(o for o in self.objectives if o.name == obj_name)

        def make_constraint(name, eps, obj_type):
            # Factory binds name/eps as lambda defaults so each closure
            # captures its own values rather than the loop variables.
            # scipy "ineq" constraints require fun(x) >= 0:
            #   MAXIMIZE -> objective >= eps;  MINIMIZE -> objective <= eps
            if obj_type == ObjectiveType.MAXIMIZE:
                return {
                    "type": "ineq",
                    "fun": lambda x, n=name, e=eps: self.objective_function(x)[n] - e,
                }
            return {
                "type": "ineq",
                "fun": lambda x, n=name, e=eps: e - self.objective_function(x)[n],
            }

        additional_constraints.append(make_constraint(obj_name, epsilon_val, obj.type))
    all_constraints = self.constraints + additional_constraints
    # Initial guess: midpoint of each decision-variable range.
    x0 = np.array([(b[0] + b[1]) / 2 for b in self.bounds])
    result = minimize(
        primary_objective,
        x0,
        method=method,
        bounds=self.bounds,
        constraints=all_constraints,
    )
    if result.success:
        obj_vals = self.objective_function(result.x)
        return ParetoPoint(
            objectives=obj_vals,
            decision_variables=result.x,
        )
    return None
def _filter_dominated_points(self, points: List[ParetoPoint]) -> List[ParetoPoint]:
    """Filter out dominated points to get true Pareto frontier.

    Uses vectorized array operations (GPU-accelerated when available)
    instead of nested Python loops. Each point's ``is_dominated`` flag is
    updated in place as a side effect.

    Args:
        points: List of candidate points

    Returns:
        List of non-dominated points

    Since:
        Updated for GPU in Version 0.11.0 (Issue #966)
    """
    if not points:
        return []
    # xp is the array namespace (NumPy or the GPU backend's module).
    xp = get_array_module(gpu=self._use_gpu)
    obj_names = [obj.name for obj in self.objectives]
    obj_matrix = xp.asarray([[p.objectives[name] for name in obj_names] for p in points])
    # Sign-flip minimized objectives so "greater is better" on every axis.
    signs = xp.asarray(
        [1.0 if obj.type == ObjectiveType.MAXIMIZE else -1.0 for obj in self.objectives]
    )
    normalized = obj_matrix * signs
    n = len(points)
    is_dominated = xp.zeros(n, dtype=bool)
    for i in range(n):
        # A point already marked dominated can be skipped: anything it
        # dominates is also dominated by its dominator (transitivity).
        if bool(is_dominated[i]):
            continue
        # Point i dominates j when it is >= on all objectives (geq)
        # and strictly > on at least one (gt).
        geq = xp.all(normalized[i] >= normalized, axis=1)
        gt = xp.any(normalized[i] > normalized, axis=1)
        dominates = geq & gt
        dominates[i] = False  # a point never dominates itself
        is_dominated = is_dominated | dominates
    is_dominated_np = to_numpy(is_dominated)
    for i, point in enumerate(points):
        point.is_dominated = bool(is_dominated_np[i])
    return [p for p in points if not p.is_dominated]
def _calculate_crowding_distances(self) -> None:
"""Calculate crowding distances for frontier points."""
if len(self.frontier_points) <= 2:
for point in self.frontier_points:
point.crowding_distance = float("inf")
return
n_points = len(self.frontier_points)
distances = np.zeros(n_points)
for obj in self.objectives:
# Sort points by this objective
sorted_points = sorted(
enumerate(self.frontier_points),
key=lambda x, o=obj: x[1].objectives[o.name], # type: ignore[misc]
)
# Boundary points get infinite distance
distances[sorted_points[0][0]] = float("inf")
distances[sorted_points[-1][0]] = float("inf")
# Calculate distances for interior points
obj_range = (
sorted_points[-1][1].objectives[obj.name] - sorted_points[0][1].objectives[obj.name]
)
if obj_range > 0:
for i in range(1, n_points - 1):
prev_val = sorted_points[i - 1][1].objectives[obj.name]
next_val = sorted_points[i + 1][1].objectives[obj.name]
distances[sorted_points[i][0]] += (next_val - prev_val) / obj_range
# Assign crowding distances
for i, point in enumerate(self.frontier_points):
point.crowding_distance = distances[i]
def _calculate_trade_offs(self) -> None:
"""Calculate trade-off ratios between neighboring points."""
if len(self.frontier_points) < 2:
return
# For 2D frontiers, calculate direct trade-offs
if len(self.objectives) == 2:
sorted_points = sorted(
self.frontier_points,
key=lambda p: p.objectives[self.objectives[0].name],
)
for i in range(len(sorted_points) - 1):
p1, p2 = sorted_points[i], sorted_points[i + 1]
obj1_diff = (
p2.objectives[self.objectives[0].name] - p1.objectives[self.objectives[0].name]
)
obj2_diff = (
p2.objectives[self.objectives[1].name] - p1.objectives[self.objectives[1].name]
)
if abs(obj1_diff) > 1e-10:
trade_off = obj2_diff / obj1_diff
p1.trade_offs[f"{self.objectives[1].name}_per_{self.objectives[0].name}"] = (
trade_off
)
[docs]
def calculate_hypervolume(self, reference_point: Optional[Dict[str, float]] = None) -> float:
    """Calculate the hypervolume indicator of the current frontier.

    When no reference point is supplied, one is derived from the frontier
    itself: the worst observed value per objective, scaled by 0.9 for
    maximized and 1.1 for minimized objectives.

    Args:
        reference_point: Optional reference point keyed by objective name

    Returns:
        Hypervolume value (0.0 for an empty frontier)
    """
    if not self.frontier_points:
        return 0.0
    if reference_point is None:
        # Derive a default reference from the frontier extremes.
        reference_point = {}
        for obj in self.objectives:
            vals = [pt.objectives[obj.name] for pt in self.frontier_points]
            if obj.type == ObjectiveType.MAXIMIZE:
                reference_point[obj.name] = min(vals) * 0.9
            else:
                reference_point[obj.name] = max(vals) * 1.1
    # Exact sweep in 2-D; Monte Carlo estimate in higher dimensions.
    if len(self.objectives) == 2:
        return self._calculate_2d_hypervolume(reference_point)
    return self._calculate_nd_hypervolume_monte_carlo(reference_point)
def _calculate_2d_hypervolume(self, reference_point: Dict[str, float]) -> float:
"""Calculate hypervolume for 2D frontier.
Args:
reference_point: Reference point
Returns:
Hypervolume value
"""
obj_names = [obj.name for obj in self.objectives]
# Sort points by first objective
sorted_points = sorted(
self.frontier_points,
key=lambda p: p.objectives[obj_names[0]],
)
hypervolume = 0.0
prev_obj2 = reference_point[obj_names[1]]
for point in sorted_points:
obj1 = point.objectives[obj_names[0]]
obj2 = point.objectives[obj_names[1]]
# Calculate contribution
if self.objectives[0].type == ObjectiveType.MAXIMIZE:
width = obj1 - reference_point[obj_names[0]]
else:
width = reference_point[obj_names[0]] - obj1
if self.objectives[1].type == ObjectiveType.MAXIMIZE:
height = obj2 - prev_obj2
else:
height = prev_obj2 - obj2
if width > 0 and height > 0:
hypervolume += width * height
prev_obj2 = obj2
return hypervolume
def _calculate_nd_hypervolume_monte_carlo(
    self, reference_point: Dict[str, float], n_samples: int = 10000
) -> float:
    """Calculate hypervolume using Monte Carlo approximation for n-D.

    Samples points uniformly in the axis-aligned box spanning the
    reference point and the frontier extremes, then estimates hypervolume
    as (fraction of samples dominated by any frontier point) x (box
    volume).

    Uses GPU-accelerated sampling and dominance checking when available.

    Args:
        reference_point: Reference point
        n_samples: Number of Monte Carlo samples

    Returns:
        Approximate hypervolume value

    Since:
        Updated for GPU in Version 0.11.0 (Issue #966)
    """
    xp = get_array_module(gpu=self._use_gpu)
    obj_names = [obj.name for obj in self.objectives]
    n_dims = len(obj_names)
    # Sampling box: from the reference point to the best frontier value
    # in each objective's preferred direction.
    lower = np.empty(n_dims)
    upper = np.empty(n_dims)
    for i, obj in enumerate(self.objectives):
        values = [p.objectives[obj.name] for p in self.frontier_points]
        if obj.type == ObjectiveType.MAXIMIZE:
            lower[i] = reference_point[obj.name]
            upper[i] = max(values)
        else:
            lower[i] = min(values)
            upper[i] = reference_point[obj.name]
    lower_gpu = xp.asarray(lower)
    upper_gpu = xp.asarray(upper)
    # Generate random samples on device
    if self._use_gpu:
        # GPU branch uses the device RNG rather than self._rng, so GPU
        # runs are not reproducible via the seed passed to __init__.
        random_samples = xp.random.uniform(lower_gpu, upper_gpu, size=(n_samples, n_dims))
    else:
        random_samples = self._rng.uniform(lower, upper, size=(n_samples, n_dims))
        random_samples = xp.asarray(random_samples)
    pareto_matrix = xp.asarray(
        [[p.objectives[name] for name in obj_names] for p in self.frontier_points]
    )
    maximize_mask = xp.asarray([obj.type == ObjectiveType.MAXIMIZE for obj in self.objectives])
    already_counted = xp.zeros(n_samples, dtype=bool)
    dominated_count = 0
    for p_row in pareto_matrix:
        # A sample is dominated by p_row when p_row is >= (maximize) or
        # <= (minimize) on every objective.
        comparison = xp.where(maximize_mask, p_row >= random_samples, p_row <= random_samples)
        dominated = xp.all(comparison, axis=1)
        # Count each sample at most once across all frontier points.
        new_dominated = dominated & ~already_counted
        dominated_count += int(xp.sum(new_dominated))
        already_counted = already_counted | dominated
        if bool(xp.all(already_counted)):
            break
    total_volume = float(xp.prod(upper_gpu - lower_gpu))
    return float((dominated_count / n_samples) * total_volume)
[docs]
def get_knee_points(
    self, n_knees: int = 1, method: str = "perpendicular_distance"
) -> List[ParetoPoint]:
    """Identify knee points — regions of maximum curvature — on the frontier.

    A knee is where a small gain in one objective starts to require a
    large sacrifice in the others. Supported detection methods:

    - ``"perpendicular_distance"`` (default): maximum perpendicular
      distance from the line (2-D) or SVD-fitted hyperplane (n-D)
      through the frontier's extreme points in normalized objective
      space (Das, 1999).
    - ``"angle"``: sharpest angle between adjacent frontier segments
      (Branke et al., 2004); best suited to bi-objective frontiers.
    - ``"topsis"``: closest point to the ideal (utopia) point in
      normalized space (Hwang & Yoon, 1981); the default in earlier
      versions.

    Args:
        n_knees: Number of knee points to identify.
        method: One of ``"perpendicular_distance"``, ``"angle"``, or
            ``"topsis"``.

    Returns:
        List of knee points (the whole frontier when it has at most
        ``n_knees`` points).

    Raises:
        ValueError: If *method* is not a recognised method name.

    References:
        Das, I. (1999). On characterizing the 'knee' of the Pareto
        curve based on Normal-Boundary Intersection. *Structural
        Optimization*, 18(2-3), 107-115.
        Branke, J., Deb, K., Dierolf, H., & Osswald, M. (2004).
        Finding knees in multi-objective optimization. *PPSN VIII*,
        Springer, 722-731.
        Hwang, C.L., & Yoon, K. (1981). *Multiple Attribute Decision
        Making*. Springer.
    """
    valid_methods = ("perpendicular_distance", "angle", "topsis")
    if method not in valid_methods:
        raise ValueError(f"method must be one of {valid_methods}, got '{method}'")
    if not self.frontier_points:
        return []
    if len(self.frontier_points) <= n_knees:
        return self.frontier_points.copy()
    # Frontier as a matrix: one row per point, one column per objective.
    raw = np.array(
        [[pt.objectives[obj.name] for obj in self.objectives] for pt in self.frontier_points]
    )
    # Rescale every column to [0, 1] with "higher = better" orientation.
    scaled = np.zeros_like(raw)
    for col_idx, obj in enumerate(self.objectives):
        column = raw[:, col_idx]
        span = column.max() - column.min() + 1e-10
        if obj.type == ObjectiveType.MAXIMIZE:
            scaled[:, col_idx] = (column - column.min()) / span
        else:
            scaled[:, col_idx] = (column.max() - column) / span
    dispatch = {
        "topsis": self._knee_topsis,
        "perpendicular_distance": self._knee_perpendicular_distance,
        "angle": self._knee_angle,
    }
    return dispatch[method](scaled, n_knees)
# ------------------------------------------------------------------
# Private helpers for get_knee_points
# ------------------------------------------------------------------
def _knee_topsis(self, normalized: np.ndarray, n_knees: int) -> List[ParetoPoint]:
"""Select points closest to the ideal point (TOPSIS)."""
ideal = normalized.max(axis=0)
distances = np.linalg.norm(ideal - normalized, axis=1)
knee_indices = np.argsort(distances)[:n_knees]
return [self.frontier_points[i] for i in knee_indices]
def _knee_perpendicular_distance(
self, normalized: np.ndarray, n_knees: int
) -> List[ParetoPoint]:
"""Select points farthest from the extreme-to-extreme line/hyperplane.
For 2 objectives the extreme points define a line; for ≥3 objectives
a hyperplane is fitted through the per-objective extreme points via
SVD.
"""
n_points, n_obj = normalized.shape
# Identify extreme points (best point for each objective)
extreme_indices: List[int] = []
for i in range(n_obj):
extreme_indices.append(int(np.argmax(normalized[:, i])))
# Deduplicate while preserving order
unique_extremes = list(dict.fromkeys(extreme_indices))
extreme_points = normalized[unique_extremes]
if len(unique_extremes) < 2:
# Degenerate: one point dominates on every objective
return self.frontier_points[:n_knees]
if len(unique_extremes) == 2 or n_obj == 2:
# Distance from the line through two anchor points
A = extreme_points[0]
B = extreme_points[1]
AB = B - A
AB_sq = float(np.dot(AB, AB))
if AB_sq < 1e-20:
return self.frontier_points[:n_knees]
AP = normalized - A # (n_points, n_obj)
t = AP @ AB / AB_sq # projection parameter per point
proj = A + np.outer(t, AB)
distances = np.linalg.norm(normalized - proj, axis=1)
else:
# ≥3 unique extremes in ≥3-D space: fit a hyperplane via SVD
centroid = extreme_points.mean(axis=0)
centered = extreme_points - centroid
_, _, Vt = np.linalg.svd(centered, full_matrices=True)
normal = Vt[-1]
normal = normal / (np.linalg.norm(normal) + 1e-20)
distances = np.abs((normalized - centroid) @ normal)
knee_indices = np.argsort(distances)[::-1][:n_knees]
return [self.frontier_points[i] for i in knee_indices]
def _knee_angle(self, normalized: np.ndarray, n_knees: int) -> List[ParetoPoint]:
"""Select points where adjacent segments form the sharpest angle.
Points are sorted along the first objective before computing
inter-segment angles. Most reliable for bi-objective frontiers.
"""
n_points = normalized.shape[0]
# Sort by first objective
sort_idx = np.argsort(normalized[:, 0])
sorted_norm = normalized[sort_idx]
if n_points < 3:
# Not enough interior points for angle computation
return [self.frontier_points[sort_idx[0]]]
# Compute angle at each interior point; endpoints get inf
angles = np.full(n_points, np.inf)
for j in range(1, n_points - 1):
v_prev = sorted_norm[j - 1] - sorted_norm[j]
v_next = sorted_norm[j + 1] - sorted_norm[j]
denom = np.linalg.norm(v_prev) * np.linalg.norm(v_next)
if denom < 1e-20:
continue
cos_angle = np.clip(np.dot(v_prev, v_next) / denom, -1.0, 1.0)
angles[j] = float(np.arccos(cos_angle))
# Smallest angle = sharpest bend = knee
knee_sorted = np.argsort(angles)[:n_knees]
knee_indices = sort_idx[knee_sorted]
return [self.frontier_points[i] for i in knee_indices]
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Convert frontier points to pandas DataFrame.
Returns:
DataFrame with objectives and decision variables
"""
if not self.frontier_points:
return pd.DataFrame()
data = []
for point in self.frontier_points:
row = point.objectives.copy()
row["crowding_distance"] = point.crowding_distance
row["is_dominated"] = point.is_dominated
# Add decision variables
for i, val in enumerate(point.decision_variables):
row[f"decision_var_{i}"] = val
data.append(row)
return pd.DataFrame(data)