Source code for caliber.regression.conformal_regression.cvplus.base

from copy import deepcopy
from typing import Any

import numpy as np
from numpy.typing import NDArray
from tqdm import tqdm

from caliber.regression.conformal_regression.base import AbstractRegressionModel


[docs] class CVPlusRegressionModel(AbstractRegressionModel): """ A conformalised bootstrap model based on cross-validation (CV). Given inputs and targets, it trains an arbitrary model multiple times with CV and stores the prediction error on the left-out inputs. At prediction time, it provides a confidence interval around the mean predicted, with quantiles corrected via the errors stored at training time. Given a coverage level `alpha`, a target variable `Y` and the predicted confidence interval `[Q1, Q2]`, if the training and test sets are IID the algorithm ensures that `P(Y in [Q1, Q2]) >= alpha`. """ def __init__( self, model: Any, coverage: float, num_folds: int = 5, seed: int = 0, cv_prediction: bool = False, max_validation_fold_size: int = 1000 ) -> None: """ Args: model (Any): An instantiated model class with `fit` and `predict` methods. coverage (float): A coverage value between 0 and 1. For example, `coverage=0.95` means that the target variable will be expected to lay within the confidence interval 95% of the times. num_folds (int, optional): The number of CV folds. Defaults to 5. seed (int, optional): The random seed. Defaults to 0. cv_prediction (bool, optional): Whether to predict the mean prediction using the cv models or rather a single model. Defaults to False. """ self._model = model self._coverage = coverage self._num_folds = num_folds self._cv_prediction = cv_prediction self._rng = np.random.default_rng(seed) self._max_validation_fold_size = max_validation_fold_size
[docs] def fit(self, inputs: NDArray[np.float64], targets: NDArray[np.float64]) -> NDArray[np.float64]: num_inputs = len(inputs) fold_size = num_inputs // self._num_folds perm = self._rng.choice(num_inputs, size=num_inputs, replace=False) fold_indices = [[perm[j * fold_size + i] for i in range(fold_size)] for j in range(self._num_folds)] fold_indices[-1].extend(perm[self._num_folds * fold_size:].tolist()) self._models = [] self._cv_errors = [] for i in tqdm(range(self._num_folds), desc="Cross-Validation"): train_indices = sum(fold_indices[:i] + fold_indices[i+1:], []) val_indices = fold_indices[i][:self._max_validation_fold_size] train_inputs, train_targets = inputs[train_indices, :], targets[train_indices] val_inputs, val_targets = inputs[val_indices], targets[val_indices] model = deepcopy(self._model) model.fit(train_inputs, train_targets) val_preds = model.predict(val_inputs) if val_preds.ndim == 2: if val_preds.shape[1] != 1: raise ValueError("This method is supported only for scalar targets.") val_preds = val_preds.squeeze(1) self._cv_errors.append(np.abs(val_targets - val_preds)) self._models.append(model) if not self._cv_prediction: self._model.fit(inputs, targets)
[docs] def predict(self, inputs: NDArray[np.float64]) -> NDArray[np.float64]: if self._cv_prediction: preds = np.zeros(len(inputs)) for model in self._models: preds_i = model.predict(inputs) if preds_i.ndim == 2: if preds_i.shape[1] != 1: raise ValueError("This method is supported only for scalar targets.") preds_i = preds_i.squeeze(1) preds += preds_i preds /= self._loo_size return preds return self._model.predict(inputs)
[docs] def predict_quantiles(self, inputs: NDArray[np.float64]) -> NDArray[np.float64]: preds = [] for i, model in enumerate(self._models): preds_i = model.predict(inputs) if preds_i.ndim == 2: if preds_i.shape[1] != 1: raise ValueError("This method is supported only for scalar targets.") preds_i = preds_i.squeeze(1) preds.append(preds_i) lefts_list = [] rights_list = [] for preds_i in preds: lefts_list.append(preds_i - self._cv_errors[i][:, None]) rights_list.append(preds_i + self._cv_errors[i][:, None]) lefts = np.concatenate(lefts_list) rights = np.concatenate(rights_list) qleft = np.quantile(lefts, q=1 - self._coverage, axis=0) qright = np.quantile(rights, q=self._coverage, axis=0) return np.array(list(zip(qleft, qright)))