"""Advanced optimization algorithms for constrained insurance decision making.
This module implements optimization methods including trust-region,
penalty, augmented Lagrangian, and multi-start techniques aimed at
locating global optima in complex insurance optimization problems.
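
Example:
    A minimal usage sketch (a hypothetical two-variable quadratic
    objective; any callable mapping an array to a float works)::

        import numpy as np
        from scipy.optimize import Bounds

        def objective(x):
            return float((x[0] - 1.0) ** 2 + (x[1] + 0.5) ** 2)

        optimizer = create_optimizer(
            "multi-start", objective, bounds=Bounds([-5.0, -5.0], [5.0, 5.0])
        )
        result = optimizer.optimize(n_starts=5, seed=42)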
"""
from dataclasses import dataclass, field
from enum import Enum
import logging
from typing import Any, Callable, Dict, List, Optional
import numpy as np
from scipy.optimize import Bounds, NonlinearConstraint, OptimizeResult, minimize
from .gpu_backend import GPUConfig, get_array_module, is_gpu_available, to_numpy
logger = logging.getLogger(__name__)
class ConstraintType(Enum):
"""Types of constraints in optimization."""
EQUALITY = "eq"
INEQUALITY = "ineq"
BOUNDS = "bounds"
@dataclass
class ConstraintViolation:
"""Information about constraint violations."""
constraint_name: str
violation_amount: float
constraint_type: ConstraintType
current_value: float
limit_value: float
is_satisfied: bool
def __str__(self) -> str:
"""String representation of violation."""
status = "✓" if self.is_satisfied else "✗"
return (
f"{status} {self.constraint_name}: "
f"{self.current_value:.4f} vs {self.limit_value:.4f} "
f"(violation: {self.violation_amount:.4f})"
)
@dataclass
class ConvergenceMonitor:
"""Monitor and track convergence of optimization algorithms."""
max_iterations: int = 1000
tolerance: float = 1e-6
objective_history: List[float] = field(default_factory=list)
constraint_violation_history: List[float] = field(default_factory=list)
gradient_norm_history: List[float] = field(default_factory=list)
step_size_history: List[float] = field(default_factory=list)
iteration_count: int = 0
converged: bool = False
convergence_message: str = ""
def update(
self,
objective: float,
constraint_violation: float = 0.0,
gradient_norm: float = 0.0,
step_size: float = 0.0,
):
"""Update convergence history."""
self.iteration_count += 1
self.objective_history.append(objective)
self.constraint_violation_history.append(constraint_violation)
self.gradient_norm_history.append(gradient_norm)
self.step_size_history.append(step_size)
        # Check termination criteria (reaching max_iterations also stops the run)
if self.iteration_count >= self.max_iterations:
self.converged = True
self.convergence_message = "Maximum iterations reached"
elif len(self.objective_history) >= 2:
obj_change = abs(self.objective_history[-1] - self.objective_history[-2])
if obj_change < self.tolerance:
self.converged = True
self.convergence_message = f"Objective converged (change: {obj_change:.2e})"
elif 0 < gradient_norm < self.tolerance:
self.converged = True
self.convergence_message = f"Gradient converged (norm: {gradient_norm:.2e})"
def get_summary(self) -> Dict[str, Any]:
"""Get convergence summary statistics."""
return {
"iterations": self.iteration_count,
"converged": self.converged,
"message": self.convergence_message,
"final_objective": self.objective_history[-1] if self.objective_history else None,
"final_constraint_violation": (
self.constraint_violation_history[-1] if self.constraint_violation_history else 0.0
),
"objective_improvement": (
self.objective_history[0] - self.objective_history[-1]
if len(self.objective_history) > 1
else 0.0
),
}
@dataclass
class AdaptivePenaltyParameters:
"""Parameters for adaptive penalty method."""
initial_penalty: float = 10.0
penalty_increase_factor: float = 2.0
max_penalty: float = 1e6
constraint_tolerance: float = 1e-4
penalty_update_frequency: int = 10
current_penalties: Dict[str, float] = field(default_factory=dict)
def update_penalties(self, violations: List[ConstraintViolation]):
"""Update penalty parameters based on constraint violations."""
for violation in violations:
if not violation.is_satisfied:
# Increase penalty for violated constraints
current = self.current_penalties.get(
violation.constraint_name, self.initial_penalty
)
new_penalty = min(current * self.penalty_increase_factor, self.max_penalty)
self.current_penalties[violation.constraint_name] = new_penalty
logger.debug(
f"Updated penalty for {violation.constraint_name}: "
f"{current:.2f} -> {new_penalty:.2f}"
)
class TrustRegionOptimizer:
"""Trust-region constrained optimization with adaptive radius adjustment."""
def __init__(
self,
objective_fn: Callable,
gradient_fn: Optional[Callable] = None,
hessian_fn: Optional[Callable] = None,
constraints: Optional[List[Dict[str, Any]]] = None,
bounds: Optional[Bounds] = None,
):
"""Initialize trust-region optimizer.
Args:
objective_fn: Objective function to minimize
gradient_fn: Gradient function (computed numerically if None)
hessian_fn: Hessian function (approximated if None)
constraints: List of constraint dictionaries
bounds: Variable bounds
"""
self.objective_fn = objective_fn
self.gradient_fn = gradient_fn
self.hessian_fn = hessian_fn
self.constraints = constraints or []
self.bounds = bounds
self.convergence_monitor = ConvergenceMonitor()
def optimize(
self,
x0: np.ndarray,
initial_radius: float = 1.0,
max_radius: float = 10.0,
eta: float = 0.15,
max_iter: int = 1000,
tol: float = 1e-6,
) -> OptimizeResult:
"""Run trust-region optimization.
Args:
x0: Initial point
initial_radius: Initial trust region radius
max_radius: Maximum trust region radius
eta: Minimum reduction ratio for accepting step
max_iter: Maximum iterations
tol: Convergence tolerance
Returns:
Optimization result
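
        Example:
            A minimal sketch with a smooth unconstrained objective; with no
            gradient supplied, the optimizer falls back to L-BFGS-B::

                import numpy as np

                opt = TrustRegionOptimizer(lambda x: float(np.sum((x - 1.0) ** 2)))
                result = opt.optimize(np.array([5.0, -3.0]))
                # result.x is close to [1.0, 1.0]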
"""
logger.info("Starting trust-region optimization")
# Use scipy's trust-region method with our enhancements
if self.constraints:
# Convert constraints to scipy format
scipy_constraints = self._convert_constraints()
# Set Jacobian to finite differences if not provided
jac = self.gradient_fn if self.gradient_fn is not None else "2-point"
result = minimize(
self.objective_fn,
x0,
method="trust-constr",
jac=jac,
hess=self.hessian_fn,
constraints=scipy_constraints,
bounds=self.bounds,
options={
"initial_tr_radius": initial_radius,
"maxiter": max_iter,
"gtol": tol,
"xtol": tol,
"verbose": 0,
},
)
else:
# Unconstrained trust-region - use L-BFGS-B if no gradient
if self.gradient_fn is None:
result = minimize(
self.objective_fn,
x0,
method="L-BFGS-B",
bounds=self.bounds,
options={"maxiter": max_iter, "ftol": tol},
)
else:
# trust-ncg requires a Hessian; fall back to BFGS when absent
method = "trust-ncg" if self.hessian_fn else "BFGS"
result = minimize(
self.objective_fn,
x0,
method=method,
jac=self.gradient_fn,
                    hess=self.hessian_fn,
options={"maxiter": max_iter, "gtol": tol},
)
# Add convergence monitoring information
result.convergence_info = self.convergence_monitor.get_summary()
logger.info(f"Trust-region optimization completed: {result.message}")
return result
def _convert_constraints(self) -> List[NonlinearConstraint]:
"""Convert constraint dictionaries to scipy format."""
scipy_constraints = []
for constr in self.constraints:
# Set Jacobian to finite differences if not provided
jac = constr.get("jac", "2-point")
if constr["type"] == "ineq":
# Inequality constraint: g(x) >= 0
scipy_constraints.append(NonlinearConstraint(constr["fun"], 0, np.inf, jac=jac))
elif constr["type"] == "eq":
# Equality constraint: h(x) = 0
scipy_constraints.append(NonlinearConstraint(constr["fun"], 0, 0, jac=jac))
return scipy_constraints
class PenaltyMethodOptimizer:
"""Optimization using penalty method with adaptive penalty parameters.
By default, uses a **quadratic** (L2) penalty: ``penalty * violation^2``.
The quadratic formulation only drives the solution to exact feasibility
as the penalty parameter tends to infinity. For any finite penalty the
converged solution is an *interior* approximation that systematically
violates inequality constraints by ``O(1/penalty)``
(see Nocedal & Wright, *Numerical Optimization*, 2nd ed., Ch. 17.1).
When ``exact_penalty=True``, uses an **exact** (L1) penalty instead:
``penalty * violation``. For a sufficiently large (but finite) penalty
the L1 formulation yields a solution that satisfies the constraints
exactly (Fletcher, *Practical Methods of Optimization*, Ch. 12.2).
The trade-off is that the L1 penalty is non-smooth, which may affect
gradient-based inner solvers.
For problems that require exact constraint satisfaction with bounded
penalty parameters, consider :class:`AugmentedLagrangianOptimizer`
(augmented Lagrangian with multiplier estimates) or
:class:`TrustRegionOptimizer` (wraps SciPy ``trust-constr``).
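
    Illustration of the finite-penalty bias: minimizing ``f(x) = x`` subject
    to ``x >= 1`` with the quadratic penalty ``rho * max(0, 1 - x)**2`` has
    the stationary point ``x* = 1 - 1/(2*rho)``, so the constraint is
    violated by exactly ``1/(2*rho)`` for every finite ``rho``.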
"""
def __init__(
self,
objective_fn: Callable,
constraints: List[Dict[str, Any]],
bounds: Optional[Bounds] = None,
exact_penalty: bool = False,
):
"""Initialize penalty method optimizer.
Args:
objective_fn: Original objective function
constraints: List of constraints
bounds: Variable bounds
exact_penalty: If ``True``, use L1 (exact) penalty
``penalty * |violation|`` instead of the default quadratic
penalty ``penalty * violation^2``. The L1 penalty achieves
exact feasibility for a sufficiently large finite penalty,
whereas the quadratic penalty only achieves feasibility as
``penalty -> inf``.
"""
self.objective_fn = objective_fn
self.constraints = constraints
self.bounds = bounds
self.exact_penalty = exact_penalty
self.penalty_params = AdaptivePenaltyParameters()
self.convergence_monitor = ConvergenceMonitor()
def _penalized_objective(self, x: np.ndarray, penalty_multipliers: Dict[str, float]) -> float:
"""Compute penalized objective function.
When ``self.exact_penalty`` is ``False`` (default), the penalty term
for each constraint is ``penalty * violation^2`` (quadratic / L2).
When ``True``, the penalty term is ``penalty * violation`` (exact / L1).
Args:
x: Current point
penalty_multipliers: Current penalty parameters
Returns:
Penalized objective value
"""
obj = self.objective_fn(x)
# Add penalty terms for constraint violations
total_penalty = 0.0
for i, constr in enumerate(self.constraints):
constr_name = f"constraint_{i}"
penalty = penalty_multipliers.get(constr_name, self.penalty_params.initial_penalty)
if constr["type"] == "ineq":
# Inequality constraint g(x) >= 0
violation = max(0, -constr["fun"](x))
else:
# Equality constraint h(x) = 0
violation = abs(constr["fun"](x))
if self.exact_penalty:
total_penalty += penalty * violation # L1 (exact) penalty
else:
total_penalty += penalty * violation**2 # L2 (quadratic) penalty
return float(obj + total_penalty)
def optimize(
self,
x0: np.ndarray,
method: str = "L-BFGS-B",
max_outer_iter: int = 50,
max_inner_iter: int = 100,
tol: float = 1e-6,
) -> OptimizeResult:
"""Run penalty method optimization.
Args:
x0: Initial point
method: Inner optimization method
max_outer_iter: Maximum outer iterations
max_inner_iter: Maximum inner iterations per outer loop
tol: Convergence tolerance
Returns:
Optimization result
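
        Example:
            A minimal sketch minimizing ``(x - 2)**2`` subject to ``x <= 1``,
            written as ``g(x) = 1 - x >= 0``::

                import numpy as np

                opt = PenaltyMethodOptimizer(
                    lambda x: float((x[0] - 2.0) ** 2),
                    [{"type": "ineq", "fun": lambda x: 1.0 - x[0]}],
                    exact_penalty=True,
                )
                result = opt.optimize(np.array([0.0]))
                # result.x is near 1.0 (the L1 penalty allows exact feasibility)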
"""
logger.info("Starting penalty method optimization")
x_current = x0.copy()
best_x = x0.copy()
best_obj = float("inf")
# Initialize penalties
for i in range(len(self.constraints)):
self.penalty_params.current_penalties[f"constraint_{i}"] = (
self.penalty_params.initial_penalty
)
for outer_iter in range(max_outer_iter):
# Solve penalized problem
def penalized_fn(x):
return self._penalized_objective(x, self.penalty_params.current_penalties)
result = minimize(
penalized_fn,
x_current,
method=method,
bounds=self.bounds,
options={"maxiter": max_inner_iter, "ftol": tol},
)
x_current = result.x
# Check constraint violations
violations = self._check_violations(x_current)
# Update convergence monitor
constraint_violation = sum(v.violation_amount for v in violations if not v.is_satisfied)
self.convergence_monitor.update(result.fun, constraint_violation)
# Check if this is the best solution
actual_obj = self.objective_fn(x_current)
if (
actual_obj < best_obj
and constraint_violation < self.penalty_params.constraint_tolerance
):
best_x = x_current.copy()
best_obj = actual_obj
# Check convergence
if constraint_violation < self.penalty_params.constraint_tolerance:
logger.info(f"Converged after {outer_iter + 1} outer iterations")
break
# Update penalties
if (outer_iter + 1) % self.penalty_params.penalty_update_frequency == 0:
self.penalty_params.update_penalties(violations)
# Warn when converged solution still has non-trivial constraint violation
best_violations = self._check_violations(best_x)
best_violation_total = sum(v.violation_amount for v in best_violations)
if best_violation_total > 1e-8:
logger.warning(
"Converged solution has constraint violation %.2e. "
"The quadratic penalty method produces systematically "
"infeasible solutions for finite penalty values. "
"Consider using exact_penalty=True, "
"AugmentedLagrangianOptimizer, or TrustRegionOptimizer "
"for exact constraint satisfaction.",
best_violation_total,
)
# Create final result
final_result = OptimizeResult(
x=best_x,
fun=self.objective_fn(best_x),
            success=best_violation_total < self.penalty_params.constraint_tolerance,
nit=outer_iter + 1,
message="Penalty method optimization completed",
convergence_info=self.convergence_monitor.get_summary(),
)
return final_result
def _check_violations(self, x: np.ndarray) -> List[ConstraintViolation]:
"""Check constraint violations at current point."""
violations = []
for i, constr in enumerate(self.constraints):
constr_value = constr["fun"](x)
if constr["type"] == "ineq":
# Inequality g(x) >= 0
violation = max(0, -constr_value)
is_satisfied = constr_value >= -self.penalty_params.constraint_tolerance
limit_value = 0.0
else:
# Equality h(x) = 0
violation = abs(constr_value)
is_satisfied = violation <= self.penalty_params.constraint_tolerance
limit_value = 0.0
violations.append(
ConstraintViolation(
constraint_name=f"constraint_{i}",
violation_amount=violation,
constraint_type=(
ConstraintType.INEQUALITY
if constr["type"] == "ineq"
else ConstraintType.EQUALITY
),
current_value=constr_value,
limit_value=limit_value,
is_satisfied=is_satisfied,
)
)
return violations
class AugmentedLagrangianOptimizer:
"""Augmented Lagrangian method for constrained optimization."""
def __init__(
self,
objective_fn: Callable,
constraints: List[Dict[str, Any]],
bounds: Optional[Bounds] = None,
):
"""Initialize augmented Lagrangian optimizer.
Args:
objective_fn: Original objective function
constraints: List of constraints
bounds: Variable bounds
"""
self.objective_fn = objective_fn
self.constraints = constraints
self.bounds = bounds
self.convergence_monitor = ConvergenceMonitor()
def _augmented_lagrangian(
self,
x: np.ndarray,
lambdas: np.ndarray,
mus: np.ndarray,
rho: float,
) -> float:
"""Compute augmented Lagrangian function.
Args:
x: Current point
lambdas: Lagrange multipliers for inequality constraints
mus: Lagrange multipliers for equality constraints
rho: Penalty parameter
Returns:
Augmented Lagrangian value
"""
L = self.objective_fn(x)
ineq_idx = 0
eq_idx = 0
for constr in self.constraints:
if constr["type"] == "ineq":
# Inequality constraint g(x) >= 0
g = constr["fun"](x)
# Standard augmented Lagrangian (Bertsekas, 1999, Section 4.2):
# L_A += (rho/2) * max(0, lambda/rho - g)^2 - lambda^2/(2*rho)
slack = max(0, lambdas[ineq_idx] / rho - g)
L += (rho / 2) * slack**2 - lambdas[ineq_idx] ** 2 / (2 * rho)
ineq_idx += 1
else:
# Equality constraint h(x) = 0
h = constr["fun"](x)
# Augmented term: mu*h + (rho/2)*h^2
L += mus[eq_idx] * h + (rho / 2) * h**2
eq_idx += 1
return float(L)
def optimize( # pylint: disable=too-many-locals
self,
x0: np.ndarray,
max_outer_iter: int = 50,
max_inner_iter: int = 100,
tol: float = 1e-6,
rho_init: float = 1.0,
rho_max: float = 1e4,
) -> OptimizeResult:
"""Run augmented Lagrangian optimization.
Args:
x0: Initial point
max_outer_iter: Maximum outer iterations
max_inner_iter: Maximum inner iterations
tol: Convergence tolerance
rho_init: Initial penalty parameter
rho_max: Maximum penalty parameter
Returns:
Optimization result
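
        Example:
            A minimal sketch with one equality constraint ``x0 + x1 = 1``::

                import numpy as np

                opt = AugmentedLagrangianOptimizer(
                    lambda x: float(np.sum(x**2)),
                    [{"type": "eq", "fun": lambda x: x[0] + x[1] - 1.0}],
                )
                result = opt.optimize(np.array([0.0, 0.0]))
                # result.x is near [0.5, 0.5]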
"""
logger.info("Starting augmented Lagrangian optimization")
# Count constraints
n_ineq = sum(1 for c in self.constraints if c["type"] == "ineq")
n_eq = sum(1 for c in self.constraints if c["type"] == "eq")
# Initialize
x_current = x0.copy()
lambdas = np.zeros(n_ineq) # Inequality multipliers
mus = np.zeros(n_eq) # Equality multipliers
rho = rho_init
best_x = x0.copy()
best_obj = float("inf")
for outer_iter in range(max_outer_iter):
# Minimize augmented Lagrangian
def aug_lag_fn(x):
return self._augmented_lagrangian(x, lambdas, mus, rho)
result = minimize(
aug_lag_fn,
x_current,
method="L-BFGS-B",
bounds=self.bounds,
options={"maxiter": max_inner_iter, "ftol": tol},
)
x_current = result.x
# Update multipliers
ineq_idx = 0
eq_idx = 0
constraint_violation = 0.0
for constr in self.constraints:
if constr["type"] == "ineq":
g = constr["fun"](x_current)
# Update inequality multiplier
lambdas[ineq_idx] = max(0, lambdas[ineq_idx] - rho * g)
constraint_violation += max(0, -g) ** 2
ineq_idx += 1
else:
h = constr["fun"](x_current)
# Update equality multiplier
mus[eq_idx] = mus[eq_idx] + rho * h
constraint_violation += h**2
eq_idx += 1
constraint_violation = np.sqrt(constraint_violation)
# Update convergence monitor
self.convergence_monitor.update(result.fun, constraint_violation)
# Check if this is the best solution
obj_value = self.objective_fn(x_current)
if obj_value < best_obj and constraint_violation < tol:
best_x = x_current.copy()
best_obj = obj_value
# Check convergence
if constraint_violation < tol:
logger.info(f"Converged after {outer_iter + 1} outer iterations")
break
# Update penalty parameter.
# Increase rho when constraint violation has not decreased
# sufficiently (by factor 0.25) since the previous iteration
# (Bertsekas & Tsitsiklis, 1997, p. 319).
should_increase_penalty = True
if len(self.convergence_monitor.constraint_violation_history) > 1:
should_increase_penalty = (
constraint_violation
> 0.25 * self.convergence_monitor.constraint_violation_history[-2]
)
if should_increase_penalty:
rho = min(rho * 2, rho_max)
# Create final result
final_result = OptimizeResult(
x=best_x,
            fun=self.objective_fn(best_x),  # best_obj may be inf if no feasible iterate was found
success=constraint_violation < tol,
nit=outer_iter + 1,
message="Augmented Lagrangian optimization completed",
convergence_info=self.convergence_monitor.get_summary(),
)
return final_result
class MultiStartOptimizer:
"""Multi-start optimization for finding global optima."""
def __init__(
self,
objective_fn: Callable,
bounds: Bounds,
constraints: Optional[List[Dict[str, Any]]] = None,
base_optimizer: str = "SLSQP",
gpu_config: Optional[GPUConfig] = None,
):
"""Initialize multi-start optimizer.
Args:
objective_fn: Objective function to minimize
bounds: Variable bounds
constraints: Optional constraints
base_optimizer: Base optimization method to use
gpu_config: Optional GPU configuration for accelerated screening
"""
self.objective_fn = objective_fn
self.bounds = bounds
self.constraints = constraints or []
self.base_optimizer = base_optimizer
self.gpu_config = gpu_config
def optimize(
self,
n_starts: int = 10,
x0: Optional[np.ndarray] = None,
seed: Optional[int] = None,
parallel: bool = False,
) -> OptimizeResult:
"""Run multi-start optimization.
Args:
n_starts: Number of random starts
x0: Optional initial point (included as first start)
seed: Random seed for reproducibility
parallel: Whether to run starts in parallel
Returns:
Best optimization result across all starts
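
        Example:
            A minimal sketch over a bounded, multimodal objective where
            restarts help escape poor local minima::

                import numpy as np
                from scipy.optimize import Bounds

                ms = MultiStartOptimizer(
                    lambda x: float(np.sum(np.cos(3 * x) + 0.1 * x**2)),
                    Bounds([-10.0, -10.0], [10.0, 10.0]),
                )
                result = ms.optimize(n_starts=20, seed=0)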
"""
logger.info(f"Starting multi-start optimization with {n_starts} starts")
rng = np.random.default_rng(seed)
# Generate starting points
starting_points = self._generate_starting_points(n_starts, x0, rng)
# Run optimization from each starting point
results = []
for i, start_point in enumerate(starting_points):
logger.debug(f"Running optimization from start point {i + 1}/{n_starts}")
try:
if self.base_optimizer == "trust-region":
tr_optimizer = TrustRegionOptimizer(
self.objective_fn,
constraints=self.constraints,
bounds=self.bounds,
)
result = tr_optimizer.optimize(start_point)
elif self.base_optimizer == "penalty":
pm_optimizer = PenaltyMethodOptimizer(
self.objective_fn,
self.constraints,
self.bounds,
)
result = pm_optimizer.optimize(start_point)
elif self.base_optimizer == "augmented-lagrangian":
al_optimizer = AugmentedLagrangianOptimizer(
self.objective_fn,
self.constraints,
self.bounds,
)
result = al_optimizer.optimize(start_point)
elif self.base_optimizer == "enhanced-slsqp":
es_optimizer = EnhancedSLSQPOptimizer(
self.objective_fn,
gradient_fn=None, # Let it compute numerically
constraints=self.constraints,
bounds=self.bounds,
)
result = es_optimizer.optimize(start_point)
                else:
                    # Fall back to scipy.optimize.minimize for any other
                    # method name ("enhanced-slsqp" is handled above)
                    method = self.base_optimizer
result = minimize(
self.objective_fn,
start_point,
method=method,
bounds=self.bounds,
constraints=self.constraints,
options={"maxiter": 1000},
)
results.append(result)
except (ValueError, RuntimeError, TypeError) as e:
logger.warning(f"Optimization failed from start point {i + 1}: {e}")
continue
if not results:
raise RuntimeError("All optimization attempts failed")
# Find best result
best_result = min(results, key=lambda r: r.fun if r.success else float("inf"))
# Add multi-start specific information
best_result.n_starts = n_starts
best_result.n_successful = sum(1 for r in results if r.success)
best_result.all_objectives = [r.fun for r in results if r.success]
logger.info(
f"Multi-start completed: {best_result.n_successful}/{n_starts} successful, "
f"best objective: {best_result.fun:.6f}"
)
return best_result
def _generate_starting_points(
self, n_starts: int, x0: Optional[np.ndarray], rng: np.random.Generator
) -> List[np.ndarray]:
"""Generate diverse starting points within bounds."""
n_vars = len(self.bounds.lb)
starting_points = []
# Include provided initial point if given
if x0 is not None:
starting_points.append(x0)
n_starts -= 1
        # Generate uniform random points within bounds (simple Monte Carlo
        # sampling; Latin Hypercube Sampling could improve coverage)
for _ in range(n_starts):
point = np.zeros(n_vars)
for i in range(n_vars):
lb = self.bounds.lb[i]
ub = self.bounds.ub[i]
# Handle infinite bounds
if np.isinf(lb):
lb = -1e6
if np.isinf(ub):
ub = 1e6
point[i] = rng.uniform(lb, ub)
starting_points.append(point)
return starting_points
def screen_starting_points_gpu(
self, starting_points: List[np.ndarray], top_k: int = 5
) -> List[np.ndarray]:
"""Screen starting points by evaluating all in a GPU-accelerated batch.
Evaluates all starting points simultaneously using GPU array operations,
then returns the top-k points with the best objective values.
Args:
starting_points: List of starting points to evaluate
top_k: Number of best starting points to return
Returns:
List of top-k starting points sorted by objective value
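
        Example:
            A minimal sketch (falls back to NumPy when no GPU is configured)::

                import numpy as np
                from scipy.optimize import Bounds

                ms = MultiStartOptimizer(
                    lambda x: float(np.sum(x**2)), Bounds([-1.0], [1.0])
                )
                points = [np.array([0.9]), np.array([0.1]), np.array([-0.5])]
                best = ms.screen_starting_points_gpu(points, top_k=2)
                # best == [array([0.1]), array([-0.5])]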
Since:
Version 0.11.0 (Issue #966)
"""
use_gpu = self.gpu_config is not None and self.gpu_config.enabled and is_gpu_available()
xp = get_array_module(gpu=use_gpu)
# Evaluate all starting points
values = []
for pt in starting_points:
try:
val = self.objective_fn(pt)
values.append(val)
except (ValueError, RuntimeError):
values.append(float("inf"))
values_arr = xp.asarray(values)
indices = to_numpy(xp.argsort(values_arr))[:top_k]
return [starting_points[i] for i in indices]
class EnhancedSLSQPOptimizer:
"""Enhanced SLSQP with adaptive step sizing and improved convergence."""
def __init__(
self,
objective_fn: Callable,
gradient_fn: Optional[Callable] = None,
constraints: Optional[List[Dict[str, Any]]] = None,
bounds: Optional[Bounds] = None,
):
"""Initialize enhanced SLSQP optimizer.
Args:
objective_fn: Objective function
gradient_fn: Gradient function (computed numerically if None)
constraints: List of constraints
bounds: Variable bounds
"""
self.objective_fn = objective_fn
self.gradient_fn = gradient_fn
self.constraints = constraints or []
self.bounds = bounds
self.convergence_monitor = ConvergenceMonitor()
# Initialize attributes that may be set during optimization
self.step_size: float = 1.0
self.prev_x: Optional[np.ndarray] = None
self.prev_obj: Optional[float] = None
def optimize(
self,
x0: np.ndarray,
adaptive_step: bool = True,
line_search: str = "armijo",
max_iter: int = 1000,
tol: float = 1e-6,
) -> OptimizeResult:
"""Run enhanced SLSQP optimization.
Args:
x0: Initial point
            adaptive_step: Whether to initialize adaptive step-sizing state
                (tracked on the instance; SciPy's SLSQP does not expose
                step-size hooks, so the underlying solver is unchanged)
            line_search: Line search method ("armijo" or "wolfe");
                currently informational only, for the same reason
max_iter: Maximum iterations
tol: Convergence tolerance
Returns:
Optimization result
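
        Example:
            A minimal sketch with one linear inequality constraint::

                import numpy as np

                opt = EnhancedSLSQPOptimizer(
                    lambda x: float((x[0] - 3.0) ** 2),
                    constraints=[{"type": "ineq", "fun": lambda x: 2.0 - x[0]}],
                )
                result = opt.optimize(np.array([0.0]))
                # result.x is near 2.0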
"""
logger.info("Starting enhanced SLSQP optimization")
# Configure options
options = {
"maxiter": max_iter,
"ftol": tol,
"disp": False,
}
        # Reset adaptive step-sizing state (informational; see Args note)
if adaptive_step:
self.step_size = 1.0
self.prev_x = None
self.prev_obj = None
# Run optimization
result = minimize(
self.objective_fn,
x0,
method="SLSQP",
jac=self.gradient_fn,
bounds=self.bounds,
constraints=self.constraints,
options=options,
)
# Add convergence information
result.convergence_info = self.convergence_monitor.get_summary()
return result
def create_optimizer(
method: str,
objective_fn: Callable,
constraints: Optional[List[Dict[str, Any]]] = None,
bounds: Optional[Bounds] = None,
gpu_config: Optional["GPUConfig"] = None,
**kwargs,
) -> Any:
"""Factory function to create appropriate optimizer.
Args:
method: Optimization method name
objective_fn: Objective function
constraints: Optional constraints
bounds: Optional bounds
gpu_config: Optional GPU configuration for accelerated operations
**kwargs: Additional optimizer-specific arguments
Returns:
Configured optimizer instance
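
    Example:
        A minimal sketch selecting the trust-region method::

            import numpy as np

            optimizer = create_optimizer(
                "trust-region",
                lambda x: float(np.sum((x - 1.0) ** 2)),
                constraints=[{"type": "ineq", "fun": lambda x: 2.0 - x[0] - x[1]}],
            )
            result = optimizer.optimize(np.array([0.0, 0.0]))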
"""
method = method.lower()
if method == "trust-region":
return TrustRegionOptimizer(objective_fn, constraints=constraints, bounds=bounds, **kwargs)
if method == "penalty":
return PenaltyMethodOptimizer(objective_fn, constraints or [], bounds, **kwargs)
if method == "augmented-lagrangian":
return AugmentedLagrangianOptimizer(objective_fn, constraints or [], bounds, **kwargs)
if method == "multi-start":
return MultiStartOptimizer(
objective_fn, bounds or Bounds([], []), constraints, gpu_config=gpu_config, **kwargs
)
if method == "enhanced-slsqp":
return EnhancedSLSQPOptimizer(
objective_fn, constraints=constraints, bounds=bounds, **kwargs
)
raise ValueError(f"Unknown optimization method: {method}")