"""Comprehensive risk metrics suite for tail risk analysis.
This module provides industry-standard risk metrics including VaR, TVaR, PML,
and Expected Shortfall to quantify tail risk and support insurance optimization
decisions.
"""
from dataclasses import dataclass
import logging
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, overload
import warnings
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import stats
from .config import DEFAULT_RISK_FREE_RATE
logger = logging.getLogger(__name__)
[docs]
@dataclass
class RiskMetricsResult:
"""Container for risk metric calculation results."""
metric_name: str
value: float
confidence_level: Optional[float] = None
confidence_interval: Optional[Tuple[float, float]] = None
metadata: Optional[Dict[str, Any]] = None
[docs]
class RiskMetrics:
"""Calculate comprehensive risk metrics for loss distributions.
This class provides industry-standard risk metrics for analyzing
tail risk in insurance and financial applications.
**Sign convention:** By default, input values are treated as **losses**
(positive values = losses, negative values = gains). All percentile-based
metrics (VaR, TVaR, PML, Expected Shortfall) are computed on this loss
distribution.
If your data represents **returns** (positive values = gains), pass
``convention="return"`` and the class will automatically negate the
input internally so that the loss-based metrics remain correct.
A heuristic check is performed on construction: if more than 80 % of
finite values are negative under the ``"loss"`` convention, a warning is
emitted because the data likely represents returns rather than losses.
"""
def __init__(
self,
losses: np.ndarray,
weights: Optional[np.ndarray] = None,
seed: Optional[int] = None,
convention: Literal["loss", "return"] = "loss",
):
"""Initialize risk metrics calculator.
Args:
losses: Array of loss values. Interpretation depends on
*convention*: with ``"loss"`` (default) positive values are
losses; with ``"return"`` positive values are gains.
weights: Optional importance sampling weights.
seed: Random seed for bootstrap calculations.
convention: Sign convention of the input data.
* ``"loss"`` (default) — positive values represent losses.
* ``"return"`` — positive values represent gains. The data
is negated internally so that all metrics are computed in
loss space.
Raises:
ValueError: If losses array is empty, contains invalid values,
or *convention* is not ``"loss"`` or ``"return"``.
"""
if convention not in ("loss", "return"):
raise ValueError(f"convention must be 'loss' or 'return', got {convention!r}")
if len(losses) == 0:
raise ValueError("Losses array cannot be empty")
# Handle NaN and infinite values
valid_mask = np.isfinite(losses)
if not np.all(valid_mask):
logger.warning("Removing %d non-finite values", np.sum(~valid_mask))
losses = losses[valid_mask]
if weights is not None:
weights = weights[valid_mask]
# Heuristic sign-convention check
if convention == "loss" and len(losses) > 0:
neg_frac = np.sum(losses < 0) / len(losses)
if neg_frac > 0.8:
warnings.warn(
f"More than {neg_frac:.0%} of values are negative. "
"RiskMetrics expects losses (positive values = losses). "
"If these are returns, use convention='return' or negate "
"them first: RiskMetrics(-data)",
UserWarning,
stacklevel=2,
)
# Convert returns to losses when needed
if convention == "return":
losses = -np.asarray(losses)
self.losses = np.asarray(losses)
self.convention = convention
self.weights = weights
self.rng = np.random.default_rng(seed)
# Pre-calculate sorted losses for percentile-based metrics
if weights is None:
self._sorted_losses = np.sort(self.losses)
self._sorted_weights = None
else:
# Weighted sorting
sort_idx = np.argsort(self.losses)
self._sorted_losses = self.losses[sort_idx]
if self.weights is not None:
self._sorted_weights = self.weights[sort_idx]
self._cumulative_weights = np.cumsum(self._sorted_weights)
self._cumulative_weights /= self._cumulative_weights[-1]
else:
self._sorted_weights = None
@overload
def var(
self,
confidence: float = ...,
method: str = ...,
*,
bootstrap_ci: Literal[False] = ...,
n_bootstrap: int = ...,
) -> float: ...
@overload
def var(
self,
confidence: float = ...,
method: str = ...,
*,
bootstrap_ci: Literal[True],
n_bootstrap: int = ...,
) -> "RiskMetricsResult": ...
[docs]
def var(
self,
confidence: float = 0.99,
method: str = "empirical",
bootstrap_ci: bool = False,
n_bootstrap: int = 1000,
) -> Union[float, "RiskMetricsResult"]:
"""Calculate Value at Risk (VaR).
VaR represents the loss amount that will not be exceeded with
a given confidence level over a specific time period.
Args:
confidence: Confidence level (e.g., 0.99 for 99% VaR).
method: 'empirical' or 'parametric' (assumes normal distribution).
bootstrap_ci: Deprecated. Use ``var_with_ci()`` instead.
When True, delegates to ``var_with_ci()`` and returns a
``RiskMetricsResult``. Will be removed in a future release.
n_bootstrap: Deprecated. Use ``var_with_ci(n_bootstrap=...)`` instead.
Returns:
VaR value as a float. (When the deprecated *bootstrap_ci* flag is
True, returns ``RiskMetricsResult`` for backward compatibility.)
Raises:
ValueError: If confidence level is not in (0, 1).
"""
if not 0 < confidence < 1:
raise ValueError(f"Confidence must be in (0, 1), got {confidence}")
if method == "empirical":
var_value = self._empirical_var(confidence)
elif method == "parametric":
var_value = self._parametric_var(confidence)
else:
raise ValueError(f"Method must be 'empirical' or 'parametric', got {method}")
if bootstrap_ci:
warnings.warn(
"The 'bootstrap_ci' parameter is deprecated. " "Use var_with_ci() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.var_with_ci(
confidence=confidence,
method=method,
n_bootstrap=n_bootstrap,
)
return var_value
[docs]
def var_with_ci(
self,
confidence: float = 0.99,
method: str = "empirical",
n_bootstrap: int = 1000,
) -> "RiskMetricsResult":
"""Calculate Value at Risk (VaR) with bootstrap confidence intervals.
Args:
confidence: Confidence level (e.g., 0.99 for 99% VaR).
method: 'empirical' or 'parametric' (assumes normal distribution).
n_bootstrap: Number of bootstrap samples for CI calculation.
Returns:
RiskMetricsResult containing the VaR value and confidence interval.
Raises:
ValueError: If confidence level is not in (0, 1).
"""
if not 0 < confidence < 1:
raise ValueError(f"Confidence must be in (0, 1), got {confidence}")
if method == "empirical":
var_value = self._empirical_var(confidence)
elif method == "parametric":
var_value = self._parametric_var(confidence)
else:
raise ValueError(f"Method must be 'empirical' or 'parametric', got {method}")
ci = self._bootstrap_var_ci(confidence, n_bootstrap)
return RiskMetricsResult(
metric_name="VaR",
value=var_value,
confidence_level=confidence,
confidence_interval=ci,
metadata={"method": method},
)
def _empirical_var(self, confidence: float) -> float:
"""Calculate empirical VaR using percentiles."""
if self.weights is None:
return float(np.percentile(self.losses, confidence * 100))
# Weighted percentile
idx = np.searchsorted(self._cumulative_weights, confidence)
if idx >= len(self._sorted_losses):
idx = len(self._sorted_losses) - 1 # type: ignore
return float(self._sorted_losses[idx])
def _parametric_var(self, confidence: float) -> float:
"""Calculate parametric VaR assuming normal distribution."""
mean = np.average(self.losses, weights=self.weights)
if self.weights is None:
std = np.std(self.losses, ddof=1)
else:
variance = np.average((self.losses - mean) ** 2, weights=self.weights)
std = np.sqrt(variance)
return float(mean + std * stats.norm.ppf(confidence))
def _bootstrap_var_ci(self, confidence: float, n_bootstrap: int) -> Tuple[float, float]:
"""Calculate bootstrap confidence interval for VaR."""
n = len(self.losses)
chunk_size = max(1, min(n_bootstrap, 10_000_000 // max(1, n)))
var_bootstrap = np.empty(n_bootstrap)
if self.weights is None:
for start in range(0, n_bootstrap, chunk_size):
end = min(start + chunk_size, n_bootstrap)
batch = end - start
all_idx = self.rng.choice(n, size=(batch, n), replace=True)
all_samples = self.losses[all_idx]
var_bootstrap[start:end] = np.percentile(all_samples, confidence * 100, axis=1)
else:
for start in range(0, n_bootstrap, chunk_size):
end = min(start + chunk_size, n_bootstrap)
batch = end - start
all_idx = self.rng.choice(n, size=(batch, n), replace=True)
all_samples = self.losses[all_idx]
all_weights = self.weights[all_idx]
sort_idx = np.argsort(all_samples, axis=1)
sorted_samples = np.take_along_axis(all_samples, sort_idx, axis=1)
sorted_weights = np.take_along_axis(all_weights, sort_idx, axis=1)
cum_weights = np.cumsum(sorted_weights, axis=1)
cum_weights /= cum_weights[:, -1:]
# Vectorized searchsorted: find first index where cum_weights >= confidence
found = cum_weights >= confidence
idx_var = np.argmax(found, axis=1)
# argmax returns 0 when no match; guard those rows
no_match = ~found.any(axis=1)
idx_var[no_match] = n - 1
var_bootstrap[start:end] = sorted_samples[np.arange(batch), idx_var]
result = np.percentile(var_bootstrap, [2.5, 97.5])
return (float(result[0]), float(result[1]))
def _bootstrap_tvar_ci(self, confidence: float, n_bootstrap: int) -> Tuple[float, float]:
"""Calculate bootstrap confidence interval for TVaR."""
n = len(self.losses)
chunk_size = max(1, min(n_bootstrap, 10_000_000 // max(1, n)))
tvar_bootstrap = np.empty(n_bootstrap)
if self.weights is None:
for start in range(0, n_bootstrap, chunk_size):
end = min(start + chunk_size, n_bootstrap)
batch = end - start
all_idx = self.rng.choice(n, size=(batch, n), replace=True)
all_samples = self.losses[all_idx]
var_values = np.percentile(all_samples, confidence * 100, axis=1)
# Boolean mask: which samples are in the tail
mask = all_samples >= var_values[:, np.newaxis]
masked_samples = all_samples * mask
tail_sums = masked_samples.sum(axis=1)
tail_counts = mask.sum(axis=1)
# Fallback to VaR where no tail samples exist
has_tail = tail_counts > 0
tvar_bootstrap[start:end] = np.where(
has_tail, tail_sums / np.maximum(tail_counts, 1), var_values
)
else:
for start in range(0, n_bootstrap, chunk_size):
end = min(start + chunk_size, n_bootstrap)
batch = end - start
all_idx = self.rng.choice(n, size=(batch, n), replace=True)
all_samples = self.losses[all_idx]
all_weights = self.weights[all_idx]
# Compute weighted VaR for each bootstrap sample
sort_idx = np.argsort(all_samples, axis=1)
sorted_samples = np.take_along_axis(all_samples, sort_idx, axis=1)
sorted_weights = np.take_along_axis(all_weights, sort_idx, axis=1)
cum_weights = np.cumsum(sorted_weights, axis=1)
cum_weights /= cum_weights[:, -1:]
found = cum_weights >= confidence
idx_var = np.argmax(found, axis=1)
no_match = ~found.any(axis=1)
idx_var[no_match] = n - 1
var_values = sorted_samples[np.arange(batch), idx_var]
# Weighted tail average
mask = all_samples >= var_values[:, np.newaxis]
weighted_tail_sums = (all_samples * all_weights * mask).sum(axis=1)
weight_sums = (all_weights * mask).sum(axis=1)
has_tail = weight_sums > 0
tvar_bootstrap[start:end] = np.where(
has_tail,
weighted_tail_sums / np.maximum(weight_sums, 1e-300),
var_values,
)
result = np.percentile(tvar_bootstrap, [2.5, 97.5])
return (float(result[0]), float(result[1]))
[docs]
def tvar(
self,
confidence: float = 0.99,
var_value: Optional[float] = None,
) -> float:
"""Calculate Tail Value at Risk (TVaR/CVaR).
TVaR represents the expected loss given that the loss exceeds VaR.
It's a coherent risk measure that satisfies sub-additivity.
Args:
confidence: Confidence level for VaR threshold.
var_value: Pre-calculated VaR value (if None, will calculate).
Returns:
TVaR value as a float.
"""
if var_value is None:
var_value = self.var(confidence)
if self.weights is None:
tail_losses = self.losses[self.losses >= var_value]
if len(tail_losses) == 0:
return float(var_value) if var_value is not None else 0.0
return float(np.mean(tail_losses))
mask = self.losses >= var_value
if not np.any(mask):
return float(var_value) if var_value is not None else 0.0
tail_losses = self.losses[mask]
tail_weights = self.weights[mask]
return float(np.average(tail_losses, weights=tail_weights))
[docs]
def tvar_with_ci(
self,
confidence: float = 0.99,
n_bootstrap: int = 1000,
) -> "RiskMetricsResult":
"""Calculate Tail Value at Risk (TVaR/CVaR) with bootstrap confidence intervals.
Args:
confidence: Confidence level for VaR threshold.
n_bootstrap: Number of bootstrap samples for CI calculation.
Returns:
RiskMetricsResult containing the TVaR value and confidence interval.
"""
tvar_value = self.tvar(confidence)
ci = self._bootstrap_tvar_ci(confidence, n_bootstrap)
return RiskMetricsResult(
metric_name="TVaR",
value=tvar_value,
confidence_level=confidence,
confidence_interval=ci,
)
[docs]
def expected_shortfall(
self,
threshold: float,
) -> float:
"""Calculate Expected Shortfall (ES) above a threshold.
ES is the average of all losses that exceed a given threshold.
Delegates to tvar() with a pre-computed VaR value.
Args:
threshold: Loss threshold.
Returns:
Expected shortfall value, or 0.0 if no losses exceed threshold.
"""
# Check if any losses exceed the threshold first
if self.weights is None:
tail_losses = self.losses[self.losses >= threshold]
if len(tail_losses) == 0:
return 0.0
else:
mask = self.losses >= threshold
if not np.any(mask):
return 0.0
# Delegate to tvar with pre-computed threshold as the VaR value
return self.tvar(var_value=threshold)
[docs]
def pml(self, return_period: int) -> float:
"""Calculate Probable Maximum Loss (PML) for a given return period.
PML represents the loss amount expected to be equaled or exceeded
once every 'return_period' years on average.
Args:
return_period: Return period in years (e.g., 100 for 100-year event).
Returns:
PML value.
Raises:
ValueError: If return period is less than 1.
"""
if return_period < 1:
raise ValueError(f"Return period must be >= 1, got {return_period}")
# PML corresponds to the (1 - 1/return_period) percentile
confidence = 1 - 1 / return_period
return self.var(confidence)
[docs]
def conditional_tail_expectation(
self,
confidence: float = 0.99,
) -> float:
"""Calculate Conditional Tail Expectation (CTE).
CTE is similar to TVaR but uses a slightly different calculation method.
It's the expected value of losses that exceed the VaR threshold.
Args:
confidence: Confidence level.
Returns:
CTE value.
"""
# CTE is essentially the same as TVaR in our implementation
return self.tvar(confidence)
[docs]
def maximum_drawdown(self) -> float:
"""Calculate Maximum Drawdown on cumulative losses.
Computes the largest peak-to-trough decline in the **cumulative sum
of losses**, not in portfolio value. This measures the worst
stretch of accumulated losses and is useful for sizing reserves.
It is *not* the same as the standard portfolio-return drawdown
commonly used in asset management.
.. note::
When ``convention="return"`` was used at construction, the stored
losses are the negated returns. The drawdown is therefore computed
on the cumulative *negated* returns, which represents the
cumulative loss experienced by the portfolio.
Returns:
Maximum drawdown value (non-negative).
"""
# Calculate cumulative sum with overflow protection
with np.errstate(over="ignore"):
if self.weights is not None:
# For weighted data, use weighted cumulative sum
cumsum = np.cumsum(self.losses * self.weights)
else:
cumsum = np.cumsum(self.losses)
# Handle any overflow by replacing inf values
if not np.all(np.isfinite(cumsum)):
max_val = np.finfo(np.float64).max / 100 # Leave some headroom
cumsum = np.where(np.isfinite(cumsum), cumsum, max_val)
# Calculate running maximum with overflow protection
with np.errstate(over="ignore"):
running_max = np.maximum.accumulate(cumsum)
# Calculate drawdown
drawdown = running_max - cumsum
return float(np.max(drawdown))
[docs]
def economic_capital(
self,
confidence: float = 0.999,
expected_loss: Optional[float] = None,
) -> float:
"""Calculate Economic Capital requirement.
Economic capital is the amount of capital needed to cover
unexpected losses at a given confidence level.
Args:
confidence: Confidence level (typically 99.9% for regulatory).
expected_loss: Expected loss (if None, will calculate mean).
Returns:
Economic capital requirement.
"""
var_val = self.var(confidence)
if expected_loss is None:
expected_loss = np.average(self.losses, weights=self.weights)
# Economic capital = VaR - Expected Loss
return max(0, var_val - expected_loss)
[docs]
def return_period_curve(
self,
return_periods: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""Generate return period curve (exceedance probability curve).
Args:
return_periods: Array of return periods to calculate.
If None, uses standard periods.
Returns:
Tuple of (return_periods, loss_values).
"""
if return_periods is None:
return_periods = np.array([2, 5, 10, 25, 50, 100, 200, 250, 500, 1000])
# Validate all return periods at once
if np.any(return_periods < 1):
raise ValueError(
f"Return period must be >= 1, got {return_periods[return_periods < 1][0]}"
)
# Compute all percentiles in a single vectorized call instead of
# looping through pml() -> var() -> np.percentile() one at a time.
# Fixes #506.
confidences = 1 - 1 / return_periods
if self.weights is None:
loss_values = np.percentile(self.losses, confidences * 100)
else:
# Weighted percentile: reuse the pre-sorted data
indices = np.searchsorted(self._cumulative_weights, confidences)
indices = np.clip(indices, 0, len(self._sorted_losses) - 1)
loss_values = self._sorted_losses[indices].astype(float)
return return_periods, loss_values
[docs]
def tail_index(self, threshold: Optional[float] = None) -> float:
"""Estimate the Pareto tail index alpha via Hill's method.
Computes the Pareto shape parameter alpha (= 1 / gamma), where
gamma is the extreme value index from Hill (1975). Larger alpha
means thinner tails; smaller alpha means heavier tails.
Note:
The classical Hill estimator returns gamma = (1/k) * sum(ln(X_i/u)).
This method returns its reciprocal, alpha = k / sum(ln(X_i/u)),
which is the maximum-likelihood estimate of the Pareto shape
parameter. To recover the Hill gamma, compute ``1 / tail_index()``.
Args:
threshold: Threshold for tail definition (if None, uses 90th percentile).
Returns:
Estimated Pareto shape parameter alpha (= 1 / Hill gamma).
"""
if threshold is None:
threshold = np.percentile(self.losses, 90)
tail_losses = self.losses[self.losses > threshold]
if len(tail_losses) < 2:
return np.nan
# Pareto alpha MLE via Hill's method (reciprocal of Hill gamma)
k = len(tail_losses)
hill_estimate = k / np.sum(np.log(tail_losses / threshold))
return float(hill_estimate)
[docs]
def risk_adjusted_metrics(
self,
returns: Optional[np.ndarray] = None,
risk_free_rate: float = DEFAULT_RISK_FREE_RATE,
) -> Dict[str, float]:
"""Calculate risk-adjusted return metrics.
When *returns* is ``None`` the method derives returns as
``-self.losses``. Because the ``"return"`` convention negates the
input on construction, ``-self.losses`` correctly recovers the
original return values regardless of which convention was used.
Args:
returns: Array of returns (if None, uses negative of losses).
risk_free_rate: Risk-free rate for Sharpe ratio calculation.
Returns:
Dictionary of risk-adjusted metrics.
"""
if returns is None:
# Convert losses to returns (negative losses)
returns = -self.losses
if self.weights is None:
mean_return = np.mean(returns)
std_return = np.std(returns, ddof=1)
else:
mean_return = np.average(returns, weights=self.weights)
variance = np.average((returns - mean_return) ** 2, weights=self.weights)
# Bessel correction for reliability weights:
# corrected = pop_variance * V1^2 / (V1^2 - V2)
# where V1 = sum(w), V2 = sum(w^2)
v1 = np.sum(self.weights)
v2 = np.sum(self.weights**2)
denom = v1**2 - v2
if denom > 0:
variance = variance * v1**2 / denom
std_return = np.sqrt(variance)
# Sharpe ratio
sharpe = (mean_return - risk_free_rate) / std_return if std_return > 0 else 0
# Sortino ratio (downside deviation over all observations)
# DD = sqrt( (1/N) * sum( min(r_i - target, 0)^2 ) )
# Reference: Sortino & Price (1994)
downside_deviations = np.minimum(returns - risk_free_rate, 0)
if self.weights is None:
downside_std = np.sqrt(np.mean(downside_deviations**2))
else:
downside_std = np.sqrt(np.average(downside_deviations**2, weights=self.weights))
if downside_std > 0:
sortino = (mean_return - risk_free_rate) / downside_std
else:
sortino = np.inf if mean_return > risk_free_rate else 0
return {
"sharpe_ratio": sharpe,
"sortino_ratio": sortino,
"mean_return": mean_return,
"volatility": std_return,
}
[docs]
def coherence_test(self) -> Dict[str, bool]:
"""Test coherence properties of risk measures.
A coherent risk measure satisfies:
1. Monotonicity
2. Sub-additivity
3. Positive homogeneity
4. Translation invariance
Returns:
Dictionary indicating which properties are satisfied.
"""
# This is a simplified test - full testing would require multiple portfolios
results = {}
# Test positive homogeneity for TVaR
tvar_original = self.tvar(0.99)
scaled_losses = self.losses * 2
metrics_scaled = RiskMetrics(scaled_losses, self.weights)
tvar_scaled = metrics_scaled.tvar(0.99)
results["tvar_positive_homogeneity"] = np.isclose(tvar_scaled, 2 * tvar_original, rtol=0.01)
# Test translation invariance
shift = 1000
shifted_losses = self.losses + shift
metrics_shifted = RiskMetrics(shifted_losses, self.weights)
tvar_shifted = metrics_shifted.tvar(0.99)
results["tvar_translation_invariance"] = np.isclose(
tvar_shifted, tvar_original + shift, rtol=0.01
)
return {k: bool(v) for k, v in results.items()}
[docs]
def summary_statistics(self) -> Dict[str, float]:
"""Calculate comprehensive summary statistics.
Returns:
Dictionary of summary statistics.
"""
if self.weights is None:
mean = np.mean(self.losses)
std = np.std(self.losses)
skew = stats.skew(self.losses)
kurt = stats.kurtosis(self.losses)
median = np.median(self.losses)
else:
mean = np.average(self.losses, weights=self.weights)
variance = np.average((self.losses - mean) ** 2, weights=self.weights)
std = np.sqrt(variance)
# Weighted skewness
m3 = np.average((self.losses - mean) ** 3, weights=self.weights)
skew = m3 / (std**3) if std > 0 else 0
# Weighted kurtosis
m4 = np.average((self.losses - mean) ** 4, weights=self.weights)
kurt = (m4 / (std**4) - 3) if std > 0 else 0
# Weighted median
idx = int(np.searchsorted(self._cumulative_weights, 0.5))
idx = min(idx, len(self._sorted_losses) - 1)
median = self._sorted_losses[idx]
return {
"mean": mean,
"median": median,
"std": std,
"skewness": skew,
"kurtosis": kurt,
"min": np.min(self.losses),
"max": np.max(self.losses),
"count": len(self.losses),
}
[docs]
def plot_distribution( # pylint: disable=too-many-locals
self,
bins: int = 50,
show_metrics: bool = True,
confidence_levels: Optional[List[float]] = None,
figsize: Tuple[int, int] = (12, 8),
) -> plt.Figure:
"""Plot loss distribution with risk metrics overlay.
Args:
bins: Number of bins for histogram.
show_metrics: Whether to show VaR and TVaR lines.
confidence_levels: Confidence levels for metrics to show.
figsize: Figure size.
Returns:
Matplotlib figure object.
"""
if confidence_levels is None:
confidence_levels = [0.95, 0.99]
fig, axes = plt.subplots(2, 2, figsize=figsize)
# Histogram
ax = axes[0, 0]
if self.weights is None:
ax.hist(self.losses, bins=bins, density=True, alpha=0.7, edgecolor="black")
else:
ax.hist(
self.losses,
bins=bins,
weights=self.weights,
density=True,
alpha=0.7,
edgecolor="black",
)
if show_metrics:
colors = ["red", "orange", "yellow"]
for i, conf in enumerate(confidence_levels[:3]):
var_val = self.var(conf)
tvar_val = self.tvar(conf)
color = colors[i % len(colors)]
ax.axvline(var_val, color=color, linestyle="--", label=f"VaR {conf:.0%}")
ax.axvline(tvar_val, color=color, linestyle=":", label=f"TVaR {conf:.0%}")
ax.set_xlabel("Loss Amount")
ax.set_ylabel("Density")
ax.set_title("Loss Distribution")
ax.legend()
ax.grid(True, alpha=0.3)
# Q-Q plot
ax = axes[0, 1]
if self.weights is None:
stats.probplot(self.losses, dist="norm", plot=ax)
else:
# Weighted Q-Q plot approximation
theoretical_quantiles = stats.norm.ppf(self._cumulative_weights)
theoretical_quantiles = theoretical_quantiles[np.isfinite(theoretical_quantiles)]
empirical_quantiles = self._sorted_losses[: len(theoretical_quantiles)]
ax.scatter(theoretical_quantiles, empirical_quantiles, alpha=0.5)
ax.plot(
[theoretical_quantiles.min(), theoretical_quantiles.max()],
[theoretical_quantiles.min(), theoretical_quantiles.max()],
"r--",
)
ax.set_xlabel("Theoretical Quantiles")
ax.set_ylabel("Sample Quantiles")
ax.set_title("Q-Q Plot (Normal)")
ax.grid(True, alpha=0.3)
# Return period curve
ax = axes[1, 0]
periods, losses = self.return_period_curve()
ax.semilogx(periods, losses, "o-", linewidth=2, markersize=6)
ax.set_xlabel("Return Period (years)")
ax.set_ylabel("Loss Amount")
ax.set_title("Return Period Curve")
ax.grid(True, alpha=0.3, which="both")
# Risk metrics summary
ax = axes[1, 1]
ax.axis("off")
metrics_text = "Risk Metrics Summary\n" + "=" * 30 + "\n"
for conf in confidence_levels:
var_val = self.var(conf)
tvar_val = self.tvar(conf)
metrics_text += f"\nConfidence Level: {conf:.1%}\n"
metrics_text += f" VaR: ${var_val:,.0f}\n"
metrics_text += f" TVaR: ${tvar_val:,.0f}\n"
pml_periods = [100, 250]
metrics_text += "\nPML Values:\n"
for period in pml_periods:
pml_val = self.pml(period)
metrics_text += f" {period}-year: ${pml_val:,.0f}\n"
var_99 = self.var(0.99)
es_99 = self.expected_shortfall(var_99)
metrics_text += f"\nExpected Shortfall (99%): ${es_99:,.0f}\n"
ec = self.economic_capital(0.999)
metrics_text += f"Economic Capital (99.9%): ${ec:,.0f}\n"
ax.text(
0.1,
0.5,
metrics_text,
transform=ax.transAxes,
fontsize=10,
verticalalignment="center",
family="monospace",
)
plt.tight_layout()
return fig
[docs]
def compare_risk_metrics(
scenarios: Dict[str, np.ndarray],
confidence_levels: Optional[List[float]] = None,
) -> pd.DataFrame:
"""Compare risk metrics across multiple scenarios.
Args:
scenarios: Dictionary mapping scenario names to loss arrays.
confidence_levels: Confidence levels to evaluate.
Returns:
DataFrame with comparative metrics.
"""
if confidence_levels is None:
confidence_levels = [0.95, 0.99, 0.995]
results = []
for scenario_name, losses in scenarios.items():
metrics = RiskMetrics(losses)
stats = metrics.summary_statistics()
row = {"scenario": scenario_name, **stats}
for conf in confidence_levels:
row[f"var_{conf:.1%}"] = metrics.var(conf)
row[f"tvar_{conf:.1%}"] = metrics.tvar(conf)
row["pml_100yr"] = metrics.pml(100)
row["pml_250yr"] = metrics.pml(250)
row["max_drawdown"] = metrics.maximum_drawdown()
row["economic_capital"] = metrics.economic_capital(0.999)
results.append(row)
return pd.DataFrame(results)
[docs]
class ROEAnalyzer:
"""Comprehensive ROE analysis framework.
This class provides specialized metrics and analysis tools for
Return on Equity (ROE) calculations, including time-weighted averages,
component breakdowns, and volatility analysis.
"""
def __init__(self, roe_series: np.ndarray, equity_series: Optional[np.ndarray] = None):
"""Initialize ROE analyzer.
Args:
roe_series: Array of ROE values over time.
equity_series: Optional array of equity values for weighted calculations.
"""
self.roe_series = np.asarray(roe_series)
self.equity_series = np.asarray(equity_series) if equity_series is not None else None
# Filter out NaN values for clean analysis
self.valid_mask = ~np.isnan(self.roe_series)
self.valid_roe = self.roe_series[self.valid_mask]
[docs]
def time_weighted_average(self) -> float:
"""Calculate time-weighted average ROE using geometric mean.
Time-weighted average gives equal weight to each period regardless
of the equity level, providing a measure of consistent performance.
Returns:
Time-weighted average ROE.
"""
if len(self.valid_roe) == 0:
return 0.0
# Convert to growth factors and compute geometric mean
growth_factors = 1 + self.valid_roe
# Handle negative growth factors by using arithmetic mean as fallback
if np.any(growth_factors <= 0):
return float(np.mean(self.valid_roe))
return float(np.exp(np.mean(np.log(growth_factors))) - 1)
[docs]
def equity_weighted_average(self) -> float:
"""Calculate equity-weighted average ROE.
Equity-weighted average gives more weight to periods with higher
equity levels, reflecting the actual dollar impact.
Returns:
Equity-weighted average ROE.
"""
if self.equity_series is None or len(self.valid_roe) == 0:
return self.time_weighted_average()
valid_equity = self.equity_series[self.valid_mask]
if np.sum(valid_equity) == 0:
return 0.0
weights = valid_equity / np.sum(valid_equity)
return float(np.sum(self.valid_roe * weights))
[docs]
def rolling_statistics(self, window: int) -> Dict[str, np.ndarray]:
"""Calculate rolling window statistics for ROE.
Args:
window: Window size in periods.
Returns:
Dictionary with rolling mean, std, min, max arrays.
"""
n = len(self.roe_series)
if window > n:
raise ValueError(f"Window {window} larger than series length {n}")
# Use pandas rolling for vectorized computation instead of a Python
# for-loop. min_periods=1 mirrors the original NaN-skipping behaviour
# while pd.Series.rolling(min_periods=window) ensures the first
# (window-1) entries stay NaN. Fixes #483.
series = pd.Series(self.roe_series)
rolling = series.rolling(window, min_periods=window)
roll_mean = rolling.mean().to_numpy()
# ddof=0 matches the original np.std (population std)
roll_std = rolling.std(ddof=0).to_numpy()
roll_min = rolling.min().to_numpy()
roll_max = rolling.max().to_numpy()
risk_free_rate = DEFAULT_RISK_FREE_RATE
# Sharpe = (mean - risk_free) / std, only where std > 0
roll_sharpe = np.full(n, np.nan)
valid = (~np.isnan(roll_std)) & (roll_std > 0)
roll_sharpe[valid] = (roll_mean[valid] - risk_free_rate) / roll_std[valid]
return {
"mean": roll_mean,
"std": roll_std,
"min": roll_min,
"max": roll_max,
"sharpe": roll_sharpe,
}
[docs]
def volatility_metrics(self) -> Dict[str, float]:
"""Calculate comprehensive volatility metrics for ROE.
Returns:
Dictionary with volatility measures.
"""
if len(self.valid_roe) < 2:
return {
"standard_deviation": 0.0,
"downside_deviation": 0.0,
"upside_deviation": 0.0,
"semi_variance": 0.0,
"coefficient_variation": 0.0,
}
mean_roe = np.mean(self.valid_roe)
std_roe = np.std(self.valid_roe)
# Downside deviation (below mean, over all observations)
downside_deviations = np.minimum(self.valid_roe - mean_roe, 0)
downside_dev = np.sqrt(np.mean(downside_deviations**2))
# Upside deviation (above mean)
above_mean = self.valid_roe[self.valid_roe > mean_roe]
upside_dev = np.std(above_mean) if len(above_mean) > 0 else 0.0
# Semi-variance (below target, using 0 as target, over ALL observations)
# SV = (1/N) * sum( min(r_i - target, 0)^2 )
semi_var = float(np.mean(np.minimum(self.valid_roe, 0) ** 2))
# Coefficient of variation
cv = std_roe / abs(mean_roe) if mean_roe != 0 else float("inf")
return {
"standard_deviation": std_roe,
"downside_deviation": downside_dev,
"upside_deviation": upside_dev,
"semi_variance": semi_var,
"coefficient_variation": cv,
}
def _calculate_max_drawdown(self) -> float:
"""Calculate maximum drawdown for ROE series.
Returns:
Maximum drawdown value.
"""
if len(self.valid_roe) < 2:
return 0.0
# Calculate cumulative returns with overflow protection
# Use log-space calculation to avoid overflow
try:
# Clip extreme values to prevent overflow
clipped_roe = np.clip(self.valid_roe, -0.99, 10.0)
# Calculate cumulative returns
with np.errstate(over="raise"):
cumulative = np.cumprod(1 + clipped_roe)
except (FloatingPointError, OverflowError):
# Fallback to log-space calculation
log_returns = np.log1p(np.clip(self.valid_roe, -0.99, 10.0))
with np.errstate(over="ignore"):
cumulative = np.exp(np.cumsum(log_returns))
# Handle overflow in exp
if not np.all(np.isfinite(cumulative)):
cumulative = np.where(np.isfinite(cumulative), cumulative, 1e10)
# Handle any remaining inf/nan values
if not np.all(np.isfinite(cumulative)):
# Replace inf/nan with large but finite values
cumulative = np.where(np.isfinite(cumulative), cumulative, 1e10)
running_max = np.maximum.accumulate(cumulative)
# Avoid division by zero or near-zero
with np.errstate(divide="ignore", invalid="ignore"):
drawdown = (cumulative - running_max) / np.maximum(running_max, 1e-10)
drawdown = np.where(np.isfinite(drawdown), drawdown, 0.0)
return float(np.min(drawdown))
[docs]
def distribution_analysis(self) -> Dict[str, float]:
"""Analyze the distribution of ROE values.
Returns:
Dictionary with distribution statistics.
"""
if len(self.valid_roe) == 0:
return {
"mean": 0.0,
"median": 0.0,
"skewness": 0.0,
"kurtosis": 0.0,
"percentile_5": 0.0,
"percentile_25": 0.0,
"percentile_75": 0.0,
"percentile_95": 0.0,
}
return {
"mean": np.mean(self.valid_roe),
"median": np.median(self.valid_roe),
"skewness": stats.skew(self.valid_roe) if len(self.valid_roe) > 2 else 0.0,
"kurtosis": stats.kurtosis(self.valid_roe) if len(self.valid_roe) > 3 else 0.0,
"percentile_5": np.percentile(self.valid_roe, 5),
"percentile_25": np.percentile(self.valid_roe, 25),
"percentile_75": np.percentile(self.valid_roe, 75),
"percentile_95": np.percentile(self.valid_roe, 95),
}
[docs]
def stability_analysis(self, periods: Optional[List[int]] = None) -> Dict[str, Any]:
"""Analyze ROE stability across different time periods.
Args:
periods: List of period lengths to analyze (default: [1, 3, 5, 10]).
Returns:
Dictionary with stability metrics for each period.
"""
if periods is None:
periods = [1, 3, 5, 10]
stability_metrics = {}
for period in periods:
if period > len(self.roe_series):
continue
rolling_stats = self.rolling_statistics(period)
stability_metrics[f"{period}yr"] = {
"mean_stability": 1
- np.nanstd(rolling_stats["mean"]) / (np.nanmean(rolling_stats["mean"]) + 1e-10),
"volatility_stability": 1
- np.nanstd(rolling_stats["std"]) / (np.nanmean(rolling_stats["std"]) + 1e-10),
"range": np.nanmax(rolling_stats["max"]) - np.nanmin(rolling_stats["min"]),
"consistency": np.sum(rolling_stats["mean"] > 0)
/ np.sum(~np.isnan(rolling_stats["mean"])),
}
return stability_metrics