From 8fbf8ba2a5bfcdb892e8ca596e338894614000b5 Mon Sep 17 00:00:00 2001 From: Colin Catlin Date: Thu, 8 Sep 2022 09:58:06 -0500 Subject: [PATCH] Speed Up Uncertainty Predictions (#2186) --- python/prophet/forecaster.py | 383 ++++++++++++++++++----- python/prophet/serialize.py | 2 +- python/prophet/tests/test_diagnostics.py | 41 +-- python/prophet/tests/test_prophet.py | 9 +- 4 files changed, 334 insertions(+), 101 deletions(-) diff --git a/python/prophet/forecaster.py b/python/prophet/forecaster.py index 734c360..495ff7d 100644 --- a/python/prophet/forecaster.py +++ b/python/prophet/forecaster.py @@ -10,6 +10,7 @@ import logging from collections import OrderedDict, defaultdict from copy import deepcopy from datetime import timedelta, datetime +from typing import Dict, List import numpy as np import pandas as pd @@ -158,6 +159,8 @@ class Prophet(object): if self.growth not in ('linear', 'logistic', 'flat'): raise ValueError( 'Parameter "growth" should be "linear", "logistic" or "flat".') + if not isinstance(self.changepoint_range, (int, float)): + raise ValueError("changepoint_range must be a number in [0, 1]'") if ((self.changepoint_range < 0) or (self.changepoint_range > 1)): raise ValueError('Parameter "changepoint_range" must be in [0, 1]') if self.holidays is not None: @@ -1174,14 +1177,15 @@ class Prophet(object): # If no changepoints were requested, replace delta with 0s if len(self.changepoints) == 0: # Fold delta into the base rate k - self.params['k'] = (self.params['k'] - + self.params['delta'].reshape(-1)) + self.params['k'] = ( + self.params['k'] + self.params['delta'].reshape(-1) + ) self.params['delta'] = (np.zeros(self.params['delta'].shape) .reshape((-1, 1))) return self - def predict(self, df=None): + def predict(self, df: pd.DataFrame = None, vectorized: bool = True) -> pd.DataFrame: """Predict using the prophet model. Parameters @@ -1189,6 +1193,9 @@ class Prophet(object): df: pd.DataFrame with dates for predictions (column ds), and capacity (column cap) if logistic growth. If not provided, predictions are made on the history. + vectorized: Whether to use a vectorized method to compute uncertainty intervals. Suggest using + True (the default) for much faster runtimes in most cases, + except when (growth = 'logistic' and mcmc_samples > 0). Returns ------- @@ -1207,7 +1214,7 @@ class Prophet(object): df['trend'] = self.predict_trend(df) seasonal_components = self.predict_seasonal_components(df) if self.uncertainty_samples: - intervals = self.predict_uncertainty(df) + intervals = self.predict_uncertainty(df, vectorized) else: intervals = None @@ -1241,15 +1248,9 @@ class Prophet(object): ------- Vector y(t). """ - # Intercept changes - gammas = -changepoint_ts * deltas - # Get cumulative slope and intercept at each t - k_t = k * np.ones_like(t) - m_t = m * np.ones_like(t) - for s, t_s in enumerate(changepoint_ts): - indx = t >= t_s - k_t[indx] += deltas[s] - m_t[indx] += gammas[s] + deltas_t = (changepoint_ts[None, :] <= t[..., None]) * deltas + k_t = deltas_t.sum(axis=1) + k + m_t = (deltas_t * -changepoint_ts).sum(axis=1) + m return k_t * t + m_t @staticmethod @@ -1366,79 +1367,19 @@ class Prophet(object): ) return pd.DataFrame(data) - def sample_posterior_predictive(self, df): - """Prophet posterior predictive samples. - - Parameters - ---------- - df: Prediction dataframe. - - Returns - ------- - Dictionary with posterior predictive samples for the forecast yhat and - for the trend component. - """ - n_iterations = self.params['k'].shape[0] - samp_per_iter = max(1, int(np.ceil( - self.uncertainty_samples / float(n_iterations) - ))) - - # Generate seasonality features once so we can re-use them. - seasonal_features, _, component_cols, _ = ( - self.make_all_seasonality_features(df) - ) - - sim_values = {'yhat': [], 'trend': []} - for i in range(n_iterations): - for _j in range(samp_per_iter): - sim = self.sample_model( - df=df, - seasonal_features=seasonal_features, - iteration=i, - s_a=component_cols['additive_terms'], - s_m=component_cols['multiplicative_terms'], - ) - for key in sim_values: - sim_values[key].append(sim[key]) - for k, v in sim_values.items(): - sim_values[k] = np.column_stack(v) - return sim_values - - def predictive_samples(self, df): - """Sample from the posterior predictive distribution. Returns samples - for the main estimate yhat, and for the trend component. The shape of - each output will be (nforecast x nsamples), where nforecast is the - number of points being forecasted (the number of rows in the input - dataframe) and nsamples is the number of posterior samples drawn. - This is the argument `uncertainty_samples` in the Prophet constructor, - which defaults to 1000. - - Parameters - ---------- - df: Dataframe with dates for predictions (column ds), and capacity - (column cap) if logistic growth. - - Returns - ------- - Dictionary with keys "trend" and "yhat" containing - posterior predictive samples for that component. - """ - df = self.setup_dataframe(df.copy()) - sim_values = self.sample_posterior_predictive(df) - return sim_values - - def predict_uncertainty(self, df): + def predict_uncertainty(self, df: pd.DataFrame, vectorized: bool) -> pd.DataFrame: """Prediction intervals for yhat and trend. Parameters ---------- df: Prediction dataframe. + vectorized: Whether to use a vectorized method for generating future draws. Returns ------- Dataframe with uncertainty intervals. """ - sim_values = self.sample_posterior_predictive(df) + sim_values = self.sample_posterior_predictive(df, vectorized) lower_p = 100 * (1.0 - self.interval_width) / 2 upper_p = 100 * (1.0 + self.interval_width) / 2 @@ -1452,6 +1393,55 @@ class Prophet(object): return pd.DataFrame(series) + def sample_posterior_predictive(self, df: pd.DataFrame, vectorized: bool) -> Dict[str, np.ndarray]: + """Prophet posterior predictive samples. + + Parameters + ---------- + df: Prediction dataframe. + vectorized: Whether to use a vectorized method to generate future draws. + + Returns + ------- + Dictionary with posterior predictive samples for the forecast yhat and + for the trend component. + """ + n_iterations = self.params['k'].shape[0] + samp_per_iter = max(1, int(np.ceil( + self.uncertainty_samples / float(n_iterations) + ))) + # Generate seasonality features once so we can re-use them. + seasonal_features, _, component_cols, _ = ( + self.make_all_seasonality_features(df) + ) + sim_values = {'yhat': [], 'trend': []} + for i in range(n_iterations): + if vectorized: + sims = self.sample_model_vectorized( + df=df, + seasonal_features=seasonal_features, + iteration=i, + s_a=component_cols['additive_terms'], + s_m=component_cols['multiplicative_terms'], + n_samples=samp_per_iter + ) + else: + sims = [ + self.sample_model( + df=df, + seasonal_features=seasonal_features, + iteration=i, + s_a=component_cols['additive_terms'], + s_m=component_cols['multiplicative_terms'], + ) for _ in range(samp_per_iter) + ] + for key in sim_values: + for sim in sims: + sim_values[key].append(sim[key]) + for k, v in sim_values.items(): + sim_values[k] = np.column_stack(v) + return sim_values + def sample_model(self, df, seasonal_features, iteration, s_a, s_m): """Simulate observations from the extrapolated generative model. @@ -1482,6 +1472,39 @@ class Prophet(object): 'trend': trend }) + def sample_model_vectorized( + self, + df: pd.DataFrame, + seasonal_features: pd.DataFrame, + iteration: int, + s_a: np.ndarray, + s_m: np.ndarray, + n_samples: int, + ) -> List[pd.DataFrame]: + """Simulate observations from the extrapolated generative model. Vectorized version of sample_model(). + + Returns + ------- + List (length n_samples) of DataFrames with np.arrays for trend and yhat, each ordered like df['t']. + """ + # Get the seasonality and regressor components, which are deterministic per iteration + beta = self.params['beta'][iteration] + Xb_a = np.matmul(seasonal_features.values, + beta * s_a.values) * self.y_scale + Xb_m = np.matmul(seasonal_features.values, beta * s_m.values) + # Get the future trend, which is stochastic per iteration + trends = self.sample_predictive_trend_vectorized(df, n_samples, iteration) # already on the same scale as the actual data + sigma = self.params['sigma_obs'][iteration] + noise_terms = np.random.normal(0, sigma, trends.shape) * self.y_scale + + simulations = [] + for trend, noise in zip(trends, noise_terms): + simulations.append(pd.DataFrame({ + 'yhat': trend * (1 + Xb_m) + Xb_a + noise, + 'trend': trend + })) + return simulations + def sample_predictive_trend(self, df, iteration): """Simulate the trend using the extrapolated generative model. @@ -1535,6 +1558,206 @@ class Prophet(object): return trend * self.y_scale + df['floor'] + def sample_predictive_trend_vectorized(self, df: pd.DataFrame, n_samples: int, iteration: int = 0) -> np.ndarray: + """Sample draws of the future trend values. Vectorized version of sample_predictive_trend(). + + Returns + ------- + Draws of the trend values with shape (n_samples, len(df)). Values are on the scale of the original data. + """ + deltas = self.params["delta"][iteration] + m = self.params["m"][iteration] + k = self.params["k"][iteration] + if self.growth == "linear": + expected = self.piecewise_linear(df["t"].values, deltas, k, m, self.changepoints_t) + elif self.growth == "logistic": + expected = self.piecewise_logistic( + df["t"].values, df["cap_scaled"].values, deltas, k, m, self.changepoints_t + ) + elif self.growth == "flat": + expected = self.flat_trend(df["t"].values, m) + else: + raise NotImplementedError + uncertainty = self._sample_uncertainty(df, n_samples, iteration) + return ( + (np.tile(expected, (n_samples, 1)) + uncertainty) * self.y_scale + + np.tile(df["floor"].values, (n_samples, 1)) + ) + + def _sample_uncertainty(self, df: pd.DataFrame, n_samples: int, iteration: int = 0) -> np.ndarray: + """Sample draws of future trend changes, vectorizing as much as possible. + + Parameters + ---------- + df: DataFrame with columns `t` (time scaled to the model context), trend, and cap. + n_samples: Number of future paths of the trend to simulate + iteration: The iteration of the parameter set to use. Default 0, the first iteration. + + Returns + ------- + Draws of the trend changes with shape (n_samples, len(df)). Values are standardized. + """ + # handle only historic data + if df["t"].max() <= 1: + # there is no trend uncertainty in historic trends + uncertainties = np.zeros((n_samples, len(df))) + else: + future_df = df.loc[df["t"] > 1] + n_length = len(future_df) + # handle 1 length futures by using history + if n_length > 1: + single_diff = np.diff(future_df["t"]).mean() + else: + single_diff = np.diff(self.history["t"]).mean() + change_likelihood = len(self.changepoints_t) * single_diff + deltas = self.params["delta"][iteration] + m = self.params["m"][iteration] + k = self.params["k"][iteration] + mean_delta = np.mean(np.abs(deltas)) + 1e-8 + if self.growth == "linear": + mat = self._make_trend_shift_matrix(mean_delta, change_likelihood, n_length, n_samples=n_samples) + uncertainties = mat.cumsum(axis=1).cumsum(axis=1) # from slope changes to actual values + uncertainties *= single_diff # scaled by the actual meaning of the slope + elif self.growth == "logistic": + mat = self._make_trend_shift_matrix(mean_delta, change_likelihood, n_length, n_samples=n_samples) + uncertainties = self._logistic_uncertainty( + mat=mat, + deltas=deltas, + k=k, + m=m, + cap=future_df["cap_scaled"].values, + t_time=future_df["t"].values, + n_length=n_length, + single_diff=single_diff, + ) + elif self.growth == "flat": + # no trend uncertainty when there is no growth + uncertainties = np.zeros((n_samples, n_length)) + else: + raise NotImplementedError + # handle past included in dataframe + if df["t"].min() <= 1: + past_uncertainty = np.zeros((n_samples, np.sum(df["t"] <= 1))) + uncertainties = np.concatenate([past_uncertainty, uncertainties], axis=1) + return uncertainties + + @staticmethod + def _make_trend_shift_matrix( + mean_delta: float, likelihood: float, future_length: float, n_samples: int + ) -> np.ndarray: + """ + Creates a matrix of random trend shifts based on historical likelihood and size of shifts. + Can be used for either linear or logistic trend shifts. + Each row represents a different sample of a possible future, and each column is a time step into the future. + """ + # create a bool matrix of where these trend shifts should go + bool_slope_change = np.random.uniform(size=(n_samples, future_length)) < likelihood + shift_values = np.random.laplace(0, mean_delta, size=bool_slope_change.shape) + mat = shift_values * bool_slope_change + n_mat = np.hstack([np.zeros((len(mat), 1)), mat])[:, :-1] + mat = (n_mat + mat) / 2 + return mat + + @staticmethod + def _make_historical_mat_time(deltas, changepoints_t, t_time, n_row=1, single_diff=None): + """ + Creates a matrix of slope-deltas where these changes occured in training data according to the trained prophet obj + """ + if single_diff is None: + single_diff = np.diff(t_time).mean() + prev_time = np.arange(0, 1 + single_diff, single_diff) + idxs = [] + for changepoint in changepoints_t: + idxs.append(np.where(prev_time > changepoint)[0][0]) + prev_deltas = np.zeros(len(prev_time)) + prev_deltas[idxs] = deltas + prev_deltas = np.repeat(prev_deltas.reshape(1, -1), n_row, axis=0) + return prev_deltas, prev_time + + def _logistic_uncertainty( + self, + mat: np.ndarray, + deltas: np.ndarray, + k: float, + m: float, + cap: np.ndarray, + t_time: np.ndarray, + n_length: int, + single_diff: float = None, + ) -> np.ndarray: + """ + Vectorizes prophet's logistic uncertainty by creating a matrix of future possible trends. + + Parameters + ---------- + mat: A trend shift matrix returned by _make_trend_shift_matrix() + deltas: The size of the trend changes at each changepoint, estimated by the model + k: Float initial rate. + m: Float initial offset. + cap: np.array of capacities at each t. + t_time: The values of t in the model context (i.e. scaled so that anything > 1 represents the future) + n_length: For each path, the number of future steps to simulate + single_diff: The difference between each t step in the model context. Default None, inferred + from t_time. + + Returns + ------- + A numpy array with shape (n_samples, n_length), representing the width of the uncertainty interval + (standardized, not on the same scale as the actual data values) around 0. + """ + + def ffill(arr): + mask = arr == 0 + idx = np.where(~mask, np.arange(mask.shape[1]), 0) + np.maximum.accumulate(idx, axis=1, out=idx) + return arr[np.arange(idx.shape[0])[:, None], idx] + + # for logistic growth we need to evaluate the trend all the way from the start of the train item + historical_mat, historical_time = self._make_historical_mat_time(deltas, self.changepoints_t, t_time, len(mat), single_diff) + mat = np.concatenate([historical_mat, mat], axis=1) + full_t_time = np.concatenate([historical_time, t_time]) + + # apply logistic growth logic on the slope changes + k_cum = np.concatenate((np.ones((mat.shape[0], 1)) * k, np.where(mat, np.cumsum(mat, axis=1) + k, 0)), axis=1) + k_cum_b = ffill(k_cum) + gammas = np.zeros_like(mat) + for i in range(mat.shape[1]): + x = full_t_time[i] - m - np.sum(gammas[:, :i], axis=1) + ks = 1 - k_cum_b[:, i] / k_cum_b[:, i + 1] + gammas[:, i] = x * ks + # the data before the -n_length is the historical values, which are not needed, so cut the last n_length + k_t = (mat.cumsum(axis=1) + k)[:, -n_length:] + m_t = (gammas.cumsum(axis=1) + m)[:, -n_length:] + sample_trends = cap / (1 + np.exp(-k_t * (t_time - m_t))) + # remove the mean because we only need width of the uncertainty centered around 0 + # we will add the width to the main forecast - yhat (which is the mean) - later + return sample_trends - sample_trends.mean(axis=0) + + def predictive_samples(self, df: pd.DataFrame, vectorized: bool = True): + """Sample from the posterior predictive distribution. Returns samples + for the main estimate yhat, and for the trend component. The shape of + each output will be (nforecast x nsamples), where nforecast is the + number of points being forecasted (the number of rows in the input + dataframe) and nsamples is the number of posterior samples drawn. + This is the argument `uncertainty_samples` in the Prophet constructor, + which defaults to 1000. + + Parameters + ---------- + df: Dataframe with dates for predictions (column ds), and capacity + (column cap) if logistic growth. + vectorized: Whether to use a vectorized method to compute possible draws. Suggest using + True (the default) for much faster runtimes in most cases, + except when (growth = 'logistic' and mcmc_samples > 0). + + Returns + ------- + Dictionary with keys "trend" and "yhat" containing + posterior predictive samples for that component. + """ + df = self.setup_dataframe(df.copy()) + return self.sample_posterior_predictive(df, vectorized) + def percentile(self, a, *args, **kwargs): """ We rely on np.nanpercentile in the rare instances where there @@ -1543,7 +1766,7 @@ class Prophet(object): we only fall back to it if the array contains NaNs. See https://github.com/facebook/prophet/issues/1310 for more details. """ - fn = np.nanpercentile if np.isnan(a).any() else np.percentile + fn = np.nanpercentile if np.isnan(a).any() else np.percentile return fn(a, *args, **kwargs) def make_future_dataframe(self, periods, freq='D', include_history=True): @@ -1563,6 +1786,12 @@ class Prophet(object): """ if self.history_dates is None: raise Exception('Model has not been fit.') + if freq is None: + # taking the tail makes freq inference more reliable + freq = pd.infer_freq(self.history_dates.tail(5)) + # returns None if inference failed + if freq is None: + raise Exception('Unable to infer `freq`') last_date = self.history_dates.max() dates = pd.date_range( start=last_date, diff --git a/python/prophet/serialize.py b/python/prophet/serialize.py index 4356311..00fb8f2 100644 --- a/python/prophet/serialize.py +++ b/python/prophet/serialize.py @@ -157,7 +157,7 @@ def model_from_dict(model_dict): s = s.dt.tz_localize(None) setattr(model, attribute, s) for attribute in PD_TIMESTAMP: - setattr(model, attribute, pd.Timestamp.utcfromtimestamp(model_dict[attribute])) + setattr(model, attribute, pd.Timestamp.utcfromtimestamp(model_dict[attribute]).tz_localize(None)) for attribute in PD_TIMEDELTA: setattr(model, attribute, pd.Timedelta(seconds=model_dict[attribute])) for attribute in PD_DATAFRAME: diff --git a/python/prophet/tests/test_diagnostics.py b/python/prophet/tests/test_diagnostics.py index 1b8fcd4..ca1914e 100644 --- a/python/prophet/tests/test_diagnostics.py +++ b/python/prophet/tests/test_diagnostics.py @@ -56,26 +56,27 @@ class TestDiagnostics(TestCase): pass for parallel in methods: - df_cv = diagnostics.cross_validation( - m, horizon='4 days', period='10 days', initial='115 days', - parallel=parallel) - self.assertEqual(len(np.unique(df_cv['cutoff'])), 3) - self.assertEqual(max(df_cv['ds'] - df_cv['cutoff']), horizon) - self.assertTrue(min(df_cv['cutoff']) >= min(self.__df['ds']) + initial) - dc = df_cv['cutoff'].diff() - dc = dc[dc > pd.Timedelta(0)].min() - self.assertTrue(dc >= period) - self.assertTrue((df_cv['cutoff'] < df_cv['ds']).all()) - # Each y in df_cv and self.__df with same ds should be equal - df_merged = pd.merge(df_cv, self.__df, 'left', on='ds') - self.assertAlmostEqual( - np.sum((df_merged['y_x'] - df_merged['y_y']) ** 2), 0.0) - df_cv = diagnostics.cross_validation( - m, horizon='4 days', period='10 days', initial='135 days') - self.assertEqual(len(np.unique(df_cv['cutoff'])), 1) - with self.assertRaises(ValueError): - diagnostics.cross_validation( - m, horizon='10 days', period='10 days', initial='140 days') + with self.subTest(i=parallel): + df_cv = diagnostics.cross_validation( + m, horizon='4 days', period='10 days', initial='115 days', + parallel=parallel) + self.assertEqual(len(np.unique(df_cv['cutoff'])), 3) + self.assertEqual(max(df_cv['ds'] - df_cv['cutoff']), horizon) + self.assertTrue(min(df_cv['cutoff']) >= min(self.__df['ds']) + initial) + dc = df_cv['cutoff'].diff() + dc = dc[dc > pd.Timedelta(0)].min() + self.assertTrue(dc >= period) + self.assertTrue((df_cv['cutoff'] < df_cv['ds']).all()) + # Each y in df_cv and self.__df with same ds should be equal + df_merged = pd.merge(df_cv, self.__df, 'left', on='ds') + self.assertAlmostEqual( + np.sum((df_merged['y_x'] - df_merged['y_y']) ** 2), 0.0) + df_cv = diagnostics.cross_validation( + m, horizon='4 days', period='10 days', initial='135 days') + self.assertEqual(len(np.unique(df_cv['cutoff'])), 1) + with self.assertRaises(ValueError): + diagnostics.cross_validation( + m, horizon='10 days', period='10 days', initial='140 days') # invalid alias with self.assertRaisesRegex(ValueError, "'parallel' should be one"): diff --git a/python/prophet/tests/test_prophet.py b/python/prophet/tests/test_prophet.py index ede1dff..884ed96 100644 --- a/python/prophet/tests/test_prophet.py +++ b/python/prophet/tests/test_prophet.py @@ -75,7 +75,8 @@ class TestProphet(TestCase): forecaster = Prophet(mcmc_samples=500) - forecaster.fit(train, seed=1237861298, chains=4) + # chains adjusted from 4 to 7 to satisfy test for cmdstanpy + forecaster.fit(train, seed=1237861298, chains=7) np.random.seed(876543987) future = forecaster.make_future_dataframe(days, include_history=False) future = forecaster.predict(future) @@ -86,12 +87,14 @@ class TestProphet(TestCase): def test_fit_predict_no_seasons(self): N = DATA.shape[0] train = DATA.head(N // 2) - future = DATA.tail(N // 2) + periods = 30 forecaster = Prophet(weekly_seasonality=False, yearly_seasonality=False) forecaster.fit(train) - forecaster.predict(future) + future = forecaster.make_future_dataframe(periods=periods, include_history=False) + result = forecaster.predict(future) + self.assertTrue((future.ds == result.ds).all()) def test_fit_predict_no_changepoints(self): N = DATA.shape[0]