Speed Up Uncertainty Predictions (#2186)

This commit is contained in:
Colin Catlin 2022-09-08 09:58:06 -05:00 committed by GitHub
parent b3fedb560e
commit 8fbf8ba2a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 334 additions and 101 deletions

View file

@ -10,6 +10,7 @@ import logging
from collections import OrderedDict, defaultdict
from copy import deepcopy
from datetime import timedelta, datetime
from typing import Dict, List
import numpy as np
import pandas as pd
@ -158,6 +159,8 @@ class Prophet(object):
if self.growth not in ('linear', 'logistic', 'flat'):
raise ValueError(
'Parameter "growth" should be "linear", "logistic" or "flat".')
if not isinstance(self.changepoint_range, (int, float)):
raise ValueError("changepoint_range must be a number in [0, 1]'")
if ((self.changepoint_range < 0) or (self.changepoint_range > 1)):
raise ValueError('Parameter "changepoint_range" must be in [0, 1]')
if self.holidays is not None:
@ -1174,14 +1177,15 @@ class Prophet(object):
# If no changepoints were requested, replace delta with 0s
if len(self.changepoints) == 0:
# Fold delta into the base rate k
self.params['k'] = (self.params['k']
+ self.params['delta'].reshape(-1))
self.params['k'] = (
self.params['k'] + self.params['delta'].reshape(-1)
)
self.params['delta'] = (np.zeros(self.params['delta'].shape)
.reshape((-1, 1)))
return self
def predict(self, df=None):
def predict(self, df: pd.DataFrame = None, vectorized: bool = True) -> pd.DataFrame:
"""Predict using the prophet model.
Parameters
@ -1189,6 +1193,9 @@ class Prophet(object):
df: pd.DataFrame with dates for predictions (column ds), and capacity
(column cap) if logistic growth. If not provided, predictions are
made on the history.
vectorized: Whether to use a vectorized method to compute uncertainty intervals. Suggest using
True (the default) for much faster runtimes in most cases,
except when (growth = 'logistic' and mcmc_samples > 0).
Returns
-------
@ -1207,7 +1214,7 @@ class Prophet(object):
df['trend'] = self.predict_trend(df)
seasonal_components = self.predict_seasonal_components(df)
if self.uncertainty_samples:
intervals = self.predict_uncertainty(df)
intervals = self.predict_uncertainty(df, vectorized)
else:
intervals = None
@ -1241,15 +1248,9 @@ class Prophet(object):
-------
Vector y(t).
"""
# Intercept changes
gammas = -changepoint_ts * deltas
# Get cumulative slope and intercept at each t
k_t = k * np.ones_like(t)
m_t = m * np.ones_like(t)
for s, t_s in enumerate(changepoint_ts):
indx = t >= t_s
k_t[indx] += deltas[s]
m_t[indx] += gammas[s]
deltas_t = (changepoint_ts[None, :] <= t[..., None]) * deltas
k_t = deltas_t.sum(axis=1) + k
m_t = (deltas_t * -changepoint_ts).sum(axis=1) + m
return k_t * t + m_t
@staticmethod
@ -1366,79 +1367,19 @@ class Prophet(object):
)
return pd.DataFrame(data)
def sample_posterior_predictive(self, df):
"""Prophet posterior predictive samples.
Parameters
----------
df: Prediction dataframe.
Returns
-------
Dictionary with posterior predictive samples for the forecast yhat and
for the trend component.
"""
n_iterations = self.params['k'].shape[0]
samp_per_iter = max(1, int(np.ceil(
self.uncertainty_samples / float(n_iterations)
)))
# Generate seasonality features once so we can re-use them.
seasonal_features, _, component_cols, _ = (
self.make_all_seasonality_features(df)
)
sim_values = {'yhat': [], 'trend': []}
for i in range(n_iterations):
for _j in range(samp_per_iter):
sim = self.sample_model(
df=df,
seasonal_features=seasonal_features,
iteration=i,
s_a=component_cols['additive_terms'],
s_m=component_cols['multiplicative_terms'],
)
for key in sim_values:
sim_values[key].append(sim[key])
for k, v in sim_values.items():
sim_values[k] = np.column_stack(v)
return sim_values
def predictive_samples(self, df):
"""Sample from the posterior predictive distribution. Returns samples
for the main estimate yhat, and for the trend component. The shape of
each output will be (nforecast x nsamples), where nforecast is the
number of points being forecasted (the number of rows in the input
dataframe) and nsamples is the number of posterior samples drawn.
This is the argument `uncertainty_samples` in the Prophet constructor,
which defaults to 1000.
Parameters
----------
df: Dataframe with dates for predictions (column ds), and capacity
(column cap) if logistic growth.
Returns
-------
Dictionary with keys "trend" and "yhat" containing
posterior predictive samples for that component.
"""
df = self.setup_dataframe(df.copy())
sim_values = self.sample_posterior_predictive(df)
return sim_values
def predict_uncertainty(self, df):
def predict_uncertainty(self, df: pd.DataFrame, vectorized: bool) -> pd.DataFrame:
"""Prediction intervals for yhat and trend.
Parameters
----------
df: Prediction dataframe.
vectorized: Whether to use a vectorized method for generating future draws.
Returns
-------
Dataframe with uncertainty intervals.
"""
sim_values = self.sample_posterior_predictive(df)
sim_values = self.sample_posterior_predictive(df, vectorized)
lower_p = 100 * (1.0 - self.interval_width) / 2
upper_p = 100 * (1.0 + self.interval_width) / 2
@ -1452,6 +1393,55 @@ class Prophet(object):
return pd.DataFrame(series)
def sample_posterior_predictive(self, df: pd.DataFrame, vectorized: bool) -> Dict[str, np.ndarray]:
"""Prophet posterior predictive samples.
Parameters
----------
df: Prediction dataframe.
vectorized: Whether to use a vectorized method to generate future draws.
Returns
-------
Dictionary with posterior predictive samples for the forecast yhat and
for the trend component.
"""
n_iterations = self.params['k'].shape[0]
samp_per_iter = max(1, int(np.ceil(
self.uncertainty_samples / float(n_iterations)
)))
# Generate seasonality features once so we can re-use them.
seasonal_features, _, component_cols, _ = (
self.make_all_seasonality_features(df)
)
sim_values = {'yhat': [], 'trend': []}
for i in range(n_iterations):
if vectorized:
sims = self.sample_model_vectorized(
df=df,
seasonal_features=seasonal_features,
iteration=i,
s_a=component_cols['additive_terms'],
s_m=component_cols['multiplicative_terms'],
n_samples=samp_per_iter
)
else:
sims = [
self.sample_model(
df=df,
seasonal_features=seasonal_features,
iteration=i,
s_a=component_cols['additive_terms'],
s_m=component_cols['multiplicative_terms'],
) for _ in range(samp_per_iter)
]
for key in sim_values:
for sim in sims:
sim_values[key].append(sim[key])
for k, v in sim_values.items():
sim_values[k] = np.column_stack(v)
return sim_values
def sample_model(self, df, seasonal_features, iteration, s_a, s_m):
"""Simulate observations from the extrapolated generative model.
@ -1482,6 +1472,39 @@ class Prophet(object):
'trend': trend
})
def sample_model_vectorized(
self,
df: pd.DataFrame,
seasonal_features: pd.DataFrame,
iteration: int,
s_a: np.ndarray,
s_m: np.ndarray,
n_samples: int,
) -> List[pd.DataFrame]:
"""Simulate observations from the extrapolated generative model. Vectorized version of sample_model().
Returns
-------
List (length n_samples) of DataFrames with np.arrays for trend and yhat, each ordered like df['t'].
"""
# Get the seasonality and regressor components, which are deterministic per iteration
beta = self.params['beta'][iteration]
Xb_a = np.matmul(seasonal_features.values,
beta * s_a.values) * self.y_scale
Xb_m = np.matmul(seasonal_features.values, beta * s_m.values)
# Get the future trend, which is stochastic per iteration
trends = self.sample_predictive_trend_vectorized(df, n_samples, iteration) # already on the same scale as the actual data
sigma = self.params['sigma_obs'][iteration]
noise_terms = np.random.normal(0, sigma, trends.shape) * self.y_scale
simulations = []
for trend, noise in zip(trends, noise_terms):
simulations.append(pd.DataFrame({
'yhat': trend * (1 + Xb_m) + Xb_a + noise,
'trend': trend
}))
return simulations
def sample_predictive_trend(self, df, iteration):
"""Simulate the trend using the extrapolated generative model.
@ -1535,6 +1558,206 @@ class Prophet(object):
return trend * self.y_scale + df['floor']
def sample_predictive_trend_vectorized(self, df: pd.DataFrame, n_samples: int, iteration: int = 0) -> np.ndarray:
"""Sample draws of the future trend values. Vectorized version of sample_predictive_trend().
Returns
-------
Draws of the trend values with shape (n_samples, len(df)). Values are on the scale of the original data.
"""
deltas = self.params["delta"][iteration]
m = self.params["m"][iteration]
k = self.params["k"][iteration]
if self.growth == "linear":
expected = self.piecewise_linear(df["t"].values, deltas, k, m, self.changepoints_t)
elif self.growth == "logistic":
expected = self.piecewise_logistic(
df["t"].values, df["cap_scaled"].values, deltas, k, m, self.changepoints_t
)
elif self.growth == "flat":
expected = self.flat_trend(df["t"].values, m)
else:
raise NotImplementedError
uncertainty = self._sample_uncertainty(df, n_samples, iteration)
return (
(np.tile(expected, (n_samples, 1)) + uncertainty) * self.y_scale +
np.tile(df["floor"].values, (n_samples, 1))
)
def _sample_uncertainty(self, df: pd.DataFrame, n_samples: int, iteration: int = 0) -> np.ndarray:
"""Sample draws of future trend changes, vectorizing as much as possible.
Parameters
----------
df: DataFrame with columns `t` (time scaled to the model context), trend, and cap.
n_samples: Number of future paths of the trend to simulate
iteration: The iteration of the parameter set to use. Default 0, the first iteration.
Returns
-------
Draws of the trend changes with shape (n_samples, len(df)). Values are standardized.
"""
# handle only historic data
if df["t"].max() <= 1:
# there is no trend uncertainty in historic trends
uncertainties = np.zeros((n_samples, len(df)))
else:
future_df = df.loc[df["t"] > 1]
n_length = len(future_df)
# handle 1 length futures by using history
if n_length > 1:
single_diff = np.diff(future_df["t"]).mean()
else:
single_diff = np.diff(self.history["t"]).mean()
change_likelihood = len(self.changepoints_t) * single_diff
deltas = self.params["delta"][iteration]
m = self.params["m"][iteration]
k = self.params["k"][iteration]
mean_delta = np.mean(np.abs(deltas)) + 1e-8
if self.growth == "linear":
mat = self._make_trend_shift_matrix(mean_delta, change_likelihood, n_length, n_samples=n_samples)
uncertainties = mat.cumsum(axis=1).cumsum(axis=1) # from slope changes to actual values
uncertainties *= single_diff # scaled by the actual meaning of the slope
elif self.growth == "logistic":
mat = self._make_trend_shift_matrix(mean_delta, change_likelihood, n_length, n_samples=n_samples)
uncertainties = self._logistic_uncertainty(
mat=mat,
deltas=deltas,
k=k,
m=m,
cap=future_df["cap_scaled"].values,
t_time=future_df["t"].values,
n_length=n_length,
single_diff=single_diff,
)
elif self.growth == "flat":
# no trend uncertainty when there is no growth
uncertainties = np.zeros((n_samples, n_length))
else:
raise NotImplementedError
# handle past included in dataframe
if df["t"].min() <= 1:
past_uncertainty = np.zeros((n_samples, np.sum(df["t"] <= 1)))
uncertainties = np.concatenate([past_uncertainty, uncertainties], axis=1)
return uncertainties
@staticmethod
def _make_trend_shift_matrix(
mean_delta: float, likelihood: float, future_length: float, n_samples: int
) -> np.ndarray:
"""
Creates a matrix of random trend shifts based on historical likelihood and size of shifts.
Can be used for either linear or logistic trend shifts.
Each row represents a different sample of a possible future, and each column is a time step into the future.
"""
# create a bool matrix of where these trend shifts should go
bool_slope_change = np.random.uniform(size=(n_samples, future_length)) < likelihood
shift_values = np.random.laplace(0, mean_delta, size=bool_slope_change.shape)
mat = shift_values * bool_slope_change
n_mat = np.hstack([np.zeros((len(mat), 1)), mat])[:, :-1]
mat = (n_mat + mat) / 2
return mat
@staticmethod
def _make_historical_mat_time(deltas, changepoints_t, t_time, n_row=1, single_diff=None):
"""
Creates a matrix of slope-deltas where these changes occured in training data according to the trained prophet obj
"""
if single_diff is None:
single_diff = np.diff(t_time).mean()
prev_time = np.arange(0, 1 + single_diff, single_diff)
idxs = []
for changepoint in changepoints_t:
idxs.append(np.where(prev_time > changepoint)[0][0])
prev_deltas = np.zeros(len(prev_time))
prev_deltas[idxs] = deltas
prev_deltas = np.repeat(prev_deltas.reshape(1, -1), n_row, axis=0)
return prev_deltas, prev_time
def _logistic_uncertainty(
self,
mat: np.ndarray,
deltas: np.ndarray,
k: float,
m: float,
cap: np.ndarray,
t_time: np.ndarray,
n_length: int,
single_diff: float = None,
) -> np.ndarray:
"""
Vectorizes prophet's logistic uncertainty by creating a matrix of future possible trends.
Parameters
----------
mat: A trend shift matrix returned by _make_trend_shift_matrix()
deltas: The size of the trend changes at each changepoint, estimated by the model
k: Float initial rate.
m: Float initial offset.
cap: np.array of capacities at each t.
t_time: The values of t in the model context (i.e. scaled so that anything > 1 represents the future)
n_length: For each path, the number of future steps to simulate
single_diff: The difference between each t step in the model context. Default None, inferred
from t_time.
Returns
-------
A numpy array with shape (n_samples, n_length), representing the width of the uncertainty interval
(standardized, not on the same scale as the actual data values) around 0.
"""
def ffill(arr):
mask = arr == 0
idx = np.where(~mask, np.arange(mask.shape[1]), 0)
np.maximum.accumulate(idx, axis=1, out=idx)
return arr[np.arange(idx.shape[0])[:, None], idx]
# for logistic growth we need to evaluate the trend all the way from the start of the train item
historical_mat, historical_time = self._make_historical_mat_time(deltas, self.changepoints_t, t_time, len(mat), single_diff)
mat = np.concatenate([historical_mat, mat], axis=1)
full_t_time = np.concatenate([historical_time, t_time])
# apply logistic growth logic on the slope changes
k_cum = np.concatenate((np.ones((mat.shape[0], 1)) * k, np.where(mat, np.cumsum(mat, axis=1) + k, 0)), axis=1)
k_cum_b = ffill(k_cum)
gammas = np.zeros_like(mat)
for i in range(mat.shape[1]):
x = full_t_time[i] - m - np.sum(gammas[:, :i], axis=1)
ks = 1 - k_cum_b[:, i] / k_cum_b[:, i + 1]
gammas[:, i] = x * ks
# the data before the -n_length is the historical values, which are not needed, so cut the last n_length
k_t = (mat.cumsum(axis=1) + k)[:, -n_length:]
m_t = (gammas.cumsum(axis=1) + m)[:, -n_length:]
sample_trends = cap / (1 + np.exp(-k_t * (t_time - m_t)))
# remove the mean because we only need width of the uncertainty centered around 0
# we will add the width to the main forecast - yhat (which is the mean) - later
return sample_trends - sample_trends.mean(axis=0)
def predictive_samples(self, df: pd.DataFrame, vectorized: bool = True):
"""Sample from the posterior predictive distribution. Returns samples
for the main estimate yhat, and for the trend component. The shape of
each output will be (nforecast x nsamples), where nforecast is the
number of points being forecasted (the number of rows in the input
dataframe) and nsamples is the number of posterior samples drawn.
This is the argument `uncertainty_samples` in the Prophet constructor,
which defaults to 1000.
Parameters
----------
df: Dataframe with dates for predictions (column ds), and capacity
(column cap) if logistic growth.
vectorized: Whether to use a vectorized method to compute possible draws. Suggest using
True (the default) for much faster runtimes in most cases,
except when (growth = 'logistic' and mcmc_samples > 0).
Returns
-------
Dictionary with keys "trend" and "yhat" containing
posterior predictive samples for that component.
"""
df = self.setup_dataframe(df.copy())
return self.sample_posterior_predictive(df, vectorized)
def percentile(self, a, *args, **kwargs):
"""
We rely on np.nanpercentile in the rare instances where there
@ -1543,7 +1766,7 @@ class Prophet(object):
we only fall back to it if the array contains NaNs. See
https://github.com/facebook/prophet/issues/1310 for more details.
"""
fn = np.nanpercentile if np.isnan(a).any() else np.percentile
fn = np.nanpercentile if np.isnan(a).any() else np.percentile
return fn(a, *args, **kwargs)
def make_future_dataframe(self, periods, freq='D', include_history=True):
@ -1563,6 +1786,12 @@ class Prophet(object):
"""
if self.history_dates is None:
raise Exception('Model has not been fit.')
if freq is None:
# taking the tail makes freq inference more reliable
freq = pd.infer_freq(self.history_dates.tail(5))
# returns None if inference failed
if freq is None:
raise Exception('Unable to infer `freq`')
last_date = self.history_dates.max()
dates = pd.date_range(
start=last_date,

View file

@ -157,7 +157,7 @@ def model_from_dict(model_dict):
s = s.dt.tz_localize(None)
setattr(model, attribute, s)
for attribute in PD_TIMESTAMP:
setattr(model, attribute, pd.Timestamp.utcfromtimestamp(model_dict[attribute]))
setattr(model, attribute, pd.Timestamp.utcfromtimestamp(model_dict[attribute]).tz_localize(None))
for attribute in PD_TIMEDELTA:
setattr(model, attribute, pd.Timedelta(seconds=model_dict[attribute]))
for attribute in PD_DATAFRAME:

View file

@ -56,26 +56,27 @@ class TestDiagnostics(TestCase):
pass
for parallel in methods:
df_cv = diagnostics.cross_validation(
m, horizon='4 days', period='10 days', initial='115 days',
parallel=parallel)
self.assertEqual(len(np.unique(df_cv['cutoff'])), 3)
self.assertEqual(max(df_cv['ds'] - df_cv['cutoff']), horizon)
self.assertTrue(min(df_cv['cutoff']) >= min(self.__df['ds']) + initial)
dc = df_cv['cutoff'].diff()
dc = dc[dc > pd.Timedelta(0)].min()
self.assertTrue(dc >= period)
self.assertTrue((df_cv['cutoff'] < df_cv['ds']).all())
# Each y in df_cv and self.__df with same ds should be equal
df_merged = pd.merge(df_cv, self.__df, 'left', on='ds')
self.assertAlmostEqual(
np.sum((df_merged['y_x'] - df_merged['y_y']) ** 2), 0.0)
df_cv = diagnostics.cross_validation(
m, horizon='4 days', period='10 days', initial='135 days')
self.assertEqual(len(np.unique(df_cv['cutoff'])), 1)
with self.assertRaises(ValueError):
diagnostics.cross_validation(
m, horizon='10 days', period='10 days', initial='140 days')
with self.subTest(i=parallel):
df_cv = diagnostics.cross_validation(
m, horizon='4 days', period='10 days', initial='115 days',
parallel=parallel)
self.assertEqual(len(np.unique(df_cv['cutoff'])), 3)
self.assertEqual(max(df_cv['ds'] - df_cv['cutoff']), horizon)
self.assertTrue(min(df_cv['cutoff']) >= min(self.__df['ds']) + initial)
dc = df_cv['cutoff'].diff()
dc = dc[dc > pd.Timedelta(0)].min()
self.assertTrue(dc >= period)
self.assertTrue((df_cv['cutoff'] < df_cv['ds']).all())
# Each y in df_cv and self.__df with same ds should be equal
df_merged = pd.merge(df_cv, self.__df, 'left', on='ds')
self.assertAlmostEqual(
np.sum((df_merged['y_x'] - df_merged['y_y']) ** 2), 0.0)
df_cv = diagnostics.cross_validation(
m, horizon='4 days', period='10 days', initial='135 days')
self.assertEqual(len(np.unique(df_cv['cutoff'])), 1)
with self.assertRaises(ValueError):
diagnostics.cross_validation(
m, horizon='10 days', period='10 days', initial='140 days')
# invalid alias
with self.assertRaisesRegex(ValueError, "'parallel' should be one"):

View file

@ -75,7 +75,8 @@ class TestProphet(TestCase):
forecaster = Prophet(mcmc_samples=500)
forecaster.fit(train, seed=1237861298, chains=4)
# chains adjusted from 4 to 7 to satisfy test for cmdstanpy
forecaster.fit(train, seed=1237861298, chains=7)
np.random.seed(876543987)
future = forecaster.make_future_dataframe(days, include_history=False)
future = forecaster.predict(future)
@ -86,12 +87,14 @@ class TestProphet(TestCase):
def test_fit_predict_no_seasons(self):
N = DATA.shape[0]
train = DATA.head(N // 2)
future = DATA.tail(N // 2)
periods = 30
forecaster = Prophet(weekly_seasonality=False,
yearly_seasonality=False)
forecaster.fit(train)
forecaster.predict(future)
future = forecaster.make_future_dataframe(periods=periods, include_history=False)
result = forecaster.predict(future)
self.assertTrue((future.ds == result.ds).all())
def test_fit_predict_no_changepoints(self):
N = DATA.shape[0]