Source code for ergodic_insurance.hjb_solver

"""Hamilton-Jacobi-Bellman solver for optimal insurance control.

This module implements a Hamilton-Jacobi-Bellman (HJB) partial differential equation
solver for finding optimal insurance strategies through dynamic programming. The solver
handles multi-dimensional state spaces and provides theoretically optimal control policies.

The HJB equation provides globally optimal solutions by solving:

    ∂V/∂t + max_u[L^u V + f(x,u)] = 0

where V is the value function, L^u is the controlled infinitesimal generator,
and f(x,u) is the running cost/reward.

Author: Alex Filiakov
Date: 2025-01-26
"""

# pylint: disable=too-many-lines  # large cohesive HJB solver module (cf. manufacturer.py)

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from itertools import product as itertools_product
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, TypedDict

import numpy as np
from scipy import interpolate, sparse

logger = logging.getLogger(__name__)

# Module-level named constants for numerical tolerances
_DRIFT_THRESHOLD = 1e-10
_MARGINAL_UTILITY_FLOOR = 1e-10
_GAMMA_TOLERANCE = 1e-10

# Policy improvement strategy thresholds
_VECTORIZE_COMBO_THRESHOLD = 5000  # Below: full vectorized batch
_COARSE_STRIDE = 3  # Adaptive: every 3rd point
_REFINE_RADIUS = 2  # Adaptive: +/-2 points around optimum
_DEFAULT_MEMORY_BUDGET_MB = 256  # Max memory for batched evaluation


[docs] class NumericalDivergenceError(RuntimeError): """Raised when the HJB solver detects NaN or Inf in the value function."""
[docs] class TimeSteppingScheme(Enum): """Time stepping schemes for PDE integration.""" EXPLICIT = "explicit" IMPLICIT = "implicit" CRANK_NICOLSON = "crank_nicolson"
[docs] class BoundaryCondition(Enum): """Types of boundary conditions.""" DIRICHLET = "dirichlet" # Fixed value NEUMANN = "neumann" # Fixed derivative ABSORBING = "absorbing" # Zero second derivative REFLECTING = "reflecting" # Zero first derivative
[docs] @dataclass class StateVariable: """Definition of a state variable in the HJB problem.""" name: str min_value: float max_value: float num_points: int boundary_lower: BoundaryCondition = BoundaryCondition.ABSORBING boundary_upper: BoundaryCondition = BoundaryCondition.ABSORBING log_scale: bool = False
[docs] def __post_init__(self): """Validate state variable configuration.""" if self.min_value >= self.max_value: raise ValueError(f"min_value must be less than max_value for {self.name}") if self.num_points < 3: raise ValueError(f"Need at least 3 grid points for {self.name}") if self.log_scale and self.min_value <= 0: raise ValueError(f"Cannot use log scale with non-positive min_value for {self.name}")
[docs] def get_grid(self) -> np.ndarray: """Generate grid points for this variable. Returns: Array of grid points """ if self.log_scale: return np.logspace(np.log10(self.min_value), np.log10(self.max_value), self.num_points) return np.linspace(self.min_value, self.max_value, self.num_points)
[docs] @dataclass class ControlVariable: """Definition of a control variable in the HJB problem.""" name: str min_value: float max_value: float num_points: int = 50 continuous: bool = True log_scale: bool = False
[docs] def __post_init__(self): """Validate control variable configuration.""" if self.min_value >= self.max_value: raise ValueError(f"min_value must be less than max_value for {self.name}") if self.num_points < 2: raise ValueError(f"Need at least 2 control points for {self.name}") if self.log_scale and self.min_value <= 0: raise ValueError(f"Cannot use log scale with non-positive min_value for {self.name}")
[docs] def get_values(self) -> np.ndarray: """Get discrete control values for optimization. Returns: Array of control values """ if self.log_scale: return np.geomspace(self.min_value, self.max_value, self.num_points) return np.linspace(self.min_value, self.max_value, self.num_points)
[docs] @dataclass class StateSpace: """Multi-dimensional state space for HJB problem. Handles arbitrary dimensionality with proper grid management and boundary condition enforcement. """ state_variables: List[StateVariable]
[docs] def __post_init__(self): """Initialize derived attributes.""" self.ndim = len(self.state_variables) self.shape = tuple(sv.num_points for sv in self.state_variables) self.size = np.prod(self.shape) # Create grids for each dimension self.grids = [sv.get_grid() for sv in self.state_variables] # Create meshgrid for full state space self.meshgrid = np.meshgrid(*self.grids, indexing="ij") # Flatten for linear algebra operations self.flat_grids = [mg.ravel() for mg in self.meshgrid] logger.info(f"Initialized {self.ndim}D state space with shape {self.shape}")
[docs] def get_boundary_mask(self) -> np.ndarray: """Get boolean mask for boundary points. Returns: Boolean array where True indicates boundary points """ mask = np.zeros(self.shape, dtype=bool) for dim, _sv in enumerate(self.state_variables): # Create slice for this dimension's boundaries slices_lower: list[slice | int] = [slice(None)] * self.ndim slices_lower[dim] = 0 mask[tuple(slices_lower)] = True slices_upper: list[slice | int] = [slice(None)] * self.ndim slices_upper[dim] = -1 mask[tuple(slices_upper)] = True return mask
[docs] def interpolate_value(self, value_function: np.ndarray, points: np.ndarray) -> np.ndarray: """Interpolate value function at arbitrary points. Args: value_function: Value function on grid points: Points to interpolate at (shape: [n_points, n_dims]) Returns: Interpolated values """ if self.ndim == 1: interp = interpolate.interp1d( self.grids[0], value_function, kind="cubic", bounds_error=False, fill_value="extrapolate", ) return np.array(interp(points[:, 0])) if self.ndim == 2: interp = interpolate.RegularGridInterpolator( self.grids, value_function, method="linear", bounds_error=False, fill_value=None ) return np.array(interp(points)) # For higher dimensions, use linear interpolation interp = interpolate.RegularGridInterpolator( self.grids, value_function, method="linear", bounds_error=False, fill_value=None ) return np.array(interp(points))
[docs] class UtilityFunction(ABC): """Abstract base class for utility functions. Defines the interface for utility functions used in the HJB equation. Concrete implementations should provide both the utility value and its derivative. """
[docs] @abstractmethod def evaluate(self, wealth: np.ndarray) -> np.ndarray: """Evaluate utility at given wealth levels. Args: wealth: Wealth values Returns: Utility values """ pass # pylint: disable=unnecessary-pass
[docs] @abstractmethod def derivative(self, wealth: np.ndarray) -> np.ndarray: """Compute marginal utility (first derivative). Args: wealth: Wealth values Returns: Marginal utility values """ pass # pylint: disable=unnecessary-pass
[docs] @abstractmethod def inverse_derivative(self, marginal_utility: np.ndarray) -> np.ndarray: """Compute inverse of marginal utility. Used for finding optimal controls in some formulations. Args: marginal_utility: Marginal utility values Returns: Wealth values corresponding to given marginal utilities """ pass # pylint: disable=unnecessary-pass
[docs] class LogUtility(UtilityFunction): """Logarithmic utility function for ergodic optimization. U(w) = log(w) This utility function maximizes the long-term growth rate and is particularly suitable for ergodic analysis. """ def __init__(self, wealth_floor: float = 1e-6): """Initialize log utility. Args: wealth_floor: Minimum wealth to prevent log(0) """ self.wealth_floor = wealth_floor
[docs] def evaluate(self, wealth: np.ndarray) -> np.ndarray: """Evaluate log utility.""" safe_wealth = np.maximum(wealth, self.wealth_floor) return np.array(np.log(safe_wealth))
[docs] def derivative(self, wealth: np.ndarray) -> np.ndarray: """Compute marginal utility: U'(w) = 1/w.""" safe_wealth = np.maximum(wealth, self.wealth_floor) return np.array(1.0 / safe_wealth)
[docs] def inverse_derivative(self, marginal_utility: np.ndarray) -> np.ndarray: """Compute inverse: (U')^(-1)(m) = 1/m.""" safe_marginal = np.maximum(marginal_utility, _MARGINAL_UTILITY_FLOOR) return np.array(1.0 / safe_marginal)
[docs] class PowerUtility(UtilityFunction): """Power (CRRA) utility function with risk aversion parameter. U(w) = w^(1-γ)/(1-γ) for γ ≠ 1 U(w) = log(w) for γ = 1 where γ is the coefficient of relative risk aversion. """ def __init__(self, risk_aversion: float = 2.0, wealth_floor: float = 1e-6): """Initialize power utility. Args: risk_aversion: Coefficient of relative risk aversion (γ) wealth_floor: Minimum wealth to prevent numerical issues """ self.gamma = risk_aversion self.wealth_floor = wealth_floor # Use log utility if gamma is close to 1 if abs(self.gamma - 1.0) < _GAMMA_TOLERANCE: self._log_utility = LogUtility(wealth_floor)
[docs] def evaluate(self, wealth: np.ndarray) -> np.ndarray: """Evaluate power utility.""" if abs(self.gamma - 1.0) < _GAMMA_TOLERANCE: return self._log_utility.evaluate(wealth) safe_wealth = np.maximum(wealth, self.wealth_floor) return np.array(np.power(safe_wealth, 1 - self.gamma) / (1 - self.gamma))
[docs] def derivative(self, wealth: np.ndarray) -> np.ndarray: """Compute marginal utility: U'(w) = w^(-γ).""" if abs(self.gamma - 1.0) < _GAMMA_TOLERANCE: return self._log_utility.derivative(wealth) safe_wealth = np.maximum(wealth, self.wealth_floor) return np.array(np.power(safe_wealth, -self.gamma))
[docs] def inverse_derivative(self, marginal_utility: np.ndarray) -> np.ndarray: """Compute inverse: (U')^(-1)(m) = m^(-1/γ).""" if abs(self.gamma - 1.0) < _GAMMA_TOLERANCE: return self._log_utility.inverse_derivative(marginal_utility) safe_marginal = np.maximum(marginal_utility, _MARGINAL_UTILITY_FLOOR) return np.array(np.power(safe_marginal, -1.0 / self.gamma))
[docs] class ExpectedWealth(UtilityFunction): """Linear utility function for risk-neutral wealth maximization. U(w) = w This represents risk-neutral preferences where the goal is to maximize expected wealth. """
[docs] def evaluate(self, wealth: np.ndarray) -> np.ndarray: """Evaluate linear utility.""" return wealth
[docs] def derivative(self, wealth: np.ndarray) -> np.ndarray: """Compute marginal utility: U'(w) = 1.""" return np.ones_like(wealth)
[docs] def inverse_derivative(self, marginal_utility: np.ndarray) -> np.ndarray: """Inverse is undefined for constant marginal utility.""" raise NotImplementedError("Inverse derivative undefined for linear utility")
[docs] @dataclass class HJBProblem: """Complete specification of an HJB optimal control problem.""" state_space: StateSpace control_variables: List[ControlVariable] utility_function: UtilityFunction dynamics: Callable[[np.ndarray, np.ndarray, float], np.ndarray] running_cost: Callable[[np.ndarray, np.ndarray, float], np.ndarray] terminal_value: Optional[Callable[[np.ndarray], np.ndarray]] = None discount_rate: float = 0.0 time_horizon: Optional[float] = None diffusion: Optional[Callable[[np.ndarray, np.ndarray, float], np.ndarray]] = None """Optional callback returning σ²(x,u,t) with same shape as dynamics output. When provided, the solver includes the ½σ²·∇²V diffusion term.""" jump_term: Optional[ Callable[[np.ndarray, np.ndarray, float, np.ndarray, List[np.ndarray]], np.ndarray] ] = None """Optional PIDE jump term. Signature: ``jump_term(state, control, t, value_function, state_grids)`` returning a scalar contribution per evaluation point (shape ``state.shape[:-1]``). Mathematically this is:: lambda(x, u) * E_X[ V(x - L(X, u)) - V(x) ] evaluated at each (state, control) point. The solver passes the current value function array (shape ``state_space.shape``) and the list of state grids so the callback can interpolate V at post-jump states without needing closure access to solver-internal state. Used to model compound-Poisson losses exactly under multiplicative wealth dynamics, bypassing the diffusion approximation's underestimate of jump penalties when L/w is non-trivial. Treated explicitly (IMEX) in the implicit/Crank-Nicolson time-stepping paths: the jump term is evaluated at the previous V_old and added to the RHS, so the local advection-diffusion operator stays tridiagonal. Stability requires lambda*dt < 1, which is easily satisfied for typical insurance frequencies and time steps. """ initial_value_function: Optional[Callable[[np.ndarray], np.ndarray]] = None """Optional initial guess for V on the state grid (warm-start). When provided, ``HJBSolver.solve()`` uses this callback to initialize the value function instead of zeros (infinite horizon) or the terminal value (finite horizon). **Precedence rule**: If both ``initial_value_function`` and ``terminal_value`` are provided for a finite-horizon problem, ``initial_value_function`` wins and ``terminal_value`` is ignored (with a logged warning). Note that ``__post_init__`` auto-fills ``terminal_value`` with a zero function for finite-horizon problems when not explicitly set, so the warning will fire whenever ``initial_value_function`` is set on a finite-horizon problem. For infinite-horizon problems, ``initial_value_function`` replaces the default zero initialization. The callback receives a stacked grid of shape ``(n_pts_dim1, ..., n_pts_dimD, D)`` and must return shape ``(n_pts_dim1, ..., n_pts_dimD)``. The returned array is reshaped to match ``state_space.shape`` before being assigned to ``value_function``. Captured by the Dirichlet boundary machinery: warm-start values at ``V[0]`` / ``V[-1]`` along Dirichlet-bounded dimensions are used as the boundary values throughout the solve. """ control_feasibility: Optional[Callable[[np.ndarray, np.ndarray], np.ndarray]] = None """Optional state-dependent admissible-control mask. Signature: ``control_feasibility(state_points, control_array) -> mask`` where ``state_points`` has shape ``(n, ndim)``, ``control_array`` has shape ``(n, n_controls)`` (the same flat per-(state, control) layout the dynamics / diffusion / jump callbacks receive during policy improvement), and the returned array is broadcastable to ``(n,)`` and truthy for ADMISSIBLE pairs. During policy improvement the Hamiltonian at inadmissible ``(state, control)`` pairs is set to ``-inf`` so the argmax never selects them -- a hard control constraint, not a soft penalty. The constraint is applied ONLY to control selection; the value-function update then marches under the (now feasible) optimal policy, so the returned value reflects the constrained optimum. If every sampled control is inadmissible at a state, the solver falls back to the first control sample of each control variable (``ControlVariable.get_values()[0]`` -- the grid minimum, e.g. the smallest retention / most coverage), the safe default for a state that cannot satisfy the constraint with any admissible control. Defaults to ``None`` (every control admissible), so solves that do not set it are unchanged. Used by notebook 07 to bound the retention by the endogenous single-loss-insolvency threshold ``T = (E - E_floor) / (1 + LAE)`` (issue #1649), making ``SIR* <= equity`` hold by construction without a deployment-time cap. """
[docs] def __post_init__(self): """Validate problem specification.""" if self.time_horizon is not None and self.time_horizon <= 0: raise ValueError("Time horizon must be positive") if self.discount_rate < 0: raise ValueError("Discount rate must be non-negative") # For finite horizon problems, terminal value is required if self.time_horizon is not None and self.terminal_value is None: # Default to zero terminal value self.terminal_value = lambda x: np.zeros_like(x[..., 0])
[docs] @dataclass class HJBSolverConfig: """Configuration for HJB solver. Attributes: auto_cfl: EXPLICIT scheme only. When True (default) the solver runs a one-off CFL stability trial and auto-reduces ``time_step`` if the configured step is unstable. Set False to take explicit Euler steps at exactly ``time_step`` and skip the auto-CFL trial -- appropriate on log-spaced grids where the conservative global-max-sigma^2 / global-min-dx CFL estimate (which pairs coefficients and spacings from opposite ends of the grid) collapses ``dt`` needlessly. The caller then owns dt stability; the per-step NaN/Inf guard still applies and :meth:`HJBSolver.compute_cfl_diagnostics` reports the realized (local) CFL. See issue #1611. """ time_step: float = 0.01 max_iterations: int = 1000 tolerance: float = 1e-6 scheme: TimeSteppingScheme = TimeSteppingScheme.EXPLICIT auto_cfl: bool = True use_sparse: bool = True verbose: bool = True inner_max_iterations: int = 100 inner_tolerance_factor: float = 0.1 # inner_tol = tolerance * this rannacher_steps: int = 2 # Number of implicit half-step pairs for CN startup control_search_strategy: str = "auto" # "auto", "vectorized", "adaptive", "loop", "gradient" control_memory_budget_mb: int = 256 # Max memory for batched control evaluation
[docs] class CFLDiagnostics(TypedDict): """Realized-CFL report returned by :meth:`HJBSolver.compute_cfl_diagnostics`.""" advection_cfl: float diffusion_cfl: float cfl: float dt: float stable: bool
[docs] @dataclass class HamiltonianTerms: r"""Per-term HJB Hamiltonian on the state grid at a fixed control policy. Returned by :meth:`HJBSolver.hamiltonian_terms`. The HJB operator the solver integrates is .. math:: -\partial_t V = \mathcal{H} = f(x,u) + \mu(x,u)\cdot\nabla V + \tfrac12\sigma^2(x,u)\cdot\nabla^2 V + \lambda\,\mathbb{E}_X[V(x-L)-V(x)] - \rho V so :attr:`total` equals the solver's :math:`-\partial_t V` at this ``(value, policy)`` and satisfies the exact decomposition .. math:: \text{total} = \text{drift} + \text{diffusion} + \text{jump} + \text{running\_cost} + \text{discount}. Crucially, :attr:`drift`, :attr:`diffusion` and :attr:`jump` are computed with the **same discretization the solver integrates** -- the upwind first derivative (:meth:`HJBSolver._apply_upwind_scheme`), the non-uniform compact second-derivative stencil (:meth:`HJBSolver._compute_second_derivatives`), and the problem's ``jump_term`` callback -- **not** ``np.gradient``. They therefore reproduce the forces the solver actually balanced rather than re-deriving them from ``V`` (which, via a twice-applied central difference, amplifies grid-scale noise by :math:`\sim 1/\Delta x^2`). Attributes: drift: :math:`\mu(x,u)\cdot\nabla V` using the solver's **upwind** ``∇V``. diffusion: :math:`\tfrac12\sigma^2(x,u)\cdot\nabla^2 V` using the compact non-uniform central stencil; all-zeros if the problem has no diffusion. jump: :math:`\lambda\,\mathbb{E}_X[V(x-L)-V(x)]` from the ``jump_term`` callback; all-zeros if the problem has no jump term. running_cost: :math:`f(x,u)`. discount: :math:`-\rho V`. total: the full Hamiltonian :math:`-\partial_t V` (sum of the five above). grad: central-difference :math:`\nabla V`, provided **for display only** (e.g. a smooth ``∂V/∂A`` panel). Shape ``state_shape + (ndim,)``. This is **not** the upwind ``∇V`` used inside :attr:`drift`, so in general ``drift != sum_d mu_d * grad_d`` -- the two are intentionally different discretizations (upwind for the faithful operator, central for display). d2v: the compact non-uniform :math:`\nabla^2 V` used by :attr:`diffusion` (``diffusion = sum_d 0.5 * sigma^2_d * d2v_d``). Shape ``state_shape + (ndim,)``. All scalar-field attributes have shape ``state_space.shape``. """ drift: np.ndarray diffusion: np.ndarray jump: np.ndarray running_cost: np.ndarray discount: np.ndarray total: np.ndarray grad: np.ndarray d2v: np.ndarray
[docs] class HJBSolver: """Hamilton-Jacobi-Bellman PDE solver for optimal control. Implements finite difference methods with upwind schemes for solving HJB equations. Supports multi-dimensional state spaces and various boundary conditions. """ def __init__(self, problem: HJBProblem, config: HJBSolverConfig): """Initialize HJB solver. Args: problem: HJB problem specification config: Solver configuration """ self.problem = problem self.config = config # Initialize value function and policy self.value_function: np.ndarray | None = None self.optimal_policy: dict[str, np.ndarray] | None = None # Optional per-step policy snapshots, populated by solve_finite_horizon # when store_policy_history=True (oldest = nearest the terminal time). self.policy_history: Optional[List[Dict[str, np.ndarray]]] = None # Boundary values for Dirichlet BCs (captured from initial/terminal condition) self._boundary_values: dict[int, dict[str, np.ndarray]] | None = None # Cache for factorized sparse operators: (theta, dt, dim) -> (solve_func, B_or_None) self._operator_cache: Dict[ Tuple[float, float, int], Tuple[Any, Optional[sparse.spmatrix]] ] = {} # Set up finite difference operators self._setup_operators() logger.info(f"Initialized HJB solver for {problem.state_space.ndim}D problem") def _setup_operators(self): """Set up finite difference operators for the PDE.""" # Store per-interval grid spacings as arrays (supports non-uniform grids) self.dx = [] for sv in self.problem.state_space.state_variables: grid = sv.get_grid() if len(grid) > 1: self.dx.append(np.diff(grid)) else: self.dx.append(np.array([1.0])) # Will construct operators during solve based on current policy self.operators_initialized = True def _compute_cfl_number( self, drift: np.ndarray, sigma_sq: np.ndarray | None, dt: float, ) -> tuple[float, float]: """Compute CFL numbers for advection and diffusion stability. Args: drift: Drift values on the state grid, shape state_shape + (ndim,) sigma_sq: Diffusion coefficients on state grid, shape state_shape + (ndim,), or None if no diffusion. dt: Time step size. Returns: Tuple of (advection_cfl, diffusion_cfl). """ advection_cfl = 0.0 diffusion_cfl = 0.0 n_dims = min(drift.shape[-1], len(self.problem.state_space.state_variables)) for dim in range(n_dims): dx_arr = self.dx[dim] dx_min = float(np.min(dx_arr)) dx_mean = float(np.mean(dx_arr)) drift_max = float(np.max(np.abs(drift[..., dim]))) if dx_min > 0: advection_cfl = max(advection_cfl, drift_max * dt / dx_min) if sigma_sq is not None and dx_min > 0: # Effective diffusion coefficient is D = 0.5 * sigma_sq # Use dx_min (not dx_mean) — stability is governed by the # smallest grid spacing, which matters for non-uniform grids. sigma_sq_max = float(np.max(np.abs(sigma_sq[..., dim]))) diffusion_cfl = max(diffusion_cfl, 0.5 * sigma_sq_max * dt / (dx_min**2)) return advection_cfl, diffusion_cfl def _local_node_spacing(self, dim: int) -> np.ndarray: """Per-node local grid spacing along ``dim`` (length == grid size). For interior nodes this is the smaller of the two adjacent intervals (the binding spacing for explicit stability); the endpoints use their single adjacent interval. Works for non-uniform / log-spaced grids. """ diffs = np.atleast_1d(self.dx[dim]) n = int(self.problem.state_space.shape[dim]) if n <= 1: return np.ones(1) if diffs.size == 1: return np.full(n, float(diffs[0])) node_dx = np.empty(n) node_dx[0] = diffs[0] node_dx[-1] = diffs[-1] node_dx[1:-1] = np.minimum(diffs[:-1], diffs[1:]) return node_dx
[docs] def compute_cfl_diagnostics(self, dt: Optional[float] = None) -> CFLDiagnostics: """Realized (local) CFL numbers at the current policy and value. A post-solve stability guardrail, primarily for explicit solves run with ``auto_cfl=False``: it confirms the configured ``time_step`` actually satisfies the explicit-Euler CFL condition at the converged policy. Unlike the conservative auto-CFL trial (:meth:`_compute_cfl_number`, which pairs the *global* max diffusion/drift with the *global* min grid spacing), this pairs each grid cell's coefficients with that **same cell's** local spacing, then maxes over cells. On a log-spaced grid where ``sigma^2 ~ x^2`` and ``dx ~ x`` the global extrema sit at opposite ends and the global estimate is pathologically large; the local number here is the honest stability ratio. Args: dt: Step size to evaluate the CFL numbers at. Defaults to ``config.time_step``. Returns: Dict with ``advection_cfl`` and ``diffusion_cfl`` (per-dimension maxima over cells), ``cfl`` (the combined per-cell stability number ``dt * (sum_d |a_d|/dx_d + 0.5 sigma_d^2/dx_d^2) + dt * rho``), ``dt``, and ``stable`` (True iff ``cfl <= 1``). Raises: RuntimeError: If called before a solve (no policy/value available). """ if self.value_function is None or self.optimal_policy is None: raise RuntimeError( "compute_cfl_diagnostics requires a prior solve() / " "solve_finite_horizon() to populate the policy and value function." ) dt = float(self.config.time_step if dt is None else dt) state_points = np.stack(self.problem.state_space.flat_grids, axis=-1) control_array = np.stack( [self.optimal_policy[cv.name].ravel() for cv in self.problem.control_variables], axis=-1, ) drift = self.problem.dynamics(state_points, control_array, 0.0) drift = drift.reshape(self.problem.state_space.shape + (-1,)) sigma_sq = None if self.problem.diffusion is not None: sigma_sq_raw = self.problem.diffusion(state_points, control_array, 0.0) sigma_sq = sigma_sq_raw.reshape(self.problem.state_space.shape + (-1,)) ndim = self.problem.state_space.ndim n_dims = min(drift.shape[-1], len(self.problem.state_space.state_variables)) combined_rate = np.full(self.problem.state_space.shape, float(self.problem.discount_rate)) advection_cfl = 0.0 diffusion_cfl = 0.0 for dim in range(n_dims): node_dx = self._local_node_spacing(dim) bshape = [1] * ndim bshape[dim] = node_dx.size node_dx_b = node_dx.reshape(bshape) adv_rate = np.abs(drift[..., dim]) / node_dx_b advection_cfl = max(advection_cfl, float(np.max(adv_rate)) * dt) combined_rate = combined_rate + adv_rate if sigma_sq is not None: diff_rate = 0.5 * np.abs(sigma_sq[..., dim]) / node_dx_b**2 diffusion_cfl = max(diffusion_cfl, float(np.max(diff_rate)) * dt) combined_rate = combined_rate + diff_rate cfl = float(np.max(combined_rate)) * dt return { "advection_cfl": advection_cfl, "diffusion_cfl": diffusion_cfl, "cfl": cfl, "dt": dt, "stable": bool(cfl <= 1.0), }
def _warn_unsupported_scheme(self) -> None: """Surface an implicit/CN -> explicit fallback so it is never silent (#1611). Implicit and Crank-Nicolson are only implemented for 1-D state spaces; for higher dimensions :meth:`_advance_value_one_step` falls back to explicit Euler. Logging it here -- once per solve call, regardless of the CFL-check flag -- means a reader who sets ``scheme=IMPLICIT`` on a multi-dimensional problem cannot mistake the result for an unconditionally-stable implicit solve. """ scheme = self.config.scheme if ( scheme in (TimeSteppingScheme.IMPLICIT, TimeSteppingScheme.CRANK_NICOLSON) and self.problem.state_space.ndim != 1 ): logger.warning( "%s scheme is only implemented for 1-D state spaces; this %dD " "problem is falling back to explicit Euler at the configured " "time_step (NOT an implicit, unconditionally-stable solve). Set " "scheme=EXPLICIT (optionally with auto_cfl=False) to make this " "explicit and future-proof.", scheme.value, self.problem.state_space.ndim, ) def _build_spatial_operator_1d( self, drift_1d: np.ndarray, sigma_sq_1d: np.ndarray | None, dim: int = 0, ) -> sparse.spmatrix: """Build the 1D spatial operator L as a sparse tridiagonal matrix. The operator represents: L(V)[i] = -rho*V[i] + drift*dV/dx + 0.5*sigma^2*d2V/dx2 using upwind first derivatives and non-uniform central second derivatives, consistent with the explicit scheme. The non-uniform second-derivative stencil is: d²V/dx²|_i = 2/(h_b+h_f) * [V[i+1]/h_f - V[i]*(1/h_b+1/h_f) + V[i-1]/h_b] where h_b = x[i]-x[i-1] and h_f = x[i+1]-x[i]. This is exact for quadratic functions and reduces to (V[i+1]-2V[i]+V[i-1])/h² on uniform grids. Args: drift_1d: Drift values at each grid point, shape (N,). sigma_sq_1d: Diffusion coefficient sigma^2 at each grid point, shape (N,), or None for pure advection. dim: Dimension index (for grid spacing lookup). Returns: Sparse CSR matrix of shape (N, N). """ N = len(drift_1d) dx_arr = self.dx[dim] rho = self.problem.discount_rate # Sub-diagonal (V[i-1] coefficients), super-diagonal (V[i+1] coefficients), # main diagonal (V[i] coefficients) — all for interior points sub = np.zeros(N) main = np.zeros(N) sup = np.zeros(N) # Interior points: i = 1, ..., N-2 interior = slice(1, N - 1) drift_int = drift_1d[interior] drift_pos = np.maximum(drift_int, 0.0) drift_neg = np.minimum(drift_int, 0.0) # dx for backward diff at point i: dx_arr[i-1] (h_b) dx_back = dx_arr[0 : N - 2] # dx for forward diff at point i: dx_arr[i] (h_f) dx_fwd = dx_arr[1 : N - 1] # Diffusion coefficient at interior points: D = 0.5 * sigma^2 if sigma_sq_1d is not None: D = 0.5 * sigma_sq_1d[interior] else: D = np.zeros(N - 2) # Non-uniform diffusion stencil coefficients: # d²V/dx²|_i = c_lo*V[i-1] + c_mid*V[i] + c_hi*V[i+1] # where: # c_lo = 2 / (h_b * (h_b + h_f)) # c_mid = -2 / (h_b * h_f) # c_hi = 2 / (h_f * (h_b + h_f)) h_sum = dx_back + dx_fwd diff_sub = D * 2.0 / (dx_back * h_sum) diff_main = -D * 2.0 / (dx_back * dx_fwd) diff_sup = D * 2.0 / (dx_fwd * h_sum) # Operator coefficients (consistent with _apply_upwind_scheme). # # The HJB PDE is V_t = drift*V_x + ..., which is V_t + (-drift)*V_x = 0. # The effective advection coefficient is a = -drift, so the upwind # direction is OPPOSITE to standard advection references that assume # V_t + a*V_x = 0 with a = drift. Concretely: # drift > 0 => a < 0 => upwind = forward diff # drift < 0 => a > 0 => upwind = backward diff sub[interior] = -drift_neg / dx_back + diff_sub main[interior] = -drift_pos / dx_fwd + drift_neg / dx_back + diff_main - rho sup[interior] = drift_pos / dx_fwd + diff_sup # Boundary rows are zero (will be handled after solve) L = sparse.diags( [sub[1:], main, sup[:-1]], offsets=[-1, 0, 1], shape=(N, N), format="csr", ) return L def _invalidate_operator_cache(self): """Clear the cached sparse matrix factorizations. Must be called whenever drift or diffusion coefficients may change (i.e., at the start of each policy evaluation cycle). """ self._operator_cache.clear() def _theta_step_1d( self, old_v: np.ndarray, cost: np.ndarray, drift_1d: np.ndarray, sigma_sq_1d: np.ndarray | None, dt: float, theta: float, dim: int = 0, ) -> np.ndarray: """Perform one theta-scheme time step for a 1D problem. Solves: (I - theta*dt*L)*V_new = (I + (1-theta)*dt*L)*V_old + dt*cost For theta=1: fully implicit (backward Euler). For theta=0.5: Crank-Nicolson. For theta=0: explicit (forward Euler). Args: old_v: Current value function, shape (N,). cost: Running cost at each grid point, shape (N,). drift_1d: Drift at each grid point, shape (N,). sigma_sq_1d: Diffusion coefficient sigma^2, shape (N,) or None. dt: Time step. theta: Implicitness parameter in [0, 1]. dim: Dimension index. Returns: Updated value function, shape (N,). """ N = len(old_v) cache_key = (theta, dt, dim) if cache_key in self._operator_cache: # Cache hit: reuse factorized solver and B matrix solve_func, B = self._operator_cache[cache_key] else: # Cache miss: build, factorize, and store L = self._build_spatial_operator_1d(drift_1d, sigma_sq_1d, dim) I_mat = sparse.eye(N, format="csr") # LHS: A = I - theta*dt*L A = I_mat - theta * dt * L # Set boundary rows to identity (boundary values handled by # _apply_boundary_conditions after return) A = A.tolil() A[0, :] = 0 A[0, 0] = 1.0 A[N - 1, :] = 0 A[N - 1, N - 1] = 1.0 A = A.tocsc() # Factorize once via SuperLU solve_func = sparse.linalg.splu(A).solve # Build B matrix for Crank-Nicolson (theta < 1) if theta < 1.0: B = I_mat + (1.0 - theta) * dt * L else: B = None self._operator_cache[cache_key] = (solve_func, B) # RHS: B @ old_v + dt * cost if B is not None: rhs = B @ old_v + dt * cost else: rhs = old_v + dt * cost # Preserve old boundary values in RHS (will be overwritten by BCs) rhs[0] = old_v[0] rhs[N - 1] = old_v[N - 1] new_v: np.ndarray = solve_func(rhs) return new_v def _build_difference_matrix( self, dim: int, boundary_type: BoundaryCondition ) -> sparse.spmatrix: """Build finite difference matrix for one dimension. Args: dim: Dimension index boundary_type: Type of boundary condition Returns: Sparse difference operator """ n = self.problem.state_space.state_variables[dim].num_points dx = float(np.mean(self.dx[dim])) # First derivative (upwind) # We'll build this dynamically based on drift direction # Second derivative (diffusion) diagonals = np.ones((3, n)) diagonals[0] *= 1.0 / (dx * dx) # Lower diagonal diagonals[1] *= -2.0 / (dx * dx) # Main diagonal diagonals[2] *= 1.0 / (dx * dx) # Upper diagonal # Apply boundary conditions if boundary_type == BoundaryCondition.DIRICHLET: # Fixed value at boundaries (handled separately) diagonals[0, 0] = 0 diagonals[1, 0] = 1 diagonals[2, 0] = 0 diagonals[0, -1] = 0 diagonals[1, -1] = 1 diagonals[2, -1] = 0 elif boundary_type in (BoundaryCondition.NEUMANN, BoundaryCondition.REFLECTING): # Zero first derivative at boundaries (ghost-node reflection) diagonals[1, 0] += diagonals[0, 0] diagonals[0, 0] = 0 diagonals[1, -1] += diagonals[2, -1] diagonals[2, -1] = 0 elif boundary_type == BoundaryCondition.ABSORBING: # Absorbing boundary: enforce d²V/dx² = 0 (linear extrapolation) # Lower boundary: V[0] - 2*V[1] + V[2] = 0 # Upper boundary: V[n-3] - 2*V[n-2] + V[n-1] = 0 # Row 0 needs entry at column 2 (offset +2), and row n-1 needs # entry at column n-3 (offset -2), so we build as tridiagonal # then add the extra entries. coeff = 1.0 / (dx * dx) # Row 0: [coeff, -2*coeff, 0, ..., 0] (tridiagonal part) diagonals[1, 0] = coeff diagonals[2, 0] = -2.0 * coeff # Row n-1: [0, ..., 0, -2*coeff, coeff] (tridiagonal part) diagonals[0, n - 2] = -2.0 * coeff diagonals[1, n - 1] = coeff matrix = sparse.diags(diagonals, offsets=[-1, 0, 1], shape=(n, n)) if boundary_type == BoundaryCondition.ABSORBING: # Add the off-tridiagonal entries for absorbing BCs coeff = 1.0 / (dx * dx) matrix = matrix.tolil() matrix[0, 2] = coeff # V[2] coefficient in lower boundary row matrix[n - 1, n - 3] = coeff # V[n-3] coefficient in upper boundary row matrix = matrix.tocsr() return matrix def _apply_upwind_scheme(self, value: np.ndarray, drift: np.ndarray, dim: int) -> np.ndarray: """Apply upwind finite difference for advection term. Uses proper boundary-aware slicing (no wraparound) and supports non-uniform grid spacing for all dimensions. Args: value: Value function on state grid drift: Drift values at each grid point dim: Dimension for differentiation Returns: Advection term contribution (drift * dV/dx) """ dx_array = self.dx[dim] # Array of per-interval spacings ndim = value.ndim n = value.shape[dim] result = np.zeros_like(value) # Build index slices for interior points hi = [slice(None)] * ndim lo = [slice(None)] * ndim hi[dim] = slice(1, None) # indices 1..N-1 lo[dim] = slice(None, -1) # indices 0..N-2 # Shape dx_array for broadcasting: (1, ..., N-1, ..., 1) dx_shape = [1] * ndim dx_shape[dim] = n - 1 dx_bc = dx_array.reshape(dx_shape) # Forward difference: (V[i+1] - V[i]) / dx[i] for i=0..N-2 # At i=N-1, the forward difference is 0 (boundary) fwd_diff = np.zeros_like(value) fwd_diff[tuple(lo)] = (value[tuple(hi)] - value[tuple(lo)]) / dx_bc # Backward difference: (V[i] - V[i-1]) / dx[i-1] for i=1..N-1 # At i=0, the backward difference is 0 (boundary) bwd_diff = np.zeros_like(value) bwd_diff[tuple(hi)] = (value[tuple(hi)] - value[tuple(lo)]) / dx_bc # Upwind selection based on the HJB sign convention V_t = drift*V_x + ... # The effective advection coefficient is -drift, so: # drift > 0 => forward diff (upwind for negative effective coefficient) # drift < 0 => backward diff (upwind for positive effective coefficient) mask_pos = drift > 0 mask_neg = drift < 0 result[mask_pos] = fwd_diff[mask_pos] * drift[mask_pos] result[mask_neg] = bwd_diff[mask_neg] * drift[mask_neg] return result def _compute_gradient(self, value: Optional[np.ndarray] = None) -> np.ndarray: """Compute the central-difference numerical gradient of a value function. Uses np.gradient which handles non-uniform grids with second-order accurate central differences in the interior and first-order accurate one-sided differences at the boundaries. This is a *single* finite difference (smooth); it is NOT the upwind ∇V the solver uses to integrate the advection term -- see :meth:`_apply_upwind_scheme`. Args: value: Value function to differentiate. Defaults to ``self.value_function``. Returns: Gradient array with shape state_shape + (ndim,) """ if value is None: value = self.value_function if value is None: shape = self.problem.state_space.shape + (self.problem.state_space.ndim,) return np.zeros(shape) grids = self.problem.state_space.grids if self.problem.state_space.ndim == 1: grad_components = [np.gradient(value, grids[0])] else: grad_components = np.gradient(value, *grids) return np.stack(grad_components, axis=-1) def _compute_second_derivatives(self, value: np.ndarray) -> np.ndarray: """Compute second derivatives d²V/dx_i² for each dimension. Uses the non-uniform central difference formula for interior points: d²V/dx²|_i = 2/(h_b+h_f) * [(V[i+1]-V[i])/h_f - (V[i]-V[i-1])/h_b] where h_b = x[i]-x[i-1] and h_f = x[i+1]-x[i]. This is exact for quadratic functions on arbitrary grids and reduces to the standard (V[i+1]-2V[i]+V[i-1])/h² on uniform grids. Boundary values default to zero (no diffusion contribution at boundaries). Args: value: Value function on state grid Returns: Array with shape state_shape + (ndim,) containing d²V/dx_i² """ ndim = self.problem.state_space.ndim components = [] for dim in range(ndim): dx_array = self.dx[dim] d2v = np.zeros_like(value) hi = [slice(None)] * ndim mid = [slice(None)] * ndim lo = [slice(None)] * ndim hi[dim] = slice(2, None) mid[dim] = slice(1, -1) lo[dim] = slice(None, -2) # Per-interval spacings for interior points i = 1..N-2 # h_f[i] = dx_array[i] (forward: x[i+1] - x[i]) # h_b[i] = dx_array[i-1] (backward: x[i] - x[i-1]) n = value.shape[dim] h_b = dx_array[0 : n - 2] # backward spacing for interior points h_f = dx_array[1 : n - 1] # forward spacing for interior points # Broadcast spacings for multi-dimensional arrays bc_shape = [1] * ndim bc_shape[dim] = n - 2 h_b_bc = h_b.reshape(bc_shape) h_f_bc = h_f.reshape(bc_shape) # Non-uniform central difference: # d²V/dx² = 2/(h_b+h_f) * [(V[i+1]-V[i])/h_f - (V[i]-V[i-1])/h_b] d2v[tuple(mid)] = ( 2.0 / (h_b_bc + h_f_bc) * ( (value[tuple(hi)] - value[tuple(mid)]) / h_f_bc - (value[tuple(mid)] - value[tuple(lo)]) / h_b_bc ) ) components.append(d2v) return np.stack(components, axis=-1) def _initialize_solution_state(self) -> None: """Initialize the value function, Dirichlet boundary values, and policy. Shared by ``solve`` (policy iteration / infinite horizon) and ``solve_finite_horizon`` (backward time-march). Precedence for the initial value function: 1. ``initial_value_function`` (warm-start) if provided — for a finite-horizon problem this overrides ``terminal_value`` with a logged warning. 2. ``terminal_value`` if the problem is finite-horizon. 3. Zeros (infinite horizon, no warm-start). Dirichlet boundary values are captured from the initialized value function and re-imposed every time step; the policy is seeded with mid-range controls. """ # Initialize value function if self.problem.initial_value_function is not None: if self.problem.time_horizon is not None and self.problem.terminal_value is not None: logger.warning( "initial_value_function provided for a finite-horizon problem; " "ignoring terminal_value (which may have been auto-generated as " "zero by HJBProblem.__post_init__ if not explicitly set)." ) state_points = np.stack(self.problem.state_space.meshgrid, axis=-1) init_v = self.problem.initial_value_function(state_points) self.value_function = init_v.reshape(self.problem.state_space.shape) elif self.problem.time_horizon is not None: # Finite horizon: start from terminal condition state_points = np.stack(self.problem.state_space.flat_grids, axis=-1) if self.problem.terminal_value is not None: terminal_values = self.problem.terminal_value(state_points) self.value_function = terminal_values.reshape(self.problem.state_space.shape) else: self.value_function = np.zeros(self.problem.state_space.shape) else: # Infinite horizon: initialize with zeros or heuristic self.value_function = np.zeros(self.problem.state_space.shape) # Capture boundary values for Dirichlet enforcement self._boundary_values = {} ndim = self.problem.state_space.ndim for dim in range(ndim): sv = self.problem.state_space.state_variables[dim] if BoundaryCondition.DIRICHLET in (sv.boundary_lower, sv.boundary_upper): lo_idx: List[Any] = [slice(None)] * ndim hi_idx: List[Any] = [slice(None)] * ndim lo_idx[dim] = 0 hi_idx[dim] = -1 self._boundary_values[dim] = { "lower": self.value_function[tuple(lo_idx)].copy(), "upper": self.value_function[tuple(hi_idx)].copy(), } # Initialize policy with mid-range controls if self.optimal_policy is None: self.optimal_policy = {} for cv in self.problem.control_variables: self.optimal_policy[cv.name] = np.full( self.problem.state_space.shape, (cv.min_value + cv.max_value) / 2 )
[docs] def solve(self) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: """Solve the HJB equation using policy iteration. Returns: Tuple of (value_function, optimal_policy_dict) """ logger.info("Starting HJB solution with policy iteration") self._warn_unsupported_scheme() self._initialize_solution_state() # _initialize_solution_state assigns both; narrow for the type checker # (the assignment is no longer inline in this method after the refactor). assert self.value_function is not None and self.optimal_policy is not None # Policy iteration for iteration in range(self.config.max_iterations): old_value = self.value_function.copy() # Policy evaluation step self._policy_evaluation() # Check for NaN/Inf after policy evaluation (#453) if not np.all(np.isfinite(self.value_function)): n_nan = int(np.sum(np.isnan(self.value_function))) n_inf = int(np.sum(np.isinf(self.value_function))) raise NumericalDivergenceError( f"HJB solver diverged at outer iteration {iteration}: " f"value function contains {n_nan} NaN and {n_inf} Inf values. " f"Consider reducing time_step (current: {self.config.time_step}) " f"or using an implicit scheme." ) # Policy improvement step self._policy_improvement() # Check convergence value_change = np.max(np.abs(self.value_function - old_value)) if self.config.verbose and iteration % 10 == 0: logger.info(f"Iteration {iteration}: value change = {value_change:.6e}") if value_change < self.config.tolerance: logger.info(f"Converged after {iteration + 1} iterations") break if iteration == self.config.max_iterations - 1: logger.warning("Max iterations reached without convergence") return self.value_function, self.optimal_policy
[docs] def solve_finite_horizon( self, store_policy_history: bool = False, progress_every: int = 0, ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: r"""Solve a finite-horizon HJB problem by backward time-marching. Solves the time-dependent HJB .. math:: \partial_t V + \max_u \big[ f(x,u) + \mu(x,u)\cdot\nabla V + \tfrac12 \sigma^2(x,u)\,\nabla^2 V + \lambda(x)\,\mathbb{E}_X[V(x - L) - V(x)] \big] = 0 on :math:`t \in [0, T]` with the terminal condition :math:`V(\cdot, T) = \text{terminal\_value}`, marching **backward** from :math:`t=T` to :math:`t=0`. At each backward step the control is re-optimized (the Bellman max) given the current continuation value :math:`V(\cdot, t+dt)`, then :math:`V` is advanced one step. Unlike :meth:`solve` (Howard policy iteration, which converges to the infinite-horizon *stationary* policy and collapses the time dimension), this tracks time explicitly. The returned policy is the **t=0 decision** (the argmax of the Hamiltonian against :math:`V(\cdot, 0)`), i.e. the optimal control today with :math:`T` years remaining. This is the right formulation when accumulation risk matters: a heavily-retaining strategy cannot "grow out of" a sequence of losses before the horizon, so the value function — and hence the policy — internalizes multi-year (sequential) ruin rather than scoring each jump independently against current wealth. Pairing a terminal condition :math:`V(\cdot, T)=\log(w)` with zero discount and zero running cost makes :math:`V(\cdot, 0) = \max_u \mathbb{E}[\log W_T]`, so the HJB and a terminal-log-wealth Monte Carlo answer the *same* finite-horizon question. The number of backward steps is ``round(T / config.time_step)``; ``config.time_step`` is then rescaled so the march lands exactly at :math:`t=0`. For the explicit scheme a CFL-safe step is computed once up front so the step count stays fixed. Args: store_policy_history: When True, store per-step policy snapshots on ``self.policy_history`` (oldest first, i.e. nearest the terminal time first). The list has one entry per backward step. progress_every: If > 0 and ``config.verbose``, log progress every this many steps. Returns: Tuple of ``(value_function, optimal_policy)`` at :math:`t=0`. Raises: ValueError: If ``problem.time_horizon`` is None (use :meth:`solve` for infinite-horizon problems). """ if self.problem.time_horizon is None: raise ValueError( "solve_finite_horizon requires a finite problem.time_horizon; " "use solve() for infinite-horizon (stationary) problems." ) logger.info("Starting HJB finite-horizon backward time-march") self._warn_unsupported_scheme() horizon = float(self.problem.time_horizon) scheme = self.config.scheme n_steps = max(1, int(round(horizon / self.config.time_step))) dt = horizon / n_steps self._initialize_solution_state() assert self.value_function is not None and self.optimal_policy is not None # Explicit scheme: pre-compute a CFL-safe dt once (against the initial # mid-range policy) so the step count is fixed and the march lands # exactly at t=0 without mid-march dt changes. The trial step's value is # discarded; only the (possibly reduced) dt is kept. Skipped when # auto_cfl=False (#1611): the caller has opted to step at exactly # config.time_step (e.g. a log-grid where the global CFL estimate is # pathologically conservative); the per-step NaN/Inf guard still applies. if scheme == TimeSteppingScheme.EXPLICIT and self.config.auto_cfl: self._invalidate_operator_cache() _, dt_safe = self._advance_value_one_step(dt, scheme, step_index=0, run_cfl_check=True) if dt_safe < dt: n_steps = max(1, int(np.ceil(horizon / dt_safe))) dt = horizon / n_steps self.policy_history = [] if store_policy_history else None for step in range(n_steps): # Policy is re-optimized every step → cached operators are stale. self._invalidate_operator_cache() # Bellman max: optimal control given the continuation value V(·, t+dt). self._policy_improvement() if self.policy_history is not None: self.policy_history.append({k: v.copy() for k, v in self.optimal_policy.items()}) # Advance V one step backward in time under the improved policy. new_v, _ = self._advance_value_one_step( dt, scheme, step_index=step, run_cfl_check=False ) self.value_function = new_v if progress_every > 0 and self.config.verbose and step % progress_every == 0: t_now = horizon - (step + 1) * dt logger.info( f"Finite-horizon march: step {step + 1}/{n_steps}, t={t_now:.3f}, " f"V range [{float(np.min(new_v)):.3e}, {float(np.max(new_v)):.3e}]" ) # Final Bellman max so the returned policy is the argmax against # V(·, 0) — the decision today, with the full horizon ahead. self._invalidate_operator_cache() self._policy_improvement() logger.info(f"Finite-horizon march complete ({n_steps} steps, dt={dt:.4f})") return self.value_function, self.optimal_policy
def _reshape_cost(self, cost): """Helper method to reshape cost array.""" if hasattr(cost, "ndim") and cost.ndim > 1: # If cost is multi-dimensional, take the first column or mean if cost.shape[1] > 1: cost = np.mean(cost, axis=1) # Average across extra dimensions else: cost = cost[:, 0] # Take first column return cost.reshape(self.problem.state_space.shape) def _apply_upwind_drift(self, new_v, drift, dt): """Apply upwind differencing for drift term across all dimensions.""" if not np.any(np.abs(drift) > _DRIFT_THRESHOLD): return new_v if self.value_function is None: return new_v for dim in range(drift.shape[-1]): if dim >= len(self.problem.state_space.state_variables): continue drift_component = drift[..., dim] advection = self._apply_upwind_scheme(self.value_function, drift_component, dim) new_v = new_v + dt * advection return new_v def _evaluate_jump_term( self, state_points: np.ndarray, control_array: np.ndarray, value: np.ndarray, ) -> np.ndarray: """Evaluate the user-supplied PIDE jump term and reduce to a scalar field. Wraps the callback so the same shape-coercion logic lives in one place for both ``_policy_evaluation`` (state-grid-shaped output) and ``_policy_improvement`` (flat-per-(combo,state) output). Args: state_points: State coordinates of shape (n, ndim). control_array: Controls of shape (n, n_controls). value: Current value function on the state grid, shape state_space.shape. Returns: Flat array of shape (n,) giving the scalar jump contribution per point. """ assert self.problem.jump_term is not None raw = self.problem.jump_term( state_points, control_array, 0.0, value, self.problem.state_space.grids, ) arr = np.asarray(raw) # Tolerate (n, 1) trailing axis even though the documented contract is # scalar-per-point — squeezes a common user mistake without surprises. if arr.ndim > 1 and arr.shape[-1] == 1: arr = arr[..., 0] return arr.ravel() def _apply_diffusion_term( self, value: np.ndarray, sigma_sq: np.ndarray, ) -> np.ndarray: """Compute diffusion contribution ½σ²∇²V. Args: value: Value function on state grid sigma_sq: Diffusion coefficients σ²(x,u) with shape state_shape + (ndim,) Returns: Diffusion term (scalar field, same shape as value) """ d2v = self._compute_second_derivatives(value) diffusion = np.zeros_like(value) n_dims = min(sigma_sq.shape[-1], d2v.shape[-1]) for dim in range(n_dims): if dim >= len(self.problem.state_space.state_variables): continue diffusion += 0.5 * sigma_sq[..., dim] * d2v[..., dim] return diffusion def _apply_boundary_conditions(self, value: np.ndarray) -> np.ndarray: """Enforce boundary conditions on the value function. Applies the prescribed boundary condition for each dimension: - ABSORBING: linear extrapolation so d²V/dx² = 0 at boundary - DIRICHLET: reset boundary to prescribed (initial/terminal) values - NEUMANN / REFLECTING: copy adjacent interior value so dV/dx = 0 Args: value: Value function on state grid. Returns: Value function with boundary conditions enforced. """ ndim = self.problem.state_space.ndim result = value.copy() for dim in range(ndim): sv = self.problem.state_space.state_variables[dim] # --- lower boundary --- lo: List[Any] = [slice(None)] * ndim p1: List[Any] = [slice(None)] * ndim p2: List[Any] = [slice(None)] * ndim lo[dim] = 0 p1[dim] = 1 p2[dim] = 2 if sv.boundary_lower == BoundaryCondition.ABSORBING: # V[0] = 2*V[1] - V[2] → d²V/dx² = 0 result[tuple(lo)] = 2.0 * result[tuple(p1)] - result[tuple(p2)] elif sv.boundary_lower == BoundaryCondition.DIRICHLET: # Reset to initial value (stored during initialization) if self._boundary_values is not None and dim in self._boundary_values: result[tuple(lo)] = self._boundary_values[dim]["lower"] elif sv.boundary_lower in ( BoundaryCondition.NEUMANN, BoundaryCondition.REFLECTING, ): # dV/dx = 0 → V[0] = V[1] result[tuple(lo)] = result[tuple(p1)] # --- upper boundary --- hi: List[Any] = [slice(None)] * ndim m1: List[Any] = [slice(None)] * ndim m2: List[Any] = [slice(None)] * ndim hi[dim] = -1 m1[dim] = -2 m2[dim] = -3 if sv.boundary_upper == BoundaryCondition.ABSORBING: # V[-1] = 2*V[-2] - V[-3] → d²V/dx² = 0 result[tuple(hi)] = 2.0 * result[tuple(m1)] - result[tuple(m2)] elif sv.boundary_upper == BoundaryCondition.DIRICHLET: if self._boundary_values is not None and dim in self._boundary_values: result[tuple(hi)] = self._boundary_values[dim]["upper"] elif sv.boundary_upper in ( BoundaryCondition.NEUMANN, BoundaryCondition.REFLECTING, ): # dV/dx = 0 → V[-1] = V[-2] result[tuple(hi)] = result[tuple(m1)] return result def _update_value_finite_horizon(self, old_v, cost, drift, dt, sigma_sq=None): """Update value function for finite horizon problems.""" # Backward Euler scheme for parabolic PDE new_v = old_v + dt * cost # Add discount term if applicable if self.problem.discount_rate > 0: new_v -= dt * self.problem.discount_rate * old_v # Add drift term using upwind differencing new_v = self._apply_upwind_drift(new_v, drift, dt) # Add diffusion term: ½σ²∇²V if sigma_sq is not None: new_v += dt * self._apply_diffusion_term(old_v, sigma_sq) return new_v def _advance_value_one_step( # pylint: disable=too-many-branches,too-many-statements self, dt: float, scheme: TimeSteppingScheme, step_index: int, run_cfl_check: bool, ) -> Tuple[np.ndarray, float]: """Advance the value function by one time step under the current policy. Evaluates dynamics, running cost, diffusion, and the PIDE jump term at the *current* ``self.value_function`` and ``self.optimal_policy``, takes one step of size ``dt`` using ``scheme``, and applies boundary conditions. Does not mutate ``self.value_function`` — the caller assigns the returned array. Shared by the infinite-horizon policy-evaluation inner loop (``_policy_evaluation``) and the finite-horizon backward time-march (``solve_finite_horizon``). The caller is responsible for invalidating the operator cache whenever the policy may have changed. Args: dt: Time step size. scheme: Time-stepping scheme. step_index: Index of the current step (drives Crank-Nicolson Rannacher startup and divergence diagnostics). run_cfl_check: When True (typically only on the first step), performs the explicit-scheme CFL check and may reduce ``dt``. Returns: Tuple of ``(new_value_function, dt)`` where ``dt`` may have been reduced by the CFL check (explicit scheme only). """ assert self.value_function is not None and self.optimal_policy is not None old_v = self.value_function # not mutated below; caller owns the copy # Get state and control grids state_points = np.stack(self.problem.state_space.flat_grids, axis=-1) control_array = np.stack( [self.optimal_policy[cv.name].ravel() for cv in self.problem.control_variables], axis=-1, ) # Compute dynamics and running cost drift = self.problem.dynamics(state_points, control_array, 0.0) cost = self.problem.running_cost(state_points, control_array, 0.0) # Reshape drift = drift.reshape(self.problem.state_space.shape + (-1,)) cost = self._reshape_cost(cost) # Compute diffusion coefficient if specified sigma_sq = None if self.problem.diffusion is not None: sigma_sq_raw = self.problem.diffusion(state_points, control_array, 0.0) sigma_sq = sigma_sq_raw.reshape(self.problem.state_space.shape + (-1,)) # PIDE jump term (IMEX): evaluate at V_old and fold into the running # cost. Adding to `cost` is exactly the explicit RHS treatment used # by both the implicit theta-step and the explicit Euler step -- # neither scheme needs to factorize a non-local jump operator. if self.problem.jump_term is not None: jump_vals = self._evaluate_jump_term(state_points, control_array, old_v) cost = cost + jump_vals.reshape(cost.shape) # CFL stability check for explicit scheme (#452). Skipped when # auto_cfl=False (#1611): the caller owns dt stability in that mode. if scheme == TimeSteppingScheme.EXPLICIT and run_cfl_check and self.config.auto_cfl: adv_cfl, diff_cfl = self._compute_cfl_number(drift, sigma_sq, dt) if adv_cfl > 1.0 or diff_cfl > 1.0: # Compute safe dt max_rate = 0.0 n_dims = min(drift.shape[-1], len(self.problem.state_space.state_variables)) for dim in range(n_dims): dx_arr = self.dx[dim] dx_min = float(np.min(dx_arr)) dx_mean = float(np.mean(dx_arr)) drift_max = float(np.max(np.abs(drift[..., dim]))) if dx_min > 0: max_rate += drift_max / dx_min if sigma_sq is not None and dx_min > 0: max_rate += 0.5 * float(np.max(np.abs(sigma_sq[..., dim]))) / (dx_min**2) max_rate += self.problem.discount_rate if max_rate > 0: dt_safe = 0.9 / max_rate logger.warning( f"CFL condition violated (advection CFL={adv_cfl:.2f}, " f"diffusion CFL={diff_cfl:.2f}). " f"Auto-reducing dt from {dt:.4e} to {dt_safe:.4e}." ) dt = dt_safe # Time-stepping: branch on scheme (#451). Implicit/CN are implemented for # 1-D only; higher-dimensional requests fall back to explicit Euler here. # That fallback is surfaced once, up-front, by _warn_unsupported_scheme # (#1611) -- so it is not re-logged on every step (and is no longer gated # on run_cfl_check, which never fired during the finite-horizon march). use_implicit = scheme in ( TimeSteppingScheme.IMPLICIT, TimeSteppingScheme.CRANK_NICOLSON, ) can_use_implicit = use_implicit and self.problem.state_space.ndim == 1 if can_use_implicit: # Implicit or Crank-Nicolson 1D step drift_1d = drift[:, 0] if drift.ndim > 1 else drift sigma_sq_1d = sigma_sq[:, 0] if sigma_sq is not None and sigma_sq.ndim > 1 else sigma_sq cost_1d = cost.ravel() if scheme == TimeSteppingScheme.CRANK_NICOLSON: if step_index < self.config.rannacher_steps: # Rannacher startup: two implicit half-steps half_v = self._theta_step_1d( old_v.ravel(), cost_1d, drift_1d, sigma_sq_1d, dt / 2.0, theta=1.0, ) half_v = self._apply_boundary_conditions(half_v.reshape(old_v.shape)).ravel() new_v = self._theta_step_1d( half_v, cost_1d, drift_1d, sigma_sq_1d, dt / 2.0, theta=1.0, ) else: # Crank-Nicolson step (theta=0.5) new_v = self._theta_step_1d( old_v.ravel(), cost_1d, drift_1d, sigma_sq_1d, dt, theta=0.5, ) else: # Fully implicit step (theta=1) new_v = self._theta_step_1d( old_v.ravel(), cost_1d, drift_1d, sigma_sq_1d, dt, theta=1.0, ) new_v = new_v.reshape(old_v.shape) else: # Explicit scheme (or fallback for multi-D) if self.problem.time_horizon is not None: new_v = self._update_value_finite_horizon(old_v, cost, drift, dt, sigma_sq) else: advection = np.zeros_like(old_v) for dim in range(drift.shape[-1]): if dim >= len(self.problem.state_space.state_variables): continue drift_component = drift[..., dim] advection += self._apply_upwind_scheme(old_v, drift_component, dim) rhs = -self.problem.discount_rate * old_v + cost + advection if sigma_sq is not None: rhs += self._apply_diffusion_term(old_v, sigma_sq) new_v = old_v + dt * rhs # Enforce boundary conditions after each time step new_v = self._apply_boundary_conditions(new_v) # Check for NaN/Inf after each step (#453) if not np.all(np.isfinite(new_v)): n_nan = int(np.sum(np.isnan(new_v))) n_inf = int(np.sum(np.isinf(new_v))) raise NumericalDivergenceError( f"HJB solver diverged during value update " f"(step {step_index}): " f"{n_nan} NaN and {n_inf} Inf values detected. " f"Value function range before step: " f"[{float(np.nanmin(old_v)):.4e}, {float(np.nanmax(old_v)):.4e}]." ) return new_v, dt def _policy_evaluation(self): """Evaluate current policy by solving linear PDE. Supports explicit, implicit, and Crank-Nicolson time-stepping schemes. For explicit scheme, performs CFL stability check and auto-adapts dt. """ if self.value_function is None or self.optimal_policy is None: return # Policy may have changed since last evaluation; invalidate cached operators self._invalidate_operator_cache() dt = self.config.time_step scheme = self.config.scheme for _ in range(self.config.inner_max_iterations): # Inner iterations for policy evaluation # Type guard ensures value_function is not None after the check above assert self.value_function is not None # For mypy old_v = self.value_function.copy() new_v, dt = self._advance_value_one_step( dt, scheme, step_index=_, run_cfl_check=(_ == 0) ) self.value_function = new_v # Check inner convergence if ( np.max(np.abs(new_v - old_v)) < self.config.tolerance * self.config.inner_tolerance_factor ): break def _precompute_upwind_diffs(self) -> List[Tuple[np.ndarray, np.ndarray]]: """Precompute forward and backward finite differences of the value function. Returns a list (one entry per dimension) of (fwd_diff_flat, bwd_diff_flat) arrays, each of shape (n_states,). These encode the same upwind scheme used by ``_apply_upwind_scheme`` so that the advection term for an arbitrary drift field can be assembled as:: advection = sum_dim( max(drift, 0)*fwd + min(drift, 0)*bwd ) without recomputing the differences for every control candidate. """ assert self.value_function is not None ndim = self.problem.state_space.ndim diffs: List[Tuple[np.ndarray, np.ndarray]] = [] for dim in range(ndim): dx_array = self.dx[dim] n = self.value_function.shape[dim] # Broadcast dx along the correct axis dx_shape = [1] * ndim dx_shape[dim] = n - 1 dx_bc = dx_array.reshape(dx_shape) hi = [slice(None)] * ndim lo = [slice(None)] * ndim hi[dim] = slice(1, None) # indices 1..N-1 lo[dim] = slice(None, -1) # indices 0..N-2 diff_vals = (self.value_function[tuple(hi)] - self.value_function[tuple(lo)]) / dx_bc # Forward difference: defined at indices 0..N-2, zero at N-1 fwd = np.zeros_like(self.value_function) fwd[tuple(lo)] = diff_vals # Backward difference: defined at indices 1..N-1, zero at 0 bwd = np.zeros_like(self.value_function) bwd[tuple(hi)] = diff_vals diffs.append((fwd.ravel(), bwd.ravel())) return diffs @staticmethod def _build_control_combos(control_samples: List[np.ndarray]) -> np.ndarray: """Build all control combinations as an (n_combos, n_controls) array. Uses np.meshgrid instead of itertools.product for efficient Cartesian product construction. """ grids = np.meshgrid(*control_samples, indexing="ij") return np.column_stack([g.ravel() for g in grids]) def _compute_chunk_size(self, n_combos: int, n_states: int, n_controls: int) -> int: """Determine batch size from memory budget. Each combo-state pair requires storage for controls, drift, cost, and advection arrays (~4 * n_dims * 8 bytes per pair). """ ndim = self.problem.state_space.ndim budget_bytes = self.config.control_memory_budget_mb * 1024 * 1024 # Estimate bytes per combo: states * (controls + drift + cost + advection) * 8 bytes_per_combo = n_states * (n_controls + ndim + 1 + 1) * 8 if bytes_per_combo == 0: return n_combos chunk = max(1, budget_bytes // bytes_per_combo) return min(chunk, n_combos) def _evaluate_and_update_best( self, combos: np.ndarray, state_points: np.ndarray, n_states: int, n_controls: int, ndim: int, fwd_arr: np.ndarray, bwd_arr: np.ndarray, d2V_flat: Optional[np.ndarray], best_values: np.ndarray, best_controls: np.ndarray, ) -> None: """Evaluate Hamiltonian for a batch of control combos and update best. Processes combos in chunks determined by the memory budget, vectorizing over both states and combos within each chunk. """ chunk_size = self._compute_chunk_size(len(combos), n_states, n_controls) for start in range(0, len(combos), chunk_size): chunk = combos[start : start + chunk_size] n_chunk = len(chunk) # Tile state_points for all combos in chunk: (n_chunk * n_states, state_dim) states_tiled = np.tile(state_points, (n_chunk, 1)) # Repeat each combo for all states: (n_chunk * n_states, n_controls) controls_tiled = np.repeat(chunk, n_states, axis=0) # Evaluate dynamics and running cost in one vectorized call drift = self.problem.dynamics(states_tiled, controls_tiled, 0.0) cost = self.problem.running_cost(states_tiled, controls_tiled, 0.0) # Reduce cost to 1D cost = np.asarray(cost) if cost.ndim > 1: if cost.shape[-1] > 1: cost = np.mean(cost, axis=-1) else: cost = cost[..., 0] cost = cost.flatten() # Reshape to (n_chunk, n_states) cost_2d = cost.reshape(n_chunk, n_states) # Compute advection: upwind scheme drift_flat = np.asarray(drift).reshape(n_chunk * n_states, -1) n_dims = min(drift_flat.shape[1], ndim) drift_pos = np.maximum(drift_flat[:, :n_dims], 0.0) drift_neg = np.minimum(drift_flat[:, :n_dims], 0.0) # Tile fwd/bwd arrays for all combos in chunk fwd_tiled = np.tile(fwd_arr[:, :n_dims], (n_chunk, 1)) bwd_tiled = np.tile(bwd_arr[:, :n_dims], (n_chunk, 1)) advection = np.sum(drift_pos * fwd_tiled + drift_neg * bwd_tiled, axis=1) advection_2d = advection.reshape(n_chunk, n_states) hamiltonian_2d = cost_2d + advection_2d # Add diffusion term if self.problem.diffusion is not None and d2V_flat is not None: sigma_sq = self.problem.diffusion(states_tiled, controls_tiled, 0.0) sigma_sq = np.asarray(sigma_sq).reshape(n_chunk * n_states, -1) n_diff = min(sigma_sq.shape[1], d2V_flat.shape[1], n_dims) diff_term = 0.5 * np.sum( sigma_sq[:, :n_diff] * np.tile(d2V_flat[:, :n_diff], (n_chunk, 1)), axis=1, ) hamiltonian_2d += diff_term.reshape(n_chunk, n_states) # Add PIDE jump term -- evaluated at the current V; depends on # control because the post-jump wealth uses the user's retained-loss # function (a function of SIR or other controls). if self.problem.jump_term is not None and self.value_function is not None: jump_vals = self._evaluate_jump_term( states_tiled, controls_tiled, self.value_function ) hamiltonian_2d += jump_vals.reshape(n_chunk, n_states) # Hard state-dependent control constraint (optional): blank out # inadmissible (state, control) pairs to -inf so the argmax below can # never select them (issue #1649). States where every control is # inadmissible keep -inf here and are repaired by the min-control # fallback in _policy_improvement. if self.problem.control_feasibility is not None: feasible = np.asarray( self.problem.control_feasibility(states_tiled, controls_tiled), dtype=bool, ).reshape(n_chunk, n_states) hamiltonian_2d = np.where(feasible, hamiltonian_2d, -np.inf) # Find best combo per state within this chunk chunk_best_idx = np.argmax(hamiltonian_2d, axis=0) # (n_states,) chunk_best_vals = hamiltonian_2d[chunk_best_idx, np.arange(n_states)] # Update global best improved = chunk_best_vals > best_values best_values[improved] = chunk_best_vals[improved] best_controls[improved] = chunk[chunk_best_idx[improved]] def _policy_improvement_loop( self, state_points: np.ndarray, n_states: int, n_controls: int, ndim: int, fwd_arr: np.ndarray, bwd_arr: np.ndarray, d2V_flat: Optional[np.ndarray], best_values: np.ndarray, best_controls: np.ndarray, control_samples: List[np.ndarray], ) -> None: """Legacy loop-based policy improvement (original implementation).""" for control_combo in itertools_product(*control_samples): control_array = np.array(control_combo) control_broadcast = np.tile(control_array, (n_states, 1)) drift = self.problem.dynamics(state_points, control_broadcast, 0.0) cost = self.problem.running_cost(state_points, control_broadcast, 0.0) cost = np.asarray(cost) if cost.ndim > 1: if cost.shape[-1] > 1: cost = np.mean(cost, axis=-1) else: cost = cost[..., 0] cost = cost.flatten() drift_flat = np.asarray(drift).reshape(n_states, -1) n_dims = min(drift_flat.shape[1], ndim) drift_pos = np.maximum(drift_flat[:, :n_dims], 0.0) drift_neg = np.minimum(drift_flat[:, :n_dims], 0.0) advection_flat = np.sum( drift_pos * fwd_arr[:, :n_dims] + drift_neg * bwd_arr[:, :n_dims], axis=1, ) hamiltonian = cost + advection_flat if self.problem.diffusion is not None and d2V_flat is not None: sigma_sq = self.problem.diffusion(state_points, control_broadcast, 0.0) sigma_sq = np.asarray(sigma_sq).reshape(n_states, -1) n_diff = min(sigma_sq.shape[1], d2V_flat.shape[1], n_dims) hamiltonian += 0.5 * np.sum(sigma_sq[:, :n_diff] * d2V_flat[:, :n_diff], axis=1) if self.problem.jump_term is not None and self.value_function is not None: jump_vals = self._evaluate_jump_term( state_points, control_broadcast, self.value_function ) hamiltonian = hamiltonian + jump_vals # Hard state-dependent control constraint (optional): inadmissible # (state, this-control) pairs cannot improve the best value (issue #1649). if self.problem.control_feasibility is not None: feasible = np.asarray( self.problem.control_feasibility(state_points, control_broadcast), dtype=bool, ).ravel() hamiltonian = np.where(feasible, hamiltonian, -np.inf) improved = hamiltonian > best_values best_values[improved] = hamiltonian[improved] best_controls[improved] = control_array def _policy_improvement_vectorized( self, state_points: np.ndarray, n_states: int, n_controls: int, ndim: int, fwd_arr: np.ndarray, bwd_arr: np.ndarray, d2V_flat: Optional[np.ndarray], best_values: np.ndarray, best_controls: np.ndarray, control_samples: List[np.ndarray], ) -> None: """Fully vectorized policy improvement over all control combos.""" combos = self._build_control_combos(control_samples) self._evaluate_and_update_best( combos, state_points, n_states, n_controls, ndim, fwd_arr, bwd_arr, d2V_flat, best_values, best_controls, ) def _policy_improvement_adaptive( self, state_points: np.ndarray, n_states: int, n_controls: int, ndim: int, fwd_arr: np.ndarray, bwd_arr: np.ndarray, d2V_flat: Optional[np.ndarray], best_values: np.ndarray, best_controls: np.ndarray, control_samples: List[np.ndarray], ) -> None: """Two-pass adaptive policy improvement: coarse search then local refinement.""" # Pass 1: Coarse grid (every _COARSE_STRIDE-th point per control) coarse_samples = [s[::_COARSE_STRIDE] for s in control_samples] coarse_combos = self._build_control_combos(coarse_samples) self._evaluate_and_update_best( coarse_combos, state_points, n_states, n_controls, ndim, fwd_arr, bwd_arr, d2V_flat, best_values, best_controls, ) # Pass 2: Refine around coarse optima # Find unique coarse optima (each row of best_controls is a combo) unique_optima = np.unique(best_controls, axis=0) # For each unique optimum, build a refined grid of nearby combos refined_combos_list = [] for optimum in unique_optima: per_control_refined = [] for j, full_grid in enumerate(control_samples): # Find the closest index in the full grid closest_idx = int(np.argmin(np.abs(full_grid - optimum[j]))) lo = max(0, closest_idx - _REFINE_RADIUS) hi = min(len(full_grid), closest_idx + _REFINE_RADIUS + 1) per_control_refined.append(full_grid[lo:hi]) local_combos = self._build_control_combos(per_control_refined) refined_combos_list.append(local_combos) if refined_combos_list: all_refined = np.vstack(refined_combos_list) # Deduplicate all_refined = np.unique(all_refined, axis=0) self._evaluate_and_update_best( all_refined, state_points, n_states, n_controls, ndim, fwd_arr, bwd_arr, d2V_flat, best_values, best_controls, ) def _policy_improvement(self): """Improve policy by maximizing the Hamiltonian. H(x,u) = f(x,u) + drift(x,u)·∇V(x) + ½σ²(x,u)·∇²V(x) Dispatches to vectorized, adaptive, or loop-based strategy based on ``config.control_search_strategy``. The precomputation of upwind finite differences is shared across all strategies. """ if self.value_function is None or self.optimal_policy is None: return state_points = np.stack(self.problem.state_space.flat_grids, axis=-1) n_states = state_points.shape[0] n_controls = len(self.problem.control_variables) ndim = self.problem.state_space.ndim # Initialize tracking arrays best_values = np.full(n_states, -np.inf) best_controls = np.zeros((n_states, n_controls)) # Compute second derivatives for diffusion term (independent of control) d2V_flat = None if self.problem.diffusion is not None: d2V = self._compute_second_derivatives(self.value_function) # Check for NaN in second derivatives (#453) if not np.all(np.isfinite(d2V)): logger.warning( "NaN/Inf detected in second derivatives during policy improvement; " "skipping policy update." ) return d2V_flat = d2V.reshape(n_states, -1) # Precompute upwind finite differences of V (control-independent). # Each entry is (fwd_flat, bwd_flat) with shape (n_states,). upwind_diffs = self._precompute_upwind_diffs() # Stack into (n_states, ndim) for vectorized advection computation fwd_arr = np.column_stack([f for f, _ in upwind_diffs]) # (n_states, ndim) bwd_arr = np.column_stack([b for _, b in upwind_diffs]) # (n_states, ndim) # Get discrete control samples for each control variable control_samples = [cv.get_values() for cv in self.problem.control_variables] # Shared arguments for all strategies args = ( state_points, n_states, n_controls, ndim, fwd_arr, bwd_arr, d2V_flat, best_values, best_controls, control_samples, ) # Determine strategy strategy = self.config.control_search_strategy if strategy == "gradient": raise NotImplementedError( "Gradient-based control search is reserved for future implementation." ) if strategy == "auto": n_combos = 1 for s in control_samples: n_combos *= len(s) strategy = "vectorized" if n_combos <= _VECTORIZE_COMBO_THRESHOLD else "adaptive" if strategy == "vectorized": self._policy_improvement_vectorized(*args) elif strategy == "adaptive": self._policy_improvement_adaptive(*args) else: # "loop" or any unrecognized value falls back to legacy self._policy_improvement_loop(*args) # Constrained solves (issue #1649): states where every sampled control was # inadmissible keep best_values == -inf (the whole row was masked to -inf, so # no combo "improved"). Fall back to each control variable's first sample -- # the grid minimum, e.g. the smallest retention / most coverage -- the safe # default when no admissible control exists. No-op when control_feasibility is # None (best_values is always finite then). if self.problem.control_feasibility is not None: no_feasible = ~np.isfinite(best_values) if np.any(no_feasible): for j in range(n_controls): best_controls[no_feasible, j] = control_samples[j][0] # Write optimal controls back to policy arrays for j, cv in enumerate(self.problem.control_variables): self.optimal_policy[cv.name] = best_controls[:, j].reshape( self.problem.state_space.shape )
[docs] def extract_feedback_control(self, state: np.ndarray) -> Dict[str, float]: """Extract feedback control law at given state. Args: state: Current state values Returns: Dictionary of control variable names to optimal values """ if self.optimal_policy is None: raise RuntimeError("Must solve HJB equation before extracting controls") # Interpolate policy at given state controls = {} for cv in self.problem.control_variables: policy_func = interpolate.RegularGridInterpolator( self.problem.state_space.grids, self.optimal_policy[cv.name], method="linear", bounds_error=False, fill_value=None, ) controls[cv.name] = float(policy_func(state.reshape(1, -1))[0]) return controls
def _assemble_hamiltonian_terms( self, value: np.ndarray, control_array: np.ndarray ) -> HamiltonianTerms: """Assemble the per-term HJB Hamiltonian on the state grid. Single source of truth for the HJB operator: used by the public :meth:`hamiltonian_terms` diagnostic and by :meth:`compute_convergence_metrics` (whose residual is ``|total|``). Every term uses the SAME discretization the time-stepper integrates -- upwind first derivative (:meth:`_apply_upwind_scheme`), compact non-uniform central second derivative (:meth:`_compute_second_derivatives`), and the jump-term callback -- so ``total`` reproduces the ``(V_new - V_old)/dt = -∂V/∂t`` that :meth:`_advance_value_one_step` applies on interior cells (boundary cells are separately overwritten by :meth:`_apply_boundary_conditions`). Args: value: Value function on the state grid, shape ``state_space.shape``. control_array: Controls of shape ``(n_states, n_controls)`` in ``problem.control_variables`` order. Returns: HamiltonianTerms with grid-shaped force arrays (see that dataclass). """ ss = self.problem.state_space state_shape = ss.shape ndim = ss.ndim state_points = np.stack(ss.flat_grids, axis=-1) # Drift · ∇V using the solver's UPWIND scheme -- numerically identical to the # advection assembled in _policy_improvement / _evaluate_and_update_best. drift = np.asarray(self.problem.dynamics(state_points, control_array, 0.0)) drift = drift.reshape(state_shape + (-1,)) drift_term = np.zeros(state_shape) n_dims = min(drift.shape[-1], ndim) for dim in range(n_dims): drift_term = drift_term + self._apply_upwind_scheme(value, drift[..., dim], dim) # Central-difference ∇V (display only) and the compact ∇²V stencil used by # the diffusion term. grad is NOT the upwind ∇V folded into drift_term. grad = self._compute_gradient(value) d2v = self._compute_second_derivatives(value) # ½ σ² · ∇²V (compact stencil); zero field if the problem has no diffusion. diffusion_term = np.zeros(state_shape) if self.problem.diffusion is not None: sigma_sq = np.asarray(self.problem.diffusion(state_points, control_array, 0.0)) sigma_sq = sigma_sq.reshape(state_shape + (-1,)) n_diff = min(sigma_sq.shape[-1], d2v.shape[-1], ndim) for dim in range(n_diff): diffusion_term = diffusion_term + 0.5 * sigma_sq[..., dim] * d2v[..., dim] # PIDE jump term λ·E_X[V(post) - V]; zero field if no jump term. jump_term = np.zeros(state_shape) if self.problem.jump_term is not None: jump_term = self._evaluate_jump_term(state_points, control_array, value).reshape( state_shape ) # Running cost f(x,u). _reshape_cost mirrors the cost handling in # _policy_improvement (mean over any extra column), so the decomposition # matches the Hamiltonian the solver maximized; identical to a plain reshape # for the scalar-per-point costs used throughout this codebase. running_cost = self._reshape_cost( np.asarray(self.problem.running_cost(state_points, control_array, 0.0)) ) # Discount -ρV (control-independent; absent from the maximized Hamiltonian # but part of the full operator -∂V/∂t). discount = -self.problem.discount_rate * value total = drift_term + diffusion_term + jump_term + running_cost + discount return HamiltonianTerms( drift=drift_term, diffusion=diffusion_term, jump=jump_term, running_cost=running_cost, discount=discount, total=total, grad=grad, d2v=d2v, )
[docs] def hamiltonian_terms( self, value: Optional[np.ndarray] = None, policy: Optional[Dict[str, np.ndarray]] = None, ) -> HamiltonianTerms: """Per-term HJB Hamiltonian on the state grid at a fixed control policy. Exposes the forces the solver actually balanced -- ``drift·∇V`` (upwind), ``½σ²·∇²V`` (compact stencil), the ``jump_term`` callback, the running cost and ``-ρV`` -- evaluated on the full state grid at ``policy`` using the SAME discretization the time-stepper integrates (NOT ``np.gradient``). The returned :attr:`HamiltonianTerms.total` is the solver's ``-∂V/∂t`` at this ``(value, policy)``; see :class:`HamiltonianTerms` for the exact identity. Intended for faithful, smooth diagnostic plots of the Hamiltonian decomposition. Re-deriving the forces in user code with ``np.gradient(np.gradient(V))`` instead amplifies grid-scale noise by ``~1/Δx²`` (a *finer* grid makes it worse) and does not reproduce the operator the solver optimized. Args: value: Value function on the state grid (shape ``state_space.shape``). Defaults to the solved ``self.value_function``. policy: Control policy as ``{control_name: array}``; each array is ravelled in C-order to the state grid. Defaults to the solved ``self.optimal_policy``. Pass a de-jittered/smoothed policy to suppress argmax-step roughness in the plotted forces. Returns: HamiltonianTerms with grid-shaped force arrays. Raises: RuntimeError: If no value function or policy is available (no prior solve and the corresponding argument was not supplied). KeyError: If ``policy`` omits one of the problem's control variables. ValueError: If ``value`` does not match the state-grid shape, or a policy array does not have one entry per state-grid point. """ if value is None: value = self.value_function if value is None: raise RuntimeError( "hamiltonian_terms requires a prior solve() / solve_finite_horizon() " "to populate the value function, or an explicit `value=` argument." ) value = np.asarray(value) if value.shape != self.problem.state_space.shape: raise ValueError( f"value shape {value.shape} does not match the state grid " f"{self.problem.state_space.shape}." ) if policy is None: policy = self.optimal_policy if policy is None: raise RuntimeError( "hamiltonian_terms requires a prior solve to populate the policy, " "or an explicit `policy=` argument." ) n_states = int(np.prod(self.problem.state_space.shape)) control_cols = [] for cv in self.problem.control_variables: if cv.name not in policy: raise KeyError( f"policy is missing control variable '{cv.name}'; expected keys " f"{[c.name for c in self.problem.control_variables]}." ) col = np.asarray(policy[cv.name]).ravel() if col.size != n_states: raise ValueError( f"policy['{cv.name}'] has {col.size} entries but the state grid " f"has {n_states} points." ) control_cols.append(col) control_array = np.stack(control_cols, axis=-1) return self._assemble_hamiltonian_terms(value, control_array)
[docs] def compute_convergence_metrics(self) -> Dict[str, Any]: """Compute metrics for assessing solution quality. Returns: Dictionary of convergence metrics """ if self.value_function is None: return {"error": "No solution computed yet"} # Assemble the HJB residual |H| = |-∂V/∂t| via the shared per-term operator # (upwind drift, compact-stencil diffusion, jump callback, running cost, -ρV). # Routing through _assemble_hamiltonian_terms guarantees this residual and # HJBSolver.hamiltonian_terms().total use one identical discretization. assert self.optimal_policy is not None control_array = np.stack( [self.optimal_policy[cv.name].ravel() for cv in self.problem.control_variables], axis=-1 ) terms = self._assemble_hamiltonian_terms(self.value_function, control_array) residual = np.abs(terms.total).ravel() # Check for NaN/Inf (#453) has_nan_inf = not np.all(np.isfinite(self.value_function)) # Build interior mask: True where all dimensions are strictly interior. # The finite-difference operators (_apply_upwind_scheme, # _compute_second_derivatives) zero-pad at boundaries, so the residual # at boundary cells reduces to |-rho*V + cost + jump| with no derivative # term to balance against. The boundary value is set by a separate # mechanism (ABSORBING extrapolation or DIRICHLET) with no obligation # to satisfy the local PDE balance — including those cells in the max # residual conflates boundary-imposition noise with interior convergence. # See issue #1569 for full motivation. state_shape = self.problem.state_space.shape interior_mask = np.ones(state_shape, dtype=bool) for dim in range(self.problem.state_space.ndim): slicer: List[Any] = [slice(None)] * self.problem.state_space.ndim slicer[dim] = 0 interior_mask[tuple(slicer)] = False slicer[dim] = -1 interior_mask[tuple(slicer)] = False interior_flat = interior_mask.ravel() boundary_flat = ~interior_flat # Argmax location and boundary classification (for diagnostics). argmax_flat = int(np.argmax(residual)) argmax_idx = np.unravel_index(argmax_flat, state_shape) argmax_is_boundary = bool(boundary_flat[argmax_flat]) # Interior residual statistics (the convergence target). if np.any(interior_flat): max_residual_interior = float(np.max(residual[interior_flat])) mean_residual_interior = float(np.mean(residual[interior_flat])) else: # Degenerate case: grid too small to have interior cells. max_residual_interior = float("nan") mean_residual_interior = float("nan") # Boundary residual statistics (diagnostic only — not a convergence # criterion; useful for spotting when imposed Dirichlet values are far # from local PDE equilibrium). if np.any(boundary_flat): max_residual_boundary = float(np.max(residual[boundary_flat])) mean_residual_boundary = float(np.mean(residual[boundary_flat])) else: max_residual_boundary = float("nan") mean_residual_boundary = float("nan") return { "has_nan_inf": has_nan_inf, # Legacy keys (over all cells, including boundary) — retained for # backward compatibility; prefer the interior-only metrics for # convergence assessment. "max_residual": float(np.max(residual)), "mean_residual": float(np.mean(residual)), "max_residual_all_cells": float(np.max(residual)), # Interior residual (convergence target). "max_residual_interior": max_residual_interior, "mean_residual_interior": mean_residual_interior, # Boundary residual (diagnostic). "max_residual_boundary": max_residual_boundary, "mean_residual_boundary": mean_residual_boundary, # Argmax diagnostics — distinguishes boundary-spike vs interior # under-convergence; used by issue #1566 §0 diagnostic. "argmax_residual_index": tuple(int(i) for i in argmax_idx), "argmax_residual_is_boundary": argmax_is_boundary, "value_function_range": ( float(np.min(self.value_function)), float(np.max(self.value_function)), ), "policy_stats": { cv.name: { "min": float(np.min(self.optimal_policy[cv.name])), "max": float(np.max(self.optimal_policy[cv.name])), "mean": float(np.mean(self.optimal_policy[cv.name])), } for cv in self.problem.control_variables }, }
[docs] def create_custom_utility( evaluate_func: Callable[[np.ndarray], np.ndarray], derivative_func: Callable[[np.ndarray], np.ndarray], inverse_derivative_func: Optional[Callable[[np.ndarray], np.ndarray]] = None, ) -> UtilityFunction: """Factory function for creating custom utility functions. This function allows users to create custom utility functions by providing the evaluation and derivative functions. This is the recommended way to add new utility functions beyond the built-in ones. Args: evaluate_func: Function that evaluates U(w) derivative_func: Function that computes U'(w) inverse_derivative_func: Optional function for (U')^(-1)(m) Returns: Custom utility function instance Example: >>> # Create exponential utility: U(w) = 1 - exp(-α*w) >>> def exp_eval(w): ... alpha = 0.01 ... return 1 - np.exp(-alpha * w) >>> def exp_deriv(w): ... alpha = 0.01 ... return alpha * np.exp(-alpha * w) >>> exp_utility = create_custom_utility(exp_eval, exp_deriv) """ class CustomUtility(UtilityFunction): """Dynamically created custom utility function.""" def evaluate(self, wealth: np.ndarray) -> np.ndarray: return evaluate_func(wealth) def derivative(self, wealth: np.ndarray) -> np.ndarray: return derivative_func(wealth) def inverse_derivative(self, marginal_utility: np.ndarray) -> np.ndarray: if inverse_derivative_func is None: raise NotImplementedError("Inverse derivative not provided for custom utility") return inverse_derivative_func(marginal_utility) return CustomUtility()