Source code for greybox.rolling

"""Rolling origin evaluation.

This module provides functions for rolling origin cross-validation
for time series models.
"""

import inspect
import warnings
import numpy as np
from typing import Callable, Optional, Any


[docs] class RollingOriginResult: """Result of rolling origin evaluation. Attributes ---------- actuals : np.ndarray The original data. holdout : np.ndarray Matrix of actual holdout values, shape (h, origins). NaN where h_actual < h (co=False, near end of series). mean : np.ndarray Point forecasts, shape (h, origins). lower : np.ndarray, optional Lower interval bounds, shape (h, origins). Present if call returned 'lower' key/attribute. upper : np.ndarray, optional Upper interval bounds, shape (h, origins). Present if call returned 'upper' key/attribute. origins : int Number of rolling origins evaluated. h : int Forecast horizon. _fields : list[str] Names of all forecast fields stored (always includes 'mean'). """ def __init__( self, actuals: np.ndarray, holdout: np.ndarray, origins: int, h: int, fields: dict, ): self.actuals = actuals self.holdout = holdout self.origins = origins self.h = h self._fields = list(fields.keys()) for name, arr in fields.items(): setattr(self, name, arr) def __str__(self) -> str: # Detect constant vs decreasing holdout from NaN pattern has_nan = np.any(np.isnan(self.holdout)) window_type = "decreasing" if has_nan else "constant" lines = [ f"Rolling origin with {window_type} holdout was done.", f" Forecast horizon: {self.h}", f" Number of origins: {self.origins}", ] return "\n".join(lines) def __repr__(self) -> str: return self.__str__()
def _parse_call_result(result: Any, h_actual: int) -> dict: """Convert a call() return value to {field: array} dict. Supported return types: - np.ndarray or list → {"mean": array} - dict with string keys → one entry per key - object with .mean attr → {"mean": arr, "lower": arr, ...} - 3-tuple (mean, lower, upper) → {"mean", "lower", "upper"} """ def _to_arr(v): return np.asarray(v, dtype=float) if isinstance(result, np.ndarray) or isinstance(result, list): return {"mean": _to_arr(result)} if isinstance(result, tuple): if len(result) == 3: mean, lower, upper = result return { "mean": _to_arr(mean), "lower": _to_arr(lower), "upper": _to_arr(upper), } elif len(result) == 2: mean, lower = result return {"mean": _to_arr(mean), "lower": _to_arr(lower)} else: return {"mean": _to_arr(result[0])} if isinstance(result, dict): return {k: _to_arr(v) for k, v in result.items()} # Object with attributes out = {} for attr in ("mean", "lower", "upper"): if hasattr(result, attr): val = getattr(result, attr) if val is not None: out[attr] = _to_arr(val) if out: return out # Fallback: try to coerce directly return {"mean": _to_arr(result)}
[docs] def rolling_origin( data: np.ndarray, h: int = 10, origins: int = 10, step: int = 1, ci: bool = False, co: bool = True, call: Optional[Callable] = None, silent: bool = True, # deprecated model_fn: Optional[Callable] = None, predict_fn: Optional[Callable] = None, ) -> RollingOriginResult: """Rolling Origin Evaluation. Produces rolling origin forecasts using a single callable that accepts training data and horizon h. This is the standard approach for evaluating forecast accuracy on time series. Parameters ---------- data : np.ndarray Time series data (1-D). h : int, default=10 Forecast horizon (number of steps ahead). origins : int, default=10 Number of rolling origins. step : int, default=1 Number of observations to advance between origins. ci : bool, default=False If False (default), expanding window — training set grows each origin. If True, sliding window — training set has constant size. co : bool, default=True If True (default), constant holdout: all origins forecast exactly h steps. If False, decreasing holdout: later origins may forecast fewer than h steps (when fewer observations remain). call : callable ``call(data, h, **optional) -> forecasts`` ``data`` is the training slice (np.ndarray), ``h`` is the horizon. The function may optionally accept keyword arguments ``counti``, ``counto``, and/or ``countf`` (index arrays into the original series) — they are injected automatically if present in the signature. Return value can be: - ``np.ndarray`` of length h → stored as ``result.mean`` - ``dict`` with string keys → one attribute per key - object with ``.mean`` / ``.lower`` / ``.upper`` attributes - 3-tuple ``(mean, lower, upper)`` silent : bool, default=True If False, print progress information per origin. model_fn : callable, deprecated Old API: function ``train_data -> fitted_model``. predict_fn : callable, deprecated Old API: function ``(model, h) -> forecasts``. Returns ------- RollingOriginResult - ``.actuals`` — original data - ``.holdout`` — actual values, shape ``(h, origins)``, NaN-padded - ``.mean`` — point forecasts, shape ``(h, origins)`` - ``.lower`` / ``.upper`` — interval bounds (if returned by call) - ``.origins`` — number of origins evaluated - ``.h`` — forecast horizon - ``._fields`` — list of forecast field names Examples -------- >>> import numpy as np >>> from greybox import rolling_origin >>> np.random.seed(42) >>> y = np.cumsum(np.random.randn(100)) >>> result = rolling_origin(y, h=5, origins=10, ... call=lambda data, h: np.full(h, data.mean())) >>> print(result) Rolling origin with constant holdout was done. Forecast horizon: 5 Number of origins: 10 >>> result.mean.shape (5, 10) """ # --- backward compatibility --- if call is None and (model_fn is not None or predict_fn is not None): warnings.warn( "model_fn and predict_fn are deprecated. " "Use a single call=lambda data, h: ... instead.", DeprecationWarning, stacklevel=2, ) if model_fn is None or predict_fn is None: raise ValueError("Both model_fn and predict_fn must be provided together.") _mfn = model_fn _pfn = predict_fn call = lambda data, h: _pfn(_mfn(data), h) # noqa: E731 if call is None: raise ValueError( "call must be provided. " "Example: call=lambda data, h: np.full(h, data.mean())" ) data = np.asarray(data, dtype=float) n = len(data) # R formula: obsInSample = obs - (origins*step + (h-step)*co) obs_in_sample = n - (origins * step + (h - step) * int(co)) if obs_in_sample < 1: raise ValueError( f"obs_in_sample={obs_in_sample} < 1. Reduce origins, h, or step (n={n})." ) # Inspect call signature for optional index kwargs try: sig = inspect.signature(call) _call_params = set(sig.parameters.keys()) except (ValueError, TypeError): _call_params = set() _wants_counti = "counti" in _call_params _wants_counto = "counto" in _call_params _wants_countf = "countf" in _call_params # Allocate output matrices (h, origins), NaN-filled holdout_mat = np.full((h, origins), np.nan) # Forecast matrices allocated after first call to know fields fields_data: Optional[dict] = None n_done = 0 for i in range(origins): # Effective horizon for this origin if co: h_actual = h else: h_actual = min(h, step * (origins - i)) # In-sample index range train_end = obs_in_sample + step * i if not ci: # expanding: [0, train_end) counti = np.arange(0, train_end) else: # sliding: constant window of size obs_in_sample counti = np.arange(step * i, train_end) # Out-of-sample index range counto = np.arange(train_end, train_end + h_actual) countf = np.concatenate([counti, counto]) if train_end > n or (len(counto) > 0 and counto[-1] >= n): if not silent: print(f"Origin {i + 1}: out of bounds, stopping.") break train_data = data[counti] # Build kwargs for call extra_kwargs = {} if _wants_counti: extra_kwargs["counti"] = counti if _wants_counto: extra_kwargs["counto"] = counto if _wants_countf: extra_kwargs["countf"] = countf try: raw = call(train_data, h_actual, **extra_kwargs) except Exception as e: if not silent: print(f"Origin {i + 1}/{origins} failed: {e}") break parsed = _parse_call_result(raw, h_actual) # Ensure all arrays have length h_actual; pad to h with NaN for key, arr in parsed.items(): if len(arr) < h: padded = np.full(h, np.nan) padded[: len(arr)] = arr[:h_actual] parsed[key] = padded else: parsed[key] = arr[:h] # Initialise forecast matrices on first successful call if fields_data is None: fields_data = {k: np.full((h, origins), np.nan) for k in parsed} for key, arr in parsed.items(): if key in fields_data: fields_data[key][:, i] = arr # Fill holdout actual_h = data[counto] holdout_mat[:h_actual, i] = actual_h if not silent: print( f"Origin {i + 1}/{origins}: " f"train=[{counti[0]}:{counti[-1] + 1}], " f"holdout=[{counto[0]}:{counto[-1] + 1}], " f"h_actual={h_actual}" ) n_done += 1 if fields_data is None: # No origins succeeded; return empty result with mean only fields_data = {"mean": np.full((h, origins), np.nan)} return RollingOriginResult( actuals=data, holdout=holdout_mat[:, :n_done], origins=n_done, h=h, fields={k: v[:, :n_done] for k, v in fields_data.items()}, )