Source code for ergodic_insurance.trends

"""Trend module for insurance claim frequency and severity adjustments.

This module provides a hierarchy of trend classes that apply multiplicative
adjustments to claim frequencies and severities over time. Trends model
how insurance risks evolve due to inflation, exposure growth, regulatory
changes, or other systematic factors.

Key Concepts:
    - All trends are multiplicative (1.0 = no change, 1.03 = 3% increase)
    - Support both annual and sub-annual (monthly) time steps
    - Seedable for reproducibility in stochastic trends
    - Time-based multipliers for dynamic risk evolution

Example:
    Basic usage with linear trend::

        from ergodic_insurance.trends import LinearTrend, ScenarioTrend

        # 3% annual inflation trend
        inflation = LinearTrend(annual_rate=0.03)
        multiplier_year5 = inflation.get_multiplier(5.0)  # ~1.159

        # Custom scenario with varying rates
        scenario = ScenarioTrend(
            factors=[1.0, 1.05, 1.08, 1.06, 1.10],
            time_unit="annual"
        )
        multiplier_year3_5 = scenario.get_multiplier(3.5)  # Interpolated ≈ 1.08

Since:
    Version 0.4.0 - Core trend infrastructure for ClaimGenerator
"""

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union

import numpy as np


class Trend(ABC):
    """Abstract base class for all trend implementations.

    Defines the interface that all trend classes must implement. Trends
    provide multiplicative adjustments over time for frequencies and
    severities in insurance claim modeling.

    All trend implementations must provide:
        - get_multiplier(time): Returns multiplicative factor at given time
        - Proper handling of edge cases (negative time, etc.)
        - Reproducibility through seed support (if stochastic)

    Examples:
        Implementing a custom trend::

            class StepTrend(Trend):
                def __init__(self, step_time: float, step_factor: float):
                    self.step_time = step_time
                    self.step_factor = step_factor

                def get_multiplier(self, time: float) -> float:
                    if time < 0:
                        return 1.0
                    return 1.0 if time < self.step_time else self.step_factor
    """

    def __init__(self, seed: Optional[int] = None):
        """Initialize trend with optional random seed.

        Args:
            seed: Random seed for reproducibility in stochastic trends.
                Ignored for deterministic trends but included for
                interface consistency.
        """
        self.seed = seed
        if seed is not None:
            self.rng = np.random.default_rng(seed)
        else:
            self.rng = np.random.default_rng()

    @abstractmethod
    def get_multiplier(self, time: float) -> float:
        """Get the multiplicative adjustment factor at a given time.

        Args:
            time: Time point (in years from start) to get multiplier for.
                Can be fractional for sub-annual precision.

        Returns:
            float: Multiplicative factor (1.0 = no change, >1.0 = increase,
                <1.0 = decrease).

        Note:
            Implementations should handle negative time gracefully,
            typically returning 1.0 or the initial value.
        """

    def reset_seed(self, seed: int) -> None:
        """Reset random seed for stochastic trends.

        Args:
            seed: New random seed to use.

        Note:
            This method allows re-running scenarios with different random
            outcomes while maintaining reproducibility.
        """
        self.seed = seed
        self.rng = np.random.default_rng(seed)


class NoTrend(Trend):
    """Default trend implementation with no adjustment over time.

    This trend always returns a multiplier of 1.0, representing no change
    in frequency or severity over time. Useful as a default or baseline.

    Examples:
        Using NoTrend as baseline::

            from ergodic_insurance.trends import NoTrend

            baseline = NoTrend()

            # Always returns 1.0
            assert baseline.get_multiplier(0) == 1.0
            assert baseline.get_multiplier(10) == 1.0
            assert baseline.get_multiplier(-5) == 1.0
    """

    def get_multiplier(self, time: float) -> float:
        """Return constant multiplier of 1.0.

        Args:
            time: Time point (ignored).

        Returns:
            float: Always returns 1.0.
        """
        return 1.0


class LinearTrend(Trend):
    """Linear compound growth trend with constant annual rate.

    Models exponential growth/decay with a fixed annual rate, similar to
    compound interest. Commonly used for inflation, exposure growth, or
    systematic risk changes.

    The multiplier at time t is calculated as: (1 + annual_rate)^t

    Attributes:
        annual_rate: Annual growth rate (0.03 = 3% growth, -0.02 = 2% decay).

    Examples:
        Modeling inflation::

            from ergodic_insurance.trends import LinearTrend

            # 3% annual inflation
            inflation = LinearTrend(annual_rate=0.03)

            # After 5 years: 1.03^5 ≈ 1.159
            mult_5y = inflation.get_multiplier(5.0)
            print(f"5-year inflation factor: {mult_5y:.3f}")

            # After 6 months: 1.03^0.5 ≈ 1.015
            mult_6m = inflation.get_multiplier(0.5)
            print(f"6-month inflation factor: {mult_6m:.3f}")

        Modeling exposure decay::

            # 2% annual exposure reduction
            reduction = LinearTrend(annual_rate=-0.02)
            mult_10y = reduction.get_multiplier(10.0)  # 0.98^10 ≈ 0.817
    """

    def __init__(self, annual_rate: float = 0.03, seed: Optional[int] = None):
        """Initialize linear trend with specified annual rate.

        Args:
            annual_rate: Annual growth rate. Positive for growth, negative
                for decay. Default is 0.03 (3% growth).
            seed: Included for interface consistency but unused for this
                deterministic trend.

        Examples:
            Various growth scenarios::

                # 5% annual growth
                growth = LinearTrend(0.05)

                # 1% annual decay
                decay = LinearTrend(-0.01)

                # No change (equivalent to NoTrend)
                flat = LinearTrend(0.0)
        """
        super().__init__(seed)
        self.annual_rate = annual_rate

    def get_multiplier(self, time: float) -> float:
        """Calculate compound growth multiplier at given time.

        Args:
            time: Time in years from start. Can be fractional for
                sub-annual calculations. Negative times return 1.0.

        Returns:
            float: Multiplicative factor calculated as (1 + annual_rate)^time.
                Returns 1.0 for negative times.

        Examples:
            Calculating multipliers::

                trend = LinearTrend(0.04)  # 4% annual

                # Year 1: 1.04
                mult_1 = trend.get_multiplier(1.0)

                # Year 2.5: 1.04^2.5 ≈ 1.103
                mult_2_5 = trend.get_multiplier(2.5)

                # Negative time: 1.0
                mult_neg = trend.get_multiplier(-1.0)
        """
        if time < 0:
            return 1.0
        return float((1 + self.annual_rate) ** time)
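

# Illustrative sketch (not part of the original module): one way a claim
# simulation might consume these multipliers, scaling a Poisson frequency and
# a lognormal severity by trend factors. The ClaimGenerator integration
# referenced in the module docstring is not shown here; the frequency and
# severity parameters below are invented purely for illustration.
def _demo_trended_claims(seed: int = 7) -> list:
    """Simulate ten years of claim totals with frequency and severity trends."""
    rng = np.random.default_rng(seed)
    severity_trend = LinearTrend(annual_rate=0.03)   # 3% annual severity inflation
    frequency_trend = LinearTrend(annual_rate=0.01)  # 1% annual exposure growth
    base_frequency = 2.0       # expected claims per year (assumed)
    base_severity_median = 1e6  # median claim size before trend (assumed)
    totals = []
    for year in range(10):
        # Frequency trend scales the Poisson rate; severity trend scales each loss
        lam = base_frequency * frequency_trend.get_multiplier(float(year))
        n_claims = rng.poisson(lam)
        severities = rng.lognormal(
            mean=np.log(base_severity_median), sigma=0.5, size=n_claims
        ) * severity_trend.get_multiplier(float(year))
        totals.append(float(np.sum(severities)))
    return totals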


class RandomWalkTrend(Trend):
    """Random walk trend with drift and volatility.

    Models a geometric random walk (geometric Brownian motion) where the
    multiplier evolves as a cumulative product of random changes. Commonly
    used for modeling market indices, asset prices, or unpredictable
    long-term trends in insurance markets.

    The multiplier at time t follows:
        M(t) = exp((drift - volatility**2 / 2) * t + volatility * W(t))
    where W(t) is a Brownian motion. The -volatility**2/2 term is the Ito
    correction, so the expected multiplier is E[M(t)] = exp(drift * t).

    Attributes:
        drift: Annual drift rate (expected growth rate).
        volatility: Annual volatility (standard deviation of log returns).
        cached_path: Cached random path for efficiency.
        cached_times: Time points for the cached path.

    Examples:
        Basic random walk with drift::

            from ergodic_insurance.trends import RandomWalkTrend

            # 2% drift with 10% volatility
            trend = RandomWalkTrend(drift=0.02, volatility=0.10, seed=42)

            # Generate multipliers
            mult_1 = trend.get_multiplier(1.0)  # Random around e^0.02
            mult_5 = trend.get_multiplier(5.0)  # More variation

        Market-like volatility::

            # High volatility market
            volatile = RandomWalkTrend(drift=0.0, volatility=0.30)

            # Low volatility with positive drift
            stable = RandomWalkTrend(drift=0.03, volatility=0.05)
    """

    def __init__(self, drift: float = 0.0, volatility: float = 0.10, seed: Optional[int] = None):
        """Initialize random walk trend.

        Args:
            drift: Annual drift rate (mu in geometric Brownian motion).
                Can be positive (upward trend) or negative (downward).
                Default is 0.0 (no drift).
            volatility: Annual volatility (sigma in geometric Brownian motion).
                Must be >= 0. Default is 0.10 (10% annual volatility).
            seed: Random seed for reproducibility.

        Raises:
            ValueError: If volatility is negative.

        Examples:
            Various configurations::

                # Pure random walk (no drift)
                pure_rw = RandomWalkTrend(drift=0.0, volatility=0.15)

                # Positive drift with moderate volatility
                growth = RandomWalkTrend(drift=0.05, volatility=0.20)

                # Declining trend with low volatility
                decline = RandomWalkTrend(drift=-0.02, volatility=0.08)
        """
        super().__init__(seed)
        if volatility < 0:
            raise ValueError(f"volatility must be >= 0, got {volatility}")
        self.drift = drift
        self.volatility = volatility
        self.cached_path: Optional[np.ndarray] = None
        self.cached_times: Optional[np.ndarray] = None

    def _generate_path(self, max_time: float, dt: float = 0.01) -> None:
        """Generate and cache a random path.

        Args:
            max_time: Maximum time to generate path for.
            dt: Time step for path generation.
        """
        num_steps = int(max_time / dt) + 1
        times = np.linspace(0, max_time, num_steps)

        # Generate Brownian motion increments
        if self.volatility > 0:
            dW = self.rng.standard_normal(num_steps - 1) * np.sqrt(dt)
            W = np.concatenate([[0], np.cumsum(dW)])
        else:
            W = np.zeros(num_steps)

        # Geometric Brownian motion formula
        # M(t) = exp((drift - 0.5*volatility^2)*t + volatility*W(t))
        log_multipliers = (self.drift - 0.5 * self.volatility**2) * times + self.volatility * W

        self.cached_times = times
        self.cached_path = np.exp(log_multipliers)

    def get_multiplier(self, time: float) -> float:
        """Get random walk multiplier at given time.

        Args:
            time: Time in years from start. Negative times return 1.0.

        Returns:
            float: Multiplicative factor following geometric Brownian motion.
                Always positive due to exponential transformation.

        Note:
            The path is cached on first call for efficiency. All subsequent
            calls will use the same random path, ensuring consistency
            within a simulation run.
        """
        if time < 0:
            return 1.0
        if time == 0:
            return 1.0

        # Generate or extend cached path if needed
        if self.cached_path is None or self.cached_times is None or time > self.cached_times[-1]:
            max_time = max(time * 1.5, 100.0)  # Generate extra for efficiency
            self._generate_path(max_time)

        # Interpolate from cached path
        assert self.cached_times is not None
        assert self.cached_path is not None
        return float(np.interp(time, self.cached_times, self.cached_path))

    def reset_seed(self, seed: int) -> None:
        """Reset random seed and clear cached path.

        Args:
            seed: New random seed to use.
        """
        super().reset_seed(seed)
        self.cached_path = None
        self.cached_times = None
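

# Illustrative sketch (not part of the original module): a quick check of the
# Ito correction used in _generate_path. Because the log-multiplier drifts at
# (drift - 0.5 * volatility**2), the *average* multiplier across many
# independent paths is approximately exp(drift * t). The sample count and
# parameter values below are arbitrary.
def _demo_random_walk_mean(drift: float = 0.02, vol: float = 0.10, t: float = 5.0) -> float:
    """Average the t-year multiplier over independently seeded paths."""
    samples = [
        RandomWalkTrend(drift=drift, volatility=vol, seed=s).get_multiplier(t)
        for s in range(200)
    ]
    return float(np.mean(samples))  # expect a value near np.exp(drift * t) ~= 1.105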


class MeanRevertingTrend(Trend):
    """Mean-reverting trend using Ornstein-Uhlenbeck process.

    Models a trend that tends to revert to a long-term mean level, commonly
    used for interest rates, insurance market cycles, or any process with
    cyclical behavior around a stable level.

    The process follows:
        dX(t) = theta * (mu - X(t)) * dt + sigma * dW(t)
    where the multiplier M(t) = exp(X(t)).

    Attributes:
        mean_level: Long-term mean multiplier level.
        reversion_speed: Speed of mean reversion (theta).
        volatility: Volatility of the process (sigma).
        initial_level: Starting multiplier level.
        cached_path: Cached process path for efficiency.
        cached_times: Time points for the cached path.

    Examples:
        Insurance market cycle::

            from ergodic_insurance.trends import MeanRevertingTrend

            # Market cycles around 1.0 with 5-year half-life
            market = MeanRevertingTrend(
                mean_level=1.0,
                reversion_speed=0.14,  # ln(2)/5 years
                volatility=0.10,
                initial_level=1.1,  # Start in hard market
                seed=42
            )

            # Will gradually revert to 1.0
            mult_1 = market.get_multiplier(1.0)
            mult_10 = market.get_multiplier(10.0)  # Closer to 1.0

        Interest rate model::

            # Interest rates reverting to 3% with high volatility
            rates = MeanRevertingTrend(
                mean_level=1.03,
                reversion_speed=0.5,
                volatility=0.15
            )
    """

    def __init__(
        self,
        mean_level: float = 1.0,
        reversion_speed: float = 0.2,
        volatility: float = 0.10,
        initial_level: float = 1.0,
        seed: Optional[int] = None,
    ):
        """Initialize mean-reverting trend.

        Args:
            mean_level: Long-term mean multiplier level (mu).
                Default is 1.0 (no long-term trend).
            reversion_speed: Speed of mean reversion (theta). Higher values
                mean faster reversion. Default is 0.2 (moderate speed).
            volatility: Volatility of the process (sigma).
                Default is 0.10 (10% volatility).
            initial_level: Starting multiplier level. Default is 1.0.
            seed: Random seed for reproducibility.

        Raises:
            ValueError: If reversion_speed or volatility is negative, or if
                mean_level or initial_level is not positive.

        Examples:
            Different mean reversion scenarios::

                # Fast reversion to high mean
                fast = MeanRevertingTrend(
                    mean_level=1.2,
                    reversion_speed=1.0,
                    volatility=0.05
                )

                # Slow reversion with high volatility
                slow = MeanRevertingTrend(
                    mean_level=0.9,
                    reversion_speed=0.05,
                    volatility=0.25
                )

                # Starting far from mean
                displaced = MeanRevertingTrend(
                    mean_level=1.0,
                    reversion_speed=0.3,
                    initial_level=1.5
                )
        """
        super().__init__(seed)
        if reversion_speed < 0:
            raise ValueError(f"reversion_speed must be >= 0, got {reversion_speed}")
        if volatility < 0:
            raise ValueError(f"volatility must be >= 0, got {volatility}")
        if mean_level <= 0:
            raise ValueError(f"mean_level must be > 0, got {mean_level}")
        if initial_level <= 0:
            raise ValueError(f"initial_level must be > 0, got {initial_level}")
        self.mean_level = mean_level
        self.reversion_speed = reversion_speed
        self.volatility = volatility
        self.initial_level = initial_level
        self.cached_path: Optional[np.ndarray] = None
        self.cached_times: Optional[np.ndarray] = None

    def _generate_path(self, max_time: float, dt: float = 0.01) -> None:
        """Generate and cache an Ornstein-Uhlenbeck path.

        Args:
            max_time: Maximum time to generate path for.
            dt: Time step for path generation.
        """
        num_steps = int(max_time / dt) + 1
        times = np.linspace(0, max_time, num_steps)

        # Work in log space for positivity
        log_mean = np.log(self.mean_level)
        log_initial = np.log(self.initial_level)

        # Generate OU process in log space
        X = np.zeros(num_steps)
        X[0] = log_initial

        if self.volatility > 0 or self.reversion_speed > 0:
            for i in range(1, num_steps):
                dW = self.rng.standard_normal() * np.sqrt(dt)
                X[i] = (
                    X[i - 1]
                    + self.reversion_speed * (log_mean - X[i - 1]) * dt
                    + self.volatility * dW
                )
        else:
            X[:] = log_initial

        # Convert to multipliers
        self.cached_times = times
        self.cached_path = np.exp(X)

    def get_multiplier(self, time: float) -> float:
        """Get mean-reverting multiplier at given time.

        Args:
            time: Time in years from start. Negative times return 1.0.

        Returns:
            float: Multiplicative factor following OU process. Always
                positive. Tends toward mean_level over time.

        Note:
            The path is cached on first call for efficiency. The process
            exhibits mean reversion: starting values far from the mean
            will tend to move toward it over time.
        """
        if time < 0:
            return 1.0
        if time == 0:
            return float(self.initial_level)

        # Generate or extend cached path if needed
        if self.cached_path is None or self.cached_times is None or time > self.cached_times[-1]:
            max_time = max(time * 1.5, 100.0)
            self._generate_path(max_time)

        # Interpolate from cached path
        assert self.cached_times is not None
        assert self.cached_path is not None
        return float(np.interp(time, self.cached_times, self.cached_path))

    def reset_seed(self, seed: int) -> None:
        """Reset random seed and clear cached path.

        Args:
            seed: New random seed to use.
        """
        super().reset_seed(seed)
        self.cached_path = None
        self.cached_times = None
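

# Illustrative sketch (not part of the original module): with volatility set
# to 0 the log-space OU path decays deterministically toward log(mean_level),
# so the gap to the mean halves roughly every ln(2)/reversion_speed years,
# matching the "5-year half-life" comment in the class docstring. The
# parameter values here are chosen only to make the decay visible.
def _demo_mean_reversion_half_life() -> float:
    """Return the fraction of the initial log-gap remaining after one half-life."""
    theta = 0.2
    trend = MeanRevertingTrend(
        mean_level=1.0, reversion_speed=theta, volatility=0.0, initial_level=1.5
    )
    half_life = np.log(2) / theta                       # ~3.47 years
    gap_start = np.log(1.5)                             # initial log-distance from the mean
    gap_now = np.log(trend.get_multiplier(half_life))   # log(mean_level) is 0 here
    return float(gap_now / gap_start)                   # expect roughly 0.5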


class RegimeSwitchingTrend(Trend):
    """Trend that switches between different market regimes.

    Models discrete regime changes such as hard/soft insurance markets,
    economic cycles, or regulatory environments. Each regime has its own
    multiplier, and transitions occur stochastically based on probabilities.

    Attributes:
        regimes: List of regime multipliers.
        transition_matrix: Matrix of transition probabilities between regimes.
        initial_regime: Starting regime index.
        regime_persistence: How long regimes tend to last.
        cached_regimes: Cached regime path for efficiency.
        cached_times: Time points for the cached path.

    Examples:
        Hard/soft insurance market::

            from ergodic_insurance.trends import RegimeSwitchingTrend

            # Two regimes: soft (0.9x) and hard (1.2x) markets
            market = RegimeSwitchingTrend(
                regimes=[0.9, 1.2],
                transition_probs=[[0.8, 0.2],   # Soft -> [80% stay, 20% to hard]
                                  [0.3, 0.7]],  # Hard -> [30% to soft, 70% stay]
                initial_regime=0,  # Start in soft market
                seed=42
            )

            # Multiplier switches between 0.9 and 1.2
            mult_5 = market.get_multiplier(5.0)

        Three-regime economic cycle::

            # Recession, normal, boom
            economy = RegimeSwitchingTrend(
                regimes=[0.8, 1.0, 1.3],
                transition_probs=[
                    [0.6, 0.4, 0.0],  # Recession
                    [0.1, 0.7, 0.2],  # Normal
                    [0.0, 0.5, 0.5],  # Boom
                ],
                initial_regime=1  # Start in normal
            )
    """

    def __init__(
        self,
        regimes: Optional[List[float]] = None,
        transition_probs: Optional[List[List[float]]] = None,
        initial_regime: int = 0,
        regime_persistence: float = 1.0,
        seed: Optional[int] = None,
    ):
        """Initialize regime-switching trend.

        Args:
            regimes: List of multipliers for each regime.
                Default is [0.9, 1.0, 1.2] (soft/normal/hard).
            transition_probs: Transition probability matrix where element
                [i][j] is P(regime j | regime i). Rows must sum to 1.0.
                Default creates persistent regimes.
            initial_regime: Starting regime index (0-based).
                Default is 0 (first regime).
            regime_persistence: Time scaling factor. Higher values make
                regimes last longer. Default is 1.0.
            seed: Random seed for reproducibility.

        Raises:
            ValueError: If transition probabilities don't sum to 1, if matrix
                dimensions don't match regime count, or if initial_regime is
                out of bounds.

        Examples:
            Custom regime configurations::

                # Simple two-state with equal transitions
                simple = RegimeSwitchingTrend(
                    regimes=[0.95, 1.05],
                    transition_probs=[[0.5, 0.5], [0.5, 0.5]]
                )

                # Highly persistent regimes
                persistent = RegimeSwitchingTrend(
                    regimes=[0.8, 1.2],
                    transition_probs=[[0.95, 0.05], [0.05, 0.95]],
                    regime_persistence=2.0  # Double persistence
                )

                # Four regulatory environments
                regulatory = RegimeSwitchingTrend(
                    regimes=[0.7, 0.9, 1.1, 1.3],
                    transition_probs=[
                        [0.7, 0.2, 0.1, 0.0],
                        [0.2, 0.5, 0.2, 0.1],
                        [0.1, 0.2, 0.5, 0.2],
                        [0.0, 0.1, 0.2, 0.7]
                    ]
                )
        """
        super().__init__(seed)

        # Default regimes: soft, normal, hard markets
        if regimes is None:
            regimes = [0.9, 1.0, 1.2]

        # Default transition matrix: persistent regimes
        if transition_probs is None:
            n = len(regimes)
            transition_probs = []
            for i in range(n):
                row = [0.1 / (n - 1)] * n  # Small prob to other states
                row[i] = 0.9  # High prob to stay
                transition_probs.append(row)

        # Validate inputs
        n_regimes = len(regimes)
        if len(transition_probs) != n_regimes:
            raise ValueError(
                f"transition_probs rows ({len(transition_probs)}) must match regime count ({n_regimes})"
            )
        for i, row in enumerate(transition_probs):
            if len(row) != n_regimes:
                raise ValueError(
                    f"transition_probs row {i} has {len(row)} elements, expected {n_regimes}"
                )
            row_sum = sum(row)
            if abs(row_sum - 1.0) > 1e-6:
                raise ValueError(f"transition_probs row {i} sums to {row_sum}, must sum to 1.0")
        if initial_regime < 0 or initial_regime >= n_regimes:
            raise ValueError(f"initial_regime {initial_regime} out of bounds [0, {n_regimes-1}]")
        for i, regime in enumerate(regimes):
            if regime <= 0:
                raise ValueError(f"regime {i} multiplier must be > 0, got {regime}")

        self.regimes = regimes
        self.transition_matrix = np.array(transition_probs)
        self.initial_regime = initial_regime
        self.regime_persistence = regime_persistence
        self.cached_regimes: Optional[np.ndarray] = None
        self.cached_times: Optional[np.ndarray] = None

    def _generate_regime_path(self, max_time: float, dt: float = 0.1) -> None:
        """Generate and cache a regime path.

        Args:
            max_time: Maximum time to generate path for.
            dt: Time step for regime transitions.
        """
        # Adjust dt by persistence factor
        effective_dt = dt * self.regime_persistence
        num_steps = int(max_time / effective_dt) + 1
        times = np.linspace(0, max_time, num_steps)

        # Generate regime sequence
        regimes = np.zeros(num_steps, dtype=int)
        regimes[0] = self.initial_regime

        for i in range(1, num_steps):
            current_regime = regimes[i - 1]
            probs = self.transition_matrix[current_regime]
            regimes[i] = self.rng.choice(len(self.regimes), p=probs)

        self.cached_times = times
        self.cached_regimes = regimes

    def get_multiplier(self, time: float) -> float:
        """Get regime-based multiplier at given time.

        Args:
            time: Time in years from start. Negative times return 1.0.

        Returns:
            float: Multiplicative factor for the active regime at time t.
                Changes discretely as regimes switch.

        Note:
            The regime path is cached on first call. Regime changes are
            stochastic but reproducible with the same seed. The actual
            regime durations depend on both transition probabilities and
            the regime_persistence parameter.
        """
        if time < 0:
            return 1.0
        if time == 0:
            return float(self.regimes[self.initial_regime])

        # Generate or extend cached path if needed
        if self.cached_regimes is None or self.cached_times is None or time > self.cached_times[-1]:
            max_time = max(time * 1.5, 100.0)
            self._generate_regime_path(max_time)

        # Find regime at given time (step function, no interpolation)
        assert self.cached_times is not None
        assert self.cached_regimes is not None
        idx = int(np.searchsorted(self.cached_times, time, side="right")) - 1
        idx = max(0, min(idx, len(self.cached_regimes) - 1))
        regime_idx = self.cached_regimes[idx]
        return float(self.regimes[regime_idx])

    def reset_seed(self, seed: int) -> None:
        """Reset random seed and clear cached regime path.

        Args:
            seed: New random seed to use.
        """
        super().reset_seed(seed)
        self.cached_regimes = None
        self.cached_times = None
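

# Illustrative sketch (not part of the original module): the long-run share of
# time spent in each regime is the stationary distribution of the transition
# matrix (left eigenvector for eigenvalue 1). The two-state matrix below is
# the hard/soft market example from the class docstring; the simulation
# horizon and seed used for the cross-check are arbitrary.
def _demo_regime_occupancy() -> tuple:
    """Return (stationary distribution, simulated share of time in the hard regime)."""
    P = np.array([[0.8, 0.2],
                  [0.3, 0.7]])
    eigvals, eigvecs = np.linalg.eig(P.T)
    stationary = np.real(eigvecs[:, np.argmin(np.abs(eigvals - 1.0))])
    stationary = stationary / stationary.sum()  # expect roughly [0.6, 0.4]

    # Cross-check against simulated occupancy of the hard regime (multiplier 1.2)
    market = RegimeSwitchingTrend(
        regimes=[0.9, 1.2], transition_probs=P.tolist(), initial_regime=0, seed=123
    )
    mults = np.array([market.get_multiplier(t) for t in np.arange(0.0, 500.0, 0.1)])
    hard_share = float(np.mean(mults == 1.2))  # should land near stationary[1]
    return stationary, hard_share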


class ScenarioTrend(Trend):
    """Trend based on explicit scenario factors with interpolation.

    Allows specifying exact multiplicative factors at specific time points,
    with linear interpolation between points. Useful for modeling known
    future changes, regulatory impacts, or custom scenarios.

    Attributes:
        factors: List or dict of multiplicative factors.
        time_unit: Time unit for the factors ("annual" or "monthly").
        interpolation: Interpolation method ("linear" or "step").

    Examples:
        Annual scenario with known rates::

            from ergodic_insurance.trends import ScenarioTrend

            # Year 0: 1.0, Year 1: 1.05, Year 2: 1.08, etc.
            scenario = ScenarioTrend(
                factors=[1.0, 1.05, 1.08, 1.06, 1.10],
                time_unit="annual"
            )

            # Exact points
            mult_1 = scenario.get_multiplier(1.0)  # 1.05
            mult_2 = scenario.get_multiplier(2.0)  # 1.08

            # Interpolated
            mult_1_5 = scenario.get_multiplier(1.5)  # ≈1.065

        Monthly scenario::

            # Monthly adjustment factors
            monthly = ScenarioTrend(
                factors=[1.0, 1.01, 1.02, 1.015, 1.025, 1.03],
                time_unit="monthly"
            )

            # Month 3 (0.25 years)
            mult_3m = monthly.get_multiplier(0.25)

        Using dictionary for specific times::

            # Specific time points
            custom = ScenarioTrend(
                factors={0: 1.0, 2: 1.1, 5: 1.2, 10: 1.5},
                interpolation="linear"
            )
    """

    def __init__(
        self,
        factors: Union[List[float], Dict[float, float]],
        time_unit: str = "annual",
        interpolation: str = "linear",
        seed: Optional[int] = None,
    ):
        """Initialize scenario trend with explicit factors.

        Args:
            factors: Either:
                - List[float]: Factors at regular intervals (0, 1, 2, ...)
                - Dict[float, float]: {time: factor} pairs for irregular times
            time_unit: Time unit for the factors. Options:
                - "annual": Factors apply at yearly intervals (default)
                - "monthly": Factors apply at monthly intervals
            interpolation: Method for interpolating between points:
                - "linear": Linear interpolation (default)
                - "step": Step function (use previous value)
            seed: Included for interface consistency but unused.

        Raises:
            ValueError: If time_unit is not "annual" or "monthly", or if
                interpolation is not "linear" or "step".

        Examples:
            Various scenario configurations::

                # Regular annual factors
                annual = ScenarioTrend([1.0, 1.1, 1.15, 1.18])

                # Irregular time points
                irregular = ScenarioTrend({
                    0: 1.0,
                    1.5: 1.08,
                    3: 1.15,
                    7: 1.25
                })

                # Monthly with step interpolation
                monthly_step = ScenarioTrend(
                    factors=[1.0, 1.02, 1.05],
                    time_unit="monthly",
                    interpolation="step"
                )
        """
        super().__init__(seed)

        # Validate inputs
        if time_unit not in ["annual", "monthly"]:
            raise ValueError(f"time_unit must be 'annual' or 'monthly', got {time_unit}")
        if interpolation not in ["linear", "step"]:
            raise ValueError(f"interpolation must be 'linear' or 'step', got {interpolation}")

        self.time_unit = time_unit
        self.interpolation = interpolation

        # Convert to internal representation: sorted (time, factor) pairs
        if isinstance(factors, list):
            # List assumed to be at regular intervals starting from 0
            time_scale = 1.0 if time_unit == "annual" else 1.0 / 12.0
            self.time_factors = [(i * time_scale, f) for i, f in enumerate(factors)]
        elif isinstance(factors, dict):
            # Dictionary with explicit time points
            self.time_factors = sorted(factors.items())
        else:
            raise TypeError("factors must be a list or dict")

        # Ensure we have at least one point
        if not self.time_factors:
            self.time_factors = [(0.0, 1.0)]

        # Extract times and factors for efficient interpolation
        self.times = np.array([t for t, _ in self.time_factors])
        self.factors_array = np.array([f for _, f in self.time_factors])

    def get_multiplier(self, time: float) -> float:
        """Get interpolated multiplier at given time.

        Args:
            time: Time in years from start. Can be fractional.
                Negative times return 1.0.

        Returns:
            float: Multiplicative factor, interpolated from scenario points.
                - Before first point: returns 1.0
                - After last point: returns last factor
                - Between points: interpolated based on method

        Examples:
            Interpolation behavior::

                scenario = ScenarioTrend([1.0, 1.1, 1.2, 1.15])

                # Exact points
                mult_0 = scenario.get_multiplier(0.0)  # 1.0
                mult_2 = scenario.get_multiplier(2.0)  # 1.2

                # Linear interpolation
                mult_1_5 = scenario.get_multiplier(1.5)  # 1.15

                # Beyond range
                mult_neg = scenario.get_multiplier(-1.0)  # 1.0
                mult_10 = scenario.get_multiplier(10.0)  # 1.15 (last)
        """
        if time < 0:
            return 1.0

        # Handle edge cases
        if time <= self.times[0]:
            return float(self.factors_array[0]) if time == self.times[0] else 1.0
        if time >= self.times[-1]:
            return float(self.factors_array[-1])

        # Interpolation
        if self.interpolation == "linear":
            # Linear interpolation using numpy
            return float(np.interp(time, self.times, self.factors_array))

        # Step interpolation: find the largest time <= given time
        idx = np.searchsorted(self.times, time, side="right") - 1
        return float(self.factors_array[idx])
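

# Illustrative sketch (not part of the original module): the same scenario
# factors under linear versus step interpolation. At t = 1.5 the linear
# variant blends the year-1 and year-2 factors, while the step variant holds
# the year-1 factor until year 2. The factor list matches the module
# docstring example.
def _demo_scenario_interpolation() -> tuple:
    """Return (linear, step) multipliers at t = 1.5 years."""
    factors = [1.0, 1.05, 1.08, 1.06, 1.10]
    linear = ScenarioTrend(factors=factors, interpolation="linear")
    step = ScenarioTrend(factors=factors, interpolation="step")
    return linear.get_multiplier(1.5), step.get_multiplier(1.5)  # (~1.065, 1.05)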