"""Advanced optimization algorithms for constrained insurance decision making.
This module implements sophisticated optimization methods including trust-region,
penalty methods, augmented Lagrangian, and multi-start techniques for finding
global optima in complex insurance optimization problems.
"""
from dataclasses import dataclass, field
from enum import Enum
import logging
from typing import Any, Callable, Dict, List, Optional
import numpy as np
from scipy.optimize import Bounds, NonlinearConstraint, OptimizeResult, minimize
logger = logging.getLogger(__name__)
[docs]
class ConstraintType(Enum):
"""Types of constraints in optimization."""
EQUALITY = "eq"
INEQUALITY = "ineq"
BOUNDS = "bounds"
[docs]
@dataclass
class ConstraintViolation:
"""Information about constraint violations."""
constraint_name: str
violation_amount: float
constraint_type: ConstraintType
current_value: float
limit_value: float
is_satisfied: bool
[docs]
def __str__(self) -> str:
"""String representation of violation."""
status = "✓" if self.is_satisfied else "✗"
return (
f"{status} {self.constraint_name}: "
f"{self.current_value:.4f} vs {self.limit_value:.4f} "
f"(violation: {self.violation_amount:.4f})"
)
[docs]
@dataclass
class ConvergenceMonitor:
"""Monitor and track convergence of optimization algorithms."""
max_iterations: int = 1000
tolerance: float = 1e-6
objective_history: List[float] = field(default_factory=list)
constraint_violation_history: List[float] = field(default_factory=list)
gradient_norm_history: List[float] = field(default_factory=list)
step_size_history: List[float] = field(default_factory=list)
iteration_count: int = 0
converged: bool = False
convergence_message: str = ""
[docs]
def update(
self,
objective: float,
constraint_violation: float = 0.0,
gradient_norm: float = 0.0,
step_size: float = 0.0,
):
"""Update convergence history."""
self.iteration_count += 1
self.objective_history.append(objective)
self.constraint_violation_history.append(constraint_violation)
self.gradient_norm_history.append(gradient_norm)
self.step_size_history.append(step_size)
# Check convergence criteria
if self.iteration_count >= self.max_iterations:
self.converged = True
self.convergence_message = "Maximum iterations reached"
elif len(self.objective_history) >= 2:
obj_change = abs(self.objective_history[-1] - self.objective_history[-2])
if obj_change < self.tolerance:
self.converged = True
self.convergence_message = f"Objective converged (change: {obj_change:.2e})"
elif 0 < gradient_norm < self.tolerance:
self.converged = True
self.convergence_message = f"Gradient converged (norm: {gradient_norm:.2e})"
elif 0 < gradient_norm < self.tolerance:
self.converged = True
self.convergence_message = f"Gradient converged (norm: {gradient_norm:.2e})"
[docs]
def get_summary(self) -> Dict[str, Any]:
"""Get convergence summary statistics."""
return {
"iterations": self.iteration_count,
"converged": self.converged,
"message": self.convergence_message,
"final_objective": self.objective_history[-1] if self.objective_history else None,
"final_constraint_violation": (
self.constraint_violation_history[-1] if self.constraint_violation_history else 0.0
),
"objective_improvement": (
self.objective_history[0] - self.objective_history[-1]
if len(self.objective_history) > 1
else 0.0
),
}
[docs]
@dataclass
class AdaptivePenaltyParameters:
"""Parameters for adaptive penalty method."""
initial_penalty: float = 10.0
penalty_increase_factor: float = 2.0
max_penalty: float = 1e6
constraint_tolerance: float = 1e-4
penalty_update_frequency: int = 10
current_penalties: Dict[str, float] = field(default_factory=dict)
[docs]
def update_penalties(self, violations: List[ConstraintViolation]):
"""Update penalty parameters based on constraint violations."""
for violation in violations:
if not violation.is_satisfied:
# Increase penalty for violated constraints
current = self.current_penalties.get(
violation.constraint_name, self.initial_penalty
)
new_penalty = min(current * self.penalty_increase_factor, self.max_penalty)
self.current_penalties[violation.constraint_name] = new_penalty
logger.debug(
f"Updated penalty for {violation.constraint_name}: "
f"{current:.2f} -> {new_penalty:.2f}"
)
[docs]
class TrustRegionOptimizer:
"""Trust-region constrained optimization with adaptive radius adjustment."""
def __init__(
self,
objective_fn: Callable,
gradient_fn: Optional[Callable] = None,
hessian_fn: Optional[Callable] = None,
constraints: Optional[List[Dict[str, Any]]] = None,
bounds: Optional[Bounds] = None,
):
"""Initialize trust-region optimizer.
Args:
objective_fn: Objective function to minimize
gradient_fn: Gradient function (computed numerically if None)
hessian_fn: Hessian function (approximated if None)
constraints: List of constraint dictionaries
bounds: Variable bounds
"""
self.objective_fn = objective_fn
self.gradient_fn = gradient_fn
self.hessian_fn = hessian_fn
self.constraints = constraints or []
self.bounds = bounds
self.convergence_monitor = ConvergenceMonitor()
[docs]
def optimize(
self,
x0: np.ndarray,
initial_radius: float = 1.0,
max_radius: float = 10.0,
eta: float = 0.15,
max_iter: int = 1000,
tol: float = 1e-6,
) -> OptimizeResult:
"""Run trust-region optimization.
Args:
x0: Initial point
initial_radius: Initial trust region radius
max_radius: Maximum trust region radius
eta: Minimum reduction ratio for accepting step
max_iter: Maximum iterations
tol: Convergence tolerance
Returns:
Optimization result
"""
logger.info("Starting trust-region optimization")
# Use scipy's trust-region method with our enhancements
if self.constraints:
# Convert constraints to scipy format
scipy_constraints = self._convert_constraints()
# Set Jacobian to finite differences if not provided
jac = self.gradient_fn if self.gradient_fn is not None else "2-point"
result = minimize(
self.objective_fn,
x0,
method="trust-constr",
jac=jac,
hess=self.hessian_fn,
constraints=scipy_constraints,
bounds=self.bounds,
options={
"initial_tr_radius": initial_radius,
"maxiter": max_iter,
"gtol": tol,
"xtol": tol,
"verbose": 0,
},
)
else:
# Unconstrained trust-region - use L-BFGS-B if no gradient
if self.gradient_fn is None:
result = minimize(
self.objective_fn,
x0,
method="L-BFGS-B",
bounds=self.bounds,
options={"maxiter": max_iter, "ftol": tol},
)
else:
# trust-ncg requires a Hessian; fall back to BFGS when absent
method = "trust-ncg" if self.hessian_fn else "BFGS"
result = minimize(
self.objective_fn,
x0,
method=method,
jac=self.gradient_fn,
hess=self.hessian_fn if self.hessian_fn else None,
options={"maxiter": max_iter, "gtol": tol},
)
# Add convergence monitoring information
result.convergence_info = self.convergence_monitor.get_summary()
logger.info(f"Trust-region optimization completed: {result.message}")
return result
def _convert_constraints(self) -> List[NonlinearConstraint]:
"""Convert constraint dictionaries to scipy format."""
scipy_constraints = []
for constr in self.constraints:
# Set Jacobian to finite differences if not provided
jac = constr.get("jac", "2-point")
if constr["type"] == "ineq":
# Inequality constraint: g(x) >= 0
scipy_constraints.append(NonlinearConstraint(constr["fun"], 0, np.inf, jac=jac))
elif constr["type"] == "eq":
# Equality constraint: h(x) = 0
scipy_constraints.append(NonlinearConstraint(constr["fun"], 0, 0, jac=jac))
return scipy_constraints
[docs]
class PenaltyMethodOptimizer:
"""Optimization using penalty method with adaptive penalty parameters."""
def __init__(
self,
objective_fn: Callable,
constraints: List[Dict[str, Any]],
bounds: Optional[Bounds] = None,
):
"""Initialize penalty method optimizer.
Args:
objective_fn: Original objective function
constraints: List of constraints
bounds: Variable bounds
"""
self.objective_fn = objective_fn
self.constraints = constraints
self.bounds = bounds
self.penalty_params = AdaptivePenaltyParameters()
self.convergence_monitor = ConvergenceMonitor()
def _penalized_objective(self, x: np.ndarray, penalty_multipliers: Dict[str, float]) -> float:
"""Compute penalized objective function.
Args:
x: Current point
penalty_multipliers: Current penalty parameters
Returns:
Penalized objective value
"""
obj = self.objective_fn(x)
# Add penalty terms for constraint violations
total_penalty = 0.0
for i, constr in enumerate(self.constraints):
constr_name = f"constraint_{i}"
penalty = penalty_multipliers.get(constr_name, self.penalty_params.initial_penalty)
if constr["type"] == "ineq":
# Inequality constraint g(x) >= 0
violation = max(0, -constr["fun"](x))
else:
# Equality constraint h(x) = 0
violation = abs(constr["fun"](x))
total_penalty += penalty * violation**2
return float(obj + total_penalty)
[docs]
def optimize(
self,
x0: np.ndarray,
method: str = "L-BFGS-B",
max_outer_iter: int = 50,
max_inner_iter: int = 100,
tol: float = 1e-6,
) -> OptimizeResult:
"""Run penalty method optimization.
Args:
x0: Initial point
method: Inner optimization method
max_outer_iter: Maximum outer iterations
max_inner_iter: Maximum inner iterations per outer loop
tol: Convergence tolerance
Returns:
Optimization result
"""
logger.info("Starting penalty method optimization")
x_current = x0.copy()
best_x = x0.copy()
best_obj = float("inf")
# Initialize penalties
for i in range(len(self.constraints)):
self.penalty_params.current_penalties[f"constraint_{i}"] = (
self.penalty_params.initial_penalty
)
for outer_iter in range(max_outer_iter):
# Solve penalized problem
def penalized_fn(x):
return self._penalized_objective(x, self.penalty_params.current_penalties)
result = minimize(
penalized_fn,
x_current,
method=method,
bounds=self.bounds,
options={"maxiter": max_inner_iter, "ftol": tol},
)
x_current = result.x
# Check constraint violations
violations = self._check_violations(x_current)
# Update convergence monitor
constraint_violation = sum(v.violation_amount for v in violations if not v.is_satisfied)
self.convergence_monitor.update(result.fun, constraint_violation)
# Check if this is the best solution
actual_obj = self.objective_fn(x_current)
if (
actual_obj < best_obj
and constraint_violation < self.penalty_params.constraint_tolerance
):
best_x = x_current.copy()
best_obj = actual_obj
# Check convergence
if constraint_violation < self.penalty_params.constraint_tolerance:
logger.info(f"Converged after {outer_iter + 1} outer iterations")
break
# Update penalties
if (outer_iter + 1) % self.penalty_params.penalty_update_frequency == 0:
self.penalty_params.update_penalties(violations)
# Create final result
final_result = OptimizeResult(
x=best_x,
fun=self.objective_fn(best_x),
success=constraint_violation < self.penalty_params.constraint_tolerance,
nit=outer_iter + 1,
message="Penalty method optimization completed",
convergence_info=self.convergence_monitor.get_summary(),
)
return final_result
def _check_violations(self, x: np.ndarray) -> List[ConstraintViolation]:
"""Check constraint violations at current point."""
violations = []
for i, constr in enumerate(self.constraints):
constr_value = constr["fun"](x)
if constr["type"] == "ineq":
# Inequality g(x) >= 0
violation = max(0, -constr_value)
is_satisfied = constr_value >= -self.penalty_params.constraint_tolerance
limit_value = 0.0
else:
# Equality h(x) = 0
violation = abs(constr_value)
is_satisfied = violation <= self.penalty_params.constraint_tolerance
limit_value = 0.0
violations.append(
ConstraintViolation(
constraint_name=f"constraint_{i}",
violation_amount=violation,
constraint_type=(
ConstraintType.INEQUALITY
if constr["type"] == "ineq"
else ConstraintType.EQUALITY
),
current_value=constr_value,
limit_value=limit_value,
is_satisfied=is_satisfied,
)
)
return violations
[docs]
class AugmentedLagrangianOptimizer:
"""Augmented Lagrangian method for constrained optimization."""
def __init__(
self,
objective_fn: Callable,
constraints: List[Dict[str, Any]],
bounds: Optional[Bounds] = None,
):
"""Initialize augmented Lagrangian optimizer.
Args:
objective_fn: Original objective function
constraints: List of constraints
bounds: Variable bounds
"""
self.objective_fn = objective_fn
self.constraints = constraints
self.bounds = bounds
self.convergence_monitor = ConvergenceMonitor()
def _augmented_lagrangian(
self,
x: np.ndarray,
lambdas: np.ndarray,
mus: np.ndarray,
rho: float,
) -> float:
"""Compute augmented Lagrangian function.
Args:
x: Current point
lambdas: Lagrange multipliers for inequality constraints
mus: Lagrange multipliers for equality constraints
rho: Penalty parameter
Returns:
Augmented Lagrangian value
"""
L = self.objective_fn(x)
ineq_idx = 0
eq_idx = 0
for constr in self.constraints:
if constr["type"] == "ineq":
# Inequality constraint g(x) >= 0
g = constr["fun"](x)
# Standard augmented Lagrangian (Bertsekas, 1999, Section 4.2):
# L_A += (rho/2) * max(0, lambda/rho - g)^2 - lambda^2/(2*rho)
slack = max(0, lambdas[ineq_idx] / rho - g)
L += (rho / 2) * slack**2 - lambdas[ineq_idx] ** 2 / (2 * rho)
ineq_idx += 1
else:
# Equality constraint h(x) = 0
h = constr["fun"](x)
# Augmented term: mu*h + (rho/2)*h^2
L += mus[eq_idx] * h + (rho / 2) * h**2
eq_idx += 1
return float(L)
[docs]
def optimize( # pylint: disable=too-many-locals
self,
x0: np.ndarray,
max_outer_iter: int = 50,
max_inner_iter: int = 100,
tol: float = 1e-6,
rho_init: float = 1.0,
rho_max: float = 1e4,
) -> OptimizeResult:
"""Run augmented Lagrangian optimization.
Args:
x0: Initial point
max_outer_iter: Maximum outer iterations
max_inner_iter: Maximum inner iterations
tol: Convergence tolerance
rho_init: Initial penalty parameter
rho_max: Maximum penalty parameter
Returns:
Optimization result
"""
logger.info("Starting augmented Lagrangian optimization")
# Count constraints
n_ineq = sum(1 for c in self.constraints if c["type"] == "ineq")
n_eq = sum(1 for c in self.constraints if c["type"] == "eq")
# Initialize
x_current = x0.copy()
lambdas = np.zeros(n_ineq) # Inequality multipliers
mus = np.zeros(n_eq) # Equality multipliers
rho = rho_init
best_x = x0.copy()
best_obj = float("inf")
for outer_iter in range(max_outer_iter):
# Minimize augmented Lagrangian
def aug_lag_fn(x):
return self._augmented_lagrangian(x, lambdas, mus, rho)
result = minimize(
aug_lag_fn,
x_current,
method="L-BFGS-B",
bounds=self.bounds,
options={"maxiter": max_inner_iter, "ftol": tol},
)
x_current = result.x
# Update multipliers
ineq_idx = 0
eq_idx = 0
constraint_violation = 0.0
for constr in self.constraints:
if constr["type"] == "ineq":
g = constr["fun"](x_current)
# Update inequality multiplier
lambdas[ineq_idx] = max(0, lambdas[ineq_idx] - rho * g)
constraint_violation += max(0, -g) ** 2
ineq_idx += 1
else:
h = constr["fun"](x_current)
# Update equality multiplier
mus[eq_idx] = mus[eq_idx] + rho * h
constraint_violation += h**2
eq_idx += 1
constraint_violation = np.sqrt(constraint_violation)
# Update convergence monitor
self.convergence_monitor.update(result.fun, constraint_violation)
# Check if this is the best solution
obj_value = self.objective_fn(x_current)
if obj_value < best_obj and constraint_violation < tol:
best_x = x_current.copy()
best_obj = obj_value
# Check convergence
if constraint_violation < tol:
logger.info(f"Converged after {outer_iter + 1} outer iterations")
break
# Update penalty parameter
should_increase_penalty = True
if len(self.convergence_monitor.constraint_violation_history) > 1:
should_increase_penalty = (
constraint_violation
> 0.5 * self.convergence_monitor.constraint_violation_history[-2]
)
if should_increase_penalty:
rho = min(rho * 2, rho_max)
# Create final result
final_result = OptimizeResult(
x=best_x,
fun=best_obj,
success=constraint_violation < tol,
nit=outer_iter + 1,
message="Augmented Lagrangian optimization completed",
convergence_info=self.convergence_monitor.get_summary(),
)
return final_result
[docs]
class MultiStartOptimizer:
"""Multi-start optimization for finding global optima."""
def __init__(
self,
objective_fn: Callable,
bounds: Bounds,
constraints: Optional[List[Dict[str, Any]]] = None,
base_optimizer: str = "SLSQP",
):
"""Initialize multi-start optimizer.
Args:
objective_fn: Objective function to minimize
bounds: Variable bounds
constraints: Optional constraints
base_optimizer: Base optimization method to use
"""
self.objective_fn = objective_fn
self.bounds = bounds
self.constraints = constraints or []
self.base_optimizer = base_optimizer
[docs]
def optimize(
self,
n_starts: int = 10,
x0: Optional[np.ndarray] = None,
seed: Optional[int] = None,
parallel: bool = False,
) -> OptimizeResult:
"""Run multi-start optimization.
Args:
n_starts: Number of random starts
x0: Optional initial point (included as first start)
seed: Random seed for reproducibility
parallel: Whether to run starts in parallel
Returns:
Best optimization result across all starts
"""
logger.info(f"Starting multi-start optimization with {n_starts} starts")
rng = np.random.default_rng(seed)
# Generate starting points
starting_points = self._generate_starting_points(n_starts, x0, rng)
# Run optimization from each starting point
results = []
for i, start_point in enumerate(starting_points):
logger.debug(f"Running optimization from start point {i + 1}/{n_starts}")
try:
if self.base_optimizer == "trust-region":
tr_optimizer = TrustRegionOptimizer(
self.objective_fn,
constraints=self.constraints,
bounds=self.bounds,
)
result = tr_optimizer.optimize(start_point)
elif self.base_optimizer == "penalty":
pm_optimizer = PenaltyMethodOptimizer(
self.objective_fn,
self.constraints,
self.bounds,
)
result = pm_optimizer.optimize(start_point)
elif self.base_optimizer == "augmented-lagrangian":
al_optimizer = AugmentedLagrangianOptimizer(
self.objective_fn,
self.constraints,
self.bounds,
)
result = al_optimizer.optimize(start_point)
elif self.base_optimizer == "enhanced-slsqp":
es_optimizer = EnhancedSLSQPOptimizer(
self.objective_fn,
gradient_fn=None, # Let it compute numerically
constraints=self.constraints,
bounds=self.bounds,
)
result = es_optimizer.optimize(start_point)
else:
# Default to scipy minimize - map enhanced-slsqp to SLSQP
method = (
"SLSQP" if self.base_optimizer == "enhanced-slsqp" else self.base_optimizer
)
result = minimize(
self.objective_fn,
start_point,
method=method,
bounds=self.bounds,
constraints=self.constraints,
options={"maxiter": 1000},
)
results.append(result)
except (ValueError, RuntimeError, TypeError) as e:
logger.warning(f"Optimization failed from start point {i + 1}: {e}")
continue
if not results:
raise RuntimeError("All optimization attempts failed")
# Find best result
best_result = min(results, key=lambda r: r.fun if r.success else float("inf"))
# Add multi-start specific information
best_result.n_starts = n_starts
best_result.n_successful = sum(1 for r in results if r.success)
best_result.all_objectives = [r.fun for r in results if r.success]
logger.info(
f"Multi-start completed: {best_result.n_successful}/{n_starts} successful, "
f"best objective: {best_result.fun:.6f}"
)
return best_result
def _generate_starting_points(
self, n_starts: int, x0: Optional[np.ndarray], rng: np.random.Generator
) -> List[np.ndarray]:
"""Generate diverse starting points within bounds."""
n_vars = len(self.bounds.lb)
starting_points = []
# Include provided initial point if given
if x0 is not None:
starting_points.append(x0)
n_starts -= 1
# Generate random points using Latin Hypercube Sampling for better coverage
for _ in range(n_starts):
point = np.zeros(n_vars)
for i in range(n_vars):
lb = self.bounds.lb[i]
ub = self.bounds.ub[i]
# Handle infinite bounds
if np.isinf(lb):
lb = -1e6
if np.isinf(ub):
ub = 1e6
point[i] = rng.uniform(lb, ub)
starting_points.append(point)
return starting_points
[docs]
class EnhancedSLSQPOptimizer:
"""Enhanced SLSQP with adaptive step sizing and improved convergence."""
def __init__(
self,
objective_fn: Callable,
gradient_fn: Optional[Callable] = None,
constraints: Optional[List[Dict[str, Any]]] = None,
bounds: Optional[Bounds] = None,
):
"""Initialize enhanced SLSQP optimizer.
Args:
objective_fn: Objective function
gradient_fn: Gradient function (computed numerically if None)
constraints: List of constraints
bounds: Variable bounds
"""
self.objective_fn = objective_fn
self.gradient_fn = gradient_fn
self.constraints = constraints or []
self.bounds = bounds
self.convergence_monitor = ConvergenceMonitor()
# Initialize attributes that may be set during optimization
self.step_size: float = 1.0
self.prev_x: Optional[np.ndarray] = None
self.prev_obj: Optional[float] = None
[docs]
def optimize(
self,
x0: np.ndarray,
adaptive_step: bool = True,
line_search: str = "armijo",
max_iter: int = 1000,
tol: float = 1e-6,
) -> OptimizeResult:
"""Run enhanced SLSQP optimization.
Args:
x0: Initial point
adaptive_step: Whether to use adaptive step sizing
line_search: Line search method ("armijo" or "wolfe")
max_iter: Maximum iterations
tol: Convergence tolerance
Returns:
Optimization result
"""
logger.info("Starting enhanced SLSQP optimization")
# Configure options
options = {
"maxiter": max_iter,
"ftol": tol,
"disp": False,
}
# Track adaptive step sizing
if adaptive_step:
self.step_size = 1.0
self.prev_x = None
self.prev_obj = None
# Run optimization
result = minimize(
self.objective_fn,
x0,
method="SLSQP",
jac=self.gradient_fn,
bounds=self.bounds,
constraints=self.constraints,
options=options,
)
# Add convergence information
result.convergence_info = self.convergence_monitor.get_summary()
return result
[docs]
def create_optimizer(
method: str,
objective_fn: Callable,
constraints: Optional[List[Dict[str, Any]]] = None,
bounds: Optional[Bounds] = None,
**kwargs,
) -> Any:
"""Factory function to create appropriate optimizer.
Args:
method: Optimization method name
objective_fn: Objective function
constraints: Optional constraints
bounds: Optional bounds
**kwargs: Additional optimizer-specific arguments
Returns:
Configured optimizer instance
"""
method = method.lower()
if method == "trust-region":
return TrustRegionOptimizer(objective_fn, constraints=constraints, bounds=bounds, **kwargs)
if method == "penalty":
return PenaltyMethodOptimizer(objective_fn, constraints or [], bounds, **kwargs)
if method == "augmented-lagrangian":
return AugmentedLagrangianOptimizer(objective_fn, constraints or [], bounds, **kwargs)
if method == "multi-start":
return MultiStartOptimizer(objective_fn, bounds or Bounds([], []), constraints, **kwargs)
if method == "enhanced-slsqp":
return EnhancedSLSQPOptimizer(
objective_fn, constraints=constraints, bounds=bounds, **kwargs
)
raise ValueError(f"Unknown optimization method: {method}")