Module timexseries_clustering.data_clustering.models.predictor
Expand source code
import json
import logging
import math
import multiprocessing
import pkgutil
from functools import reduce
from typing import List
import pandas as pd
from pandas import DataFrame, Series
import numpy as np
from timexseries_clustering.data_clustering.transformation import transformation_factory
from timexseries_clustering.data_clustering.distance_metric import distance_metric_factory
from timexseries_clustering.data_clustering.validation_performances import ValidationPerformance
log = logging.getLogger(__name__)
class SingleResult:
    """
    Result of a model trained with one specific set of model parameters
    (e.g. using a specific number of clusters and transformation).

    Parameters
    ----------
    characteristics : dict
        Characteristics obtained using these model parameters.
    performances : ValidationPerformance
        Model performances (`timexseries_clustering.data_prediction.validation_performances.ValidationPerformance`),
        obtained using different number of clusters and transformation.
    """
    def __init__(self, characteristics: dict, performances: ValidationPerformance):
        # Plain value object: keep both pieces verbatim, no processing.
        self.characteristics = characteristics
        self.performances = performances
class ModelResult:
    """
    Container for the global results of a model clustering all the time-series.

    Parameters
    ----------
    best_clustering : DataFrame
        Clustering obtained using the best model parameters and _all_ the available time-series. These are
        the cluster indexes that users are most likely to want.
    results : [SingleResult]
        All the results obtained with every model-parameter set tried for this model and metric on the
        time series. Useful to create plots showing how performance varies with the number of clusters
        (e.g. `timexseries.data_visualization.functions.performance_plot`).
    characteristics : dict
        Human-readable model characteristics, e.g. the number of clusters used, the distance metric
        applied, etc.
    cluster_centers : DataFrame
        Cluster centers computed to obtain the best clustering of all the time-series.
    """
    def __init__(self, best_clustering: DataFrame, results: List[SingleResult], characteristics: dict,
                 cluster_centers: DataFrame):
        # Plain value object: keep every field verbatim.
        self.best_clustering = best_clustering
        self.results = results
        self.characteristics = characteristics
        self.cluster_centers = cluster_centers
class ClustersModel:
    """
    Base class for every cluster model which can be used on a time series.

    Parameters
    ----------
    params : dict
        A dictionary corresponding to a TIMEX JSON configuration file.
        The various attributes, described below, will be extracted from this.
        If `params` does not contain the entry `model_parameters`, then TIMEX will attempt to load a default
        configuration parameter dictionary.
    approach : str
        Clustering approach used; it can be one of three types: Observation based, Feature based or Model based.
    name : str
        Class of the model.
    distance_metric : str, optional
        The class of distance metric which the model will use to measure similarity among time series.
        If not specified, the one in `params` will be used.
    transformation : str, optional
        The class of transformation which the model should use. If not specified, the one in `params`
        will be used.

    Attributes
    ----------
    freq : str
        If available, the frequency of the time-series.
    test_values : int
        Number of the last points of the time-series to use for the validation set. Default 0.
    test_percentage : float
        Percentage of the time-series length to use for the validation set. Default 0.
    distance_metric
        Distance metric object used to measure similarity among time series.
    transformation
        Transformation object applied to the time series before using it.
    prediction_lags : int
        Number of future lags for which the prediction has to be made.
    delta_training_values : int
        Length of the training windows. Default 0. NOTE(review): nothing in this class assigns a
        positive value, yet `_compute_trainings` divides by it — callers must set it before training.
    main_accuracy_estimator : str
        Error metric to use when deciding which prediction is better, e.g. "MAE".
    min_values : dict
        Key-values where key is the name of a column and value is the minimum expected value in that column.
        If "_all" is in the dict the corresponding value will be used for all values in each column.
    max_values : dict
        Same as `min_values`, but for the maximum expected value in each column.
    round_to_integer : list
        List of column names which should be rounded to integer. If "_all" is in the list, all values in
        every column will be rounded to integer.
    model_characteristics : dict
        Dictionary of values containing the main characteristics and parameters of the model. Default {}.
    """
    def __init__(self, params: dict, approach: str, name: str, distance_metric: str = None,
                 transformation: str = None) -> None:
        self.name = name
        self.approach = approach
        log.info(f"Creating a {self.name} model using {self.approach} clustering approach...")

        if "model_parameters" not in params:
            log.debug("Loading default settings...")
            parsed = pkgutil.get_data(__name__, "default_prediction_parameters/" + self.name + ".json")
            model_parameters = json.loads(parsed)
        else:
            log.debug("Loading user settings...")
            model_parameters = params["model_parameters"]

        # Explicit constructor arguments take precedence over the configuration dictionary.
        if distance_metric is not None:
            self.distance_metric = distance_metric_factory(distance_metric)
        else:
            self.distance_metric = distance_metric_factory(model_parameters["distance_metric"])

        if transformation is not None:
            self.transformation = transformation_factory(transformation)
        else:
            self.transformation = transformation_factory(model_parameters["transformation"])

        self.min_values = model_parameters.get("min_values")
        self.max_values = model_parameters.get("max_values")
        if "round_to_integer" in model_parameters:
            self.round_to_integer = list(model_parameters["round_to_integer"].split(","))
        else:
            self.round_to_integer = None

        self.prediction_lags = model_parameters["prediction_lags"]
        self.main_accuracy_estimator = model_parameters["main_accuracy_estimator"]
        # Fix: `test_values` is read by `_compute_trainings` but was never initialized here (the
        # assignment was commented out in `launch_model`), causing AttributeError at training time.
        # Documented defaults are 0; honor an explicit configuration value when present.
        self.test_values = model_parameters.get("test_values", 0)
        self.test_percentage = model_parameters.get("test_percentage", 0)
        self.delta_training_values = 0
        self.model_characteristics = {}
        self.freq = ""
        log.debug("Finished model creation.")

    def train(self, ingested_data: DataFrame):
        """
        Train the model using all the columns of `ingested_data`. The predictor, after launching
        `train`, is ready to make predictions for the future.

        Note that this and `predict` do not split anything in training data/validation data; this is
        done with other functions, like `launch_model`.

        Parameters
        ----------
        ingested_data : DataFrame
            Training set. The entire time-series in the first column of `ingested_data` will be used
            for training.

        Examples
        --------
        We will use as example the `timexseries_clustering.data_prediction.models.prophet_predictor.FBProphetModel`,
        an instance of `Predictor`.

        >>> param_config = {}  # This will make the predictor use default values...
        >>> predictor = FBProphetModel(params=param_config)

        Create some training data.

        >>> dates = pd.date_range('2000-01-01', periods=30)  # Last index is 2000-01-30
        >>> ds = pd.DatetimeIndex(dates, freq="D")
        >>> a = np.arange(30, 60)
        >>> training_dataframe = DataFrame(data={"a": a}, index=ds)

        Train the model.

        >>> predictor.train(training_dataframe)
        """
        # Abstract hook: concrete models override this.
        pass

    def predict(self, future_dataframe: DataFrame) -> DataFrame:
        """
        Return a DataFrame with the shape of `future_dataframe`, filled with predicted values.

        This function is used by `compute_training`; `future_dataframe` has the length of the
        time-series used for training, plus the number of desired prediction points.

        Parameters
        ----------
        future_dataframe : DataFrame
            DataFrame which will be filled with prediction values. This DataFrame should have the same
            index of the data used for training (plus the number of predicted values), and a column
            named `yhat`, which corresponds to the prediction that should be computed.

        Returns
        -------
        forecast : DataFrame
            DataFrame which contains the prediction computed by the model, in the column `yhat`.
            `forecast` may contain additional columns, e.g. `yhat_lower`, `yhat_upper` etc.

        Examples
        --------
        If the model has been trained, as shown in the `train` example, we can create a forecast.
        First, create the future dataframe which will be filled:

        >>> future_dates = pd.date_range('2000-01-31', periods=7)
        >>> future_ds = pd.DatetimeIndex(future_dates, freq="D")
        >>> future_df = DataFrame(columns=["yhat"], index=future_ds)

        Now we can create the forecast:

        >>> predictions = predictor.predict(future_dataframe=future_df)
        """
        # Abstract hook: concrete models override this.
        pass

    def _apply_inverse_transformation(self, forecast: DataFrame) -> DataFrame:
        """Bring `yhat` (and the confidence bounds, when present) back to the original data scale."""
        forecast.loc[:, 'yhat'] = self.transformation.inverse(forecast['yhat'])
        try:
            forecast.loc[:, 'yhat_lower'] = self.transformation.inverse(forecast['yhat_lower'])
            forecast.loc[:, 'yhat_upper'] = self.transformation.inverse(forecast['yhat_upper'])
        except KeyError:
            # Confidence-bound columns are optional; models without them are fine.
            pass
        return forecast

    def _compute_trainings(self, train_ts: DataFrame, test_ts: DataFrame, max_threads: int):
        """
        Compute the training of a model on a set of different training sets, of increasing length.

        `train_ts` is split in `n` different training sets, according to the length of `train_ts` and
        the value of `self.delta_training_values`. The computation is split across different processes,
        according to the value of `max_threads`, which indicates the maximum number of usable processes.

        Parameters
        ----------
        train_ts : DataFrame
            The entire training set which can be used; it will be split in different training sets, in
            order to test which sub-training-set performs better.
        test_ts : DataFrame
            Testing set to be used to compute the models' performances.
        max_threads : int
            Maximum number of processes to use in the training phase.

        Returns
        -------
        results : list
            List of SingleResult. Each one is the result relative to the use of a specific train set.
        """
        train_sets_number = math.ceil(len(train_ts) / self.delta_training_values)
        log.info(f"Model will use {train_sets_number} different training sets...")

        def c(targets: List[int], _return_dict: dict, thread_number: int):
            # Worker body: train/evaluate every training window in [targets[0], targets[1]).
            _results = []
            for _i in range(targets[0], targets[1]):
                # Window _i uses the last (_i + 1) * delta_training_values points.
                tr = train_ts.iloc[-(_i + 1) * self.delta_training_values:]
                log.debug(f"Trying with last {len(tr)} values as training set, in thread {thread_number}")
                self.train(tr.copy())
                future_df = pd.DataFrame(index=pd.date_range(freq=self.freq,
                                                             start=tr.index.values[0],
                                                             periods=len(tr) + self.test_values + self.prediction_lags),
                                         columns=["yhat"], dtype=tr.iloc[:, 0].dtype)
                forecast = self.predict(future_df)
                forecast = self._apply_inverse_transformation(forecast)
                forecast = self.adjust_forecast(train_ts.columns[0], forecast)
                # Keep only the points overlapping the validation set.
                testing_prediction = forecast.iloc[-self.prediction_lags - self.test_values:-self.prediction_lags]
                first_used_index = tr.index.values[0]
                tp = ValidationPerformance(first_used_index)
                tp.set_testing_stats(test_ts.iloc[:, 0], testing_prediction["yhat"])
                _results.append(SingleResult(forecast, tp))
            _return_dict[thread_number] = _results

        # Sequential path: models whose frameworks do not support fork(), or a single allowed process.
        if self.name == 'LSTM' or self.name == 'NeuralProphet' or max_threads == 1:
            if self.name == 'LSTM' or self.name == 'NeuralProphet':
                log.info(f"LSTM/NeuralProphet model. Cant use multiprocessing.")
            return_d = {}
            c([0, train_sets_number], return_d, 0)
            return return_d[0]

        # Parallel path: distribute the training windows as evenly as possible across workers.
        manager = multiprocessing.Manager()
        return_dict = manager.dict()
        processes = []
        distributions = []
        if train_sets_number % max_threads == 0:
            n_threads = max_threads
            subtraining_dim = train_sets_number // n_threads
            for i in range(n_threads):
                distributions.append([i * subtraining_dim, (i + 1) * subtraining_dim])
        else:
            n_threads = min(max_threads, train_sets_number)
            subtraining_dim = train_sets_number // n_threads
            for i in range(n_threads):
                distributions.append([i * subtraining_dim, (i + 1) * subtraining_dim])
            # Spread the remainder: the first `train_sets_number % n_threads` workers take one extra
            # window each, shifting every following range accordingly.
            for k in range(train_sets_number % n_threads):
                distributions[k][1] += 1
                distributions[k + 1::] = [[x + 1, y + 1] for x, y in distributions[k + 1::]]

        for i in range(n_threads):
            processes.append(multiprocessing.Process(target=c, args=(distributions[i], return_dict, i)))
            processes[-1].start()
        for p in processes:
            p.join()

        results = reduce(lambda x, y: x + y, [return_dict[key] for key in return_dict])
        return results

    def _compute_best_prediction(self, ingested_data: DataFrame, training_results: List[SingleResult]):
        """
        Given the ingested data and the training results, identify the best training window and compute
        a prediction using all the possible data, till the end of the series (hence, including the
        validation set).

        Parameters
        ----------
        ingested_data : DataFrame
            Initial time-series data, in a DataFrame. The first column of the DataFrame is the
            time-series.
        training_results : [SingleResult]
            List of `SingleResult` objects: each one is the result of the model on a specific
            training-set.

        Returns
        -------
        DataFrame
            Best available prediction for this time-series, with this model.
        """
        # Fix: SingleResult stores its performances in the `performances` attribute; the previous
        # `testing_performances` name raised AttributeError.
        training_results.sort(key=lambda x: getattr(x.performances, self.main_accuracy_estimator.upper()))
        best_starting_index = training_results[0].performances.first_used_index

        training_data = ingested_data.copy().loc[best_starting_index:]
        training_data.iloc[:, 0] = self.transformation.apply(training_data.iloc[:, 0])
        self.train(training_data.copy())

        future_df = pd.DataFrame(index=pd.date_range(freq=self.freq,
                                                     start=training_data.index.values[0],
                                                     periods=len(training_data) + self.prediction_lags),
                                 columns=["yhat"], dtype=training_data.iloc[:, 0].dtype)
        forecast = self.predict(future_df)
        forecast = self._apply_inverse_transformation(forecast)
        forecast = self.adjust_forecast(training_data.columns[0], forecast)
        return forecast

    def launch_model(self, ingested_data: DataFrame, extra_regressors: DataFrame = None,
                     max_threads: int = 1) -> ModelResult:
        """
        Train the model on `ingested_data` and return a `ModelResult` object.

        This function is at the highest possible level of abstraction to train a model on a
        time-series.

        Parameters
        ----------
        ingested_data : DataFrame
            DataFrame containing the historical time series values.
        extra_regressors : DataFrame, optional, default None
            Additional time-series; currently only recorded in the model characteristics.
        max_threads : int, optional, default 1
            Maximum number of processes to use in the training phase.

        Returns
        -------
        model_result : ModelResult
            `ModelResult` containing the results of the model, trained on `ingested_data`. The best
            forecast is stored in `best_clustering`; `results` holds one `SingleResult` (with its
            `performances`) per tested sub-training-set.
        """
        model_characteristics = self.model_characteristics
        self.freq = pd.infer_freq(ingested_data.index)

        # Copies avoid side effects: the same data is used both for the trainings and the final fit.
        train_ts = ingested_data.copy()
        test_ts = ingested_data.copy()
        train_ts.iloc[:, 0] = self.transformation.apply(train_ts.iloc[:, 0])

        # Fix: these helpers take no `extra_regressors` parameter; passing it raised TypeError.
        model_training_results = self._compute_trainings(train_ts, test_ts, max_threads)
        best_prediction = self._compute_best_prediction(ingested_data, model_training_results)

        if extra_regressors is not None:
            model_characteristics["extra_regressors"] = ', '.join([*extra_regressors.columns])
        model_characteristics["name"] = self.name
        model_characteristics["transformation"] = self.transformation

        # Fix: ModelResult has no `best_prediction` parameter (and no defaults); map the best forecast
        # to `best_clustering` and pass `cluster_centers` explicitly (not computed at this level).
        return ModelResult(best_clustering=best_prediction, results=model_training_results,
                           characteristics=model_characteristics, cluster_centers=None)

    @staticmethod
    def _bound_for(bounds: dict, column_name: str):
        """Return the user bound for `column_name` from a bounds dict ("_all" wins), or None."""
        if bounds is None:
            return None
        if "_all" in bounds:
            return bounds["_all"]
        return bounds.get(column_name)

    def adjust_forecast(self, column_name: str, df: DataFrame):
        """
        Clamp values of `df` below the minimum set by the user (if any) or above the maximum, and
        apply the rounding to integer, if the user specified it.

        Parameters
        ----------
        column_name : str
            Name of the column, used to look up per-column bounds.
        df : DataFrame
            Forecast to check.

        Returns
        -------
        DataFrame
            Adjusted dataframe.
        """
        min_value = self._bound_for(self.min_values, column_name)
        if min_value is not None:
            df.loc[:, 'yhat'] = df['yhat'].clip(lower=min_value)
            try:
                df.loc[:, 'yhat_lower'] = df['yhat_lower'].clip(lower=min_value)
                df.loc[:, 'yhat_upper'] = df['yhat_upper'].clip(lower=min_value)
            except KeyError:
                pass

        max_value = self._bound_for(self.max_values, column_name)
        if max_value is not None:
            df.loc[:, 'yhat'] = df['yhat'].clip(upper=max_value)
            try:
                df.loc[:, 'yhat_lower'] = df['yhat_lower'].clip(upper=max_value)
                df.loc[:, 'yhat_upper'] = df['yhat_upper'].clip(upper=max_value)
            except KeyError:
                pass

        if self.round_to_integer is not None:
            if "_all" in self.round_to_integer or column_name in self.round_to_integer:
                try:
                    df = df.astype({"yhat": 'int', "yhat_lower": 'int', "yhat_upper": 'int'})
                except KeyError:
                    # Confidence-bound columns are optional.
                    df = df.astype({"yhat": 'int'})
        return df
Classes
class ClustersModel (params: dict, approach: str, name: str, distance_metric: str = None, transformation: str = None)
-
Base class for every cluster model which can be used on a time series.
Parameters
params
:dict
- A dictionary corresponding to a TIMEX JSON configuration file.
The various attributes, described below, will be extracted from this.
If
params
does not contain the entrymodel_parameters
, then TIMEX will attempt to load a default configuration parameter dictionary. approach
:str
- Clustering approach used; it can be one of three types: Observation based, Feature based or Model based.
name
:str
- Class of the model.
distance_metric
:str
- The class of distance metric which the model will use to measure similarity among time series.
transformation
:str, None
, optional- The class of transformation which the model should use. If not specified, the one in
params
will be used.
Attributes
freq
:str
- If available, the frequency of the time-series.
test_values
:int **
- Number of the last points of the time-series to use for the validation set. Default 0. If this is not available
in the configuration parameter dictionary,
test_percentage
will be used. test_percentage
:float **
- Percentage of the time-series length to use for the validation set. Default 0
distance_metric
:str
- Distance metric to measure similarity among time series. Default ED
transformation
:str
- Transformation to apply to the time series before using it. Default None
prediction_lags
:int **
- Number of future lags for which the prediction has to be made. Default 0
delta_training_percentage
:float **
- Length, in percentage of the time-series length, of the training windows.
delta_training_values
:int **
- Length of the training windows, obtained by computing
delta_training_percentage * length of the time-series
. main_accuracy_estimator
:str **
- Error metric to use when deciding which prediction is better. Default: MAE.
min_values
:dict
- Key-values where key is the name of a column and value is the minimum expected value in that column. If "_all" is in the dict the corresponding value will be used for all values in each column.
max_values
:dict
- Key-values where key is the name of a column and value is the maximum expected value in that column. If "_all" is in the dict the corresponding value will be used for all values in each column.
round_to_integer
:list
- List of columns name which should be rounded to integer. If "_all" is in the list, all values in every column will be rounded to integer.
model_characteristics
:dict
- Dictionary of values containing the main characteristics and parameters of the model. Default {}
Expand source code
class ClustersModel: """ Base class for every cluster model which can be used on a time series. Parameters ---------- params : dict A dictionary corresponding to a TIMEX JSON configuration file. The various attributes, described below, will be extracted from this. If `params` does not contain the entry `model_parameters`, then TIMEX will attempt to load a default configuration parameter dictionary. approach : str Clustering approach use, it can be of three types: Observation based, Feature based or Model based. name : str Class of the model. distance_metric : str The class of distance metric which the model will use to meausure similarity among time series. transformation : str, None, optional The class of transformation which the model should use. If not specified, the one in `params` will be used. Attributes ---------- freq : str If available, the frequency of the time-series. test_values : int ** Number of the last points of the time-series to use for the validation set. Default 0. If this is not available in the configuration parameter dictionary, `test_percentage` will be used. test_percentage : float ** Percentage of the time-series length to used for the validation set. Default 0 distance_metric : str Distance metric to to meausure similarity among time series. Default ED transformation : str Transformation to apply to the time series before using it. Default None prediction_lags : int ** Number of future lags for which the prediction has to be made. Default 0 delta_training_percentage : float ** Length, in percentage of the time-series length, of the training windows. delta_training_values : int ** Length of the training windows, obtained by computing `delta_training_percentage * length of the time-series`. main_accuracy_estimator : str ** Error metric to use when deciding which prediction is better. Default: MAE. min_values : dict Key-values where key is the name of a column and value is the minimum expected value in that column. 
If "_all" is in the dict the corresponding value will be used for all values in each column. max_values : dict Key-values where key is the name of a column and value is the maximum expected value in that column. If "_all" is in the dict the corresponding value will be used for all values in each column. round_to_integer : list List of columns name which should be rounded to integer. If "_all" is in the list, all values in every column will be rounded to integer. model_characteristics : dict Dictionary of values containing the main characteristics and parameters of the model. Default {} """ def __init__(self, params: dict, approach: str, name: str, distance_metric: str = None, transformation: str = None) -> None: self.name = name self.approach = approach log.info(f"Creating a {self.name} model using {self.approach} clustering approach...") if "model_parameters" not in params: log.debug(f"Loading default settings...") parsed = pkgutil.get_data(__name__, "default_prediction_parameters/" + self.name + ".json") model_parameters = json.loads(parsed) else: log.debug(f"Loading user settings...") model_parameters = params["model_parameters"] if distance_metric is not None: self.distance_metric = distance_metric_factory(distance_metric) else: self.distance_metric = distance_metric_factory(model_parameters["distance_metric"]) if transformation is not None: self.transformation = transformation_factory(transformation) else: self.transformation = transformation_factory(model_parameters["transformation"]) if "min_values" in model_parameters: self.min_values = model_parameters["min_values"] else: self.min_values = None if "max_values" in model_parameters: self.max_values = model_parameters["max_values"] else: self.max_values = None if "round_to_integer" in model_parameters: self.round_to_integer = list(model_parameters["round_to_integer"].split(",")) else: self.round_to_integer = None self.prediction_lags = model_parameters["prediction_lags"] self.main_accuracy_estimator = 
model_parameters["main_accuracy_estimator"] self.delta_training_values = 0 self.model_characteristics = {} self.freq = "" log.debug(f"Finished model creation.") def train(self, ingested_data: DataFrame): """ Train the model using all the columns of `ingested_data`. The predictor, after launching `train`, is ready to make predictions for the future. Note that this and `predict` do not split anything in training data/validation data; this is done with other functions, like `launch_model`. Parameters ---------- ingested_data : DataFrame Training set. The entire time-series in the first column of `ingested_data` will be used for training. Examples -------- We will use as example the `timexseries_clustering.data_prediction.models.prophet_predictor.FBProphetModel`, an instance of `Predictor`. >>> param_config = {} # This will make the predictor use default values... >>> predictor = FBProphetModel(params=param_config) Create some training data. >>> dates = pd.date_range('2000-01-01', periods=30) # Last index is 2000-01-30 >>> ds = pd.DatetimeIndex(dates, freq="D") >>> a = np.arange(30, 60) >>> training_dataframe = DataFrame(data={"a": a}, index=ds) Train the model. >>> predictor.train(training_dataframe) """ pass def predict(self, future_dataframe: DataFrame) -> DataFrame: """ Return a DataFrame with the shape of `future_dataframe`, filled with predicted values. This function is used by `compute_training`; `future_dataframe` has the length of the time-series used for training, plus the number of desired prediction points. Additional `extra_regressor`, which will contain additional time-series useful to improve the prediction in case of multivariate models, must have the same points of `future_dataframe`. Parameters ---------- future_dataframe : DataFrame DataFrame which will be filled with prediction values. 
This DataFrame should have the same index of the data used for training (plus the number of predicted values), and a column named `yhat`, which corresponds to the prediction that should be computed. Returns ------- forecast : DataFrame DataFrame which contains the prediction computed by the model, in the column `yhat`. `forecast` may contain additional columns, e.g. `yhat_lower`, `yhat_upper` etc. Examples -------- We will use as example the `timexseries.data_prediction.models.prophet_predictor.FBProphetModel`, an instance of `Predictor`. If the model has been trained, as shown in `predict` example, we can create a forecast. First, create the future dataframe which will be filled: >>> future_dates = pd.date_range('2000-01-31', periods=7) >>> future_ds = pd.DatetimeIndex(future_dates, freq="D") >>> future_df = DataFrame(columns=["yhat"], index=future_ds) >>> future_df yhat ds 2000-01-31 NaN 2000-02-01 NaN 2000-02-02 NaN 2000-02-03 NaN 2000-02-04 NaN 2000-02-05 NaN 2000-02-06 NaN Now we can create the forecast: >>> predictions = predictor.predict(future_dataframe=future_df) >>> predictions ds 2000-01-31 59.992592 2000-02-01 60.992592 2000-02-02 61.992592 2000-02-03 62.992592 2000-02-04 63.992592 2000-02-05 64.992592 2000-02-06 65.992592 Name: yhat, dtype: float64 """ pass def _compute_trainings(self, train_ts: DataFrame, test_ts: DataFrame, max_threads: int): """ Compute the training of a model on a set of different training sets, of increasing length. `train_ts` is split in `n` different training sets, according to the length of `train_ts` and the value of `self.delta_training_values`. The computation is split across different processes, according to the value of max_threads which indicates the maximum number of usable processes. Parameters ---------- train_ts : DataFrame The entire training set which can be used; it will be split in different training sets, in order to test which sub-training-set performs better. 
test_ts : DataFrame Testing set to be used to compute the models' performances. extra_regressors : DataFrame Additional time-series to pass to `train` in order to improve the performances. max_threads : int Maximum number of threads to use in the training phase. Returns ------- results : list List of SingleResult. Each one is the result relative to the use of a specific train set. """ train_sets_number = math.ceil(len(train_ts) / self.delta_training_values) log.info(f"Model will use {train_sets_number} different training sets...") def c(targets: List[int], _return_dict: dict, thread_number: int): _results = [] for _i in range(targets[0], targets[1]): tr = train_ts.iloc[-(_i+1) * self.delta_training_values:] log.debug(f"Trying with last {len(tr)} values as training set, in thread {thread_number}") self.train(tr.copy()) future_df = pd.DataFrame(index=pd.date_range(freq=self.freq, start=tr.index.values[0], periods=len(tr) + self.test_values + self.prediction_lags), columns=["yhat"], dtype=tr.iloc[:, 0].dtype) forecast = self.predict(future_df) forecast.loc[:, 'yhat'] = self.transformation.inverse(forecast['yhat']) try: forecast.loc[:, 'yhat_lower'] = self.transformation.inverse(forecast['yhat_lower']) forecast.loc[:, 'yhat_upper'] = self.transformation.inverse(forecast['yhat_upper']) except KeyError: pass forecast = self.adjust_forecast(train_ts.columns[0], forecast) testing_prediction = forecast.iloc[-self.prediction_lags - self.test_values:-self.prediction_lags] first_used_index = tr.index.values[0] tp = ValidationPerformance(first_used_index) tp.set_testing_stats(test_ts.iloc[:, 0], testing_prediction["yhat"]) _results.append(SingleResult(forecast, tp)) _return_dict[thread_number] = _results manager = multiprocessing.Manager() return_dict = manager.dict() processes = [] if self.name == 'LSTM' or self.name == 'NeuralProphet': log.info(f"LSTM/NeuralProphet model. 
Cant use multiprocessing.") return_d = {} distributions = [[0, train_sets_number]] c(distributions[0], return_d, 0) return return_d[0] if max_threads == 1: return_d = {} distributions = [[0, train_sets_number]] c(distributions[0], return_d, 0) return return_d[0] else: distributions = [] if train_sets_number % max_threads == 0: n_threads = max_threads subtraining_dim = train_sets_number // n_threads for i in range(0, n_threads): distributions.append([i*subtraining_dim, i*subtraining_dim + subtraining_dim]) else: n_threads = min(max_threads, train_sets_number) subtraining_dim = train_sets_number // n_threads for i in range(0, n_threads): distributions.append([i*subtraining_dim, i*subtraining_dim+subtraining_dim]) for k in range(0, (train_sets_number % n_threads)): distributions[k][1] += 1 distributions[k+1::] = [ [x+1, y+1] for x, y in distributions[k+1::]] for i in range(0, n_threads): processes.append(multiprocessing.Process(target=c, args=(distributions[i], return_dict, i))) processes[-1].start() for p in processes: p.join() results = reduce(lambda x, y: x+y, [return_dict[key] for key in return_dict]) return results def _compute_best_prediction(self, ingested_data: DataFrame, training_results: List[SingleResult]): """ Given the ingested data and the training results, identify the best training window and compute a prediction using all the possible data, till the end of the series (hence, including the validation set). Parameters ---------- ingested_data : DataFrame Initial time-series data, in a DataFrame. The first column of the DataFrame is the time-series. training_results : [SingleResult] List of `SingleResult` object: each one is the result of the model on a specific training-set. Returns ------- DataFrame Best available prediction for this time-series, with this model. 
""" training_results.sort(key=lambda x: getattr(x.testing_performances, self.main_accuracy_estimator.upper())) best_starting_index = training_results[0].testing_performances.first_used_index training_data = ingested_data.copy().loc[best_starting_index:] training_data.iloc[:, 0] = self.transformation.apply(training_data.iloc[:, 0]) self.train(training_data.copy()) future_df = pd.DataFrame(index=pd.date_range(freq=self.freq, start=training_data.index.values[0], periods=len(training_data) + self.prediction_lags), columns=["yhat"], dtype=training_data.iloc[:, 0].dtype) forecast = self.predict(future_df) forecast.loc[:, 'yhat'] = self.transformation.inverse(forecast['yhat']) try: forecast.loc[:, 'yhat_lower'] = self.transformation.inverse(forecast['yhat_lower']) forecast.loc[:, 'yhat_upper'] = self.transformation.inverse(forecast['yhat_upper']) except KeyError: pass forecast = self.adjust_forecast(training_data.columns[0], forecast) return forecast def launch_model(self, ingested_data: DataFrame, extra_regressors: DataFrame = None, max_threads: int = 1)-> ModelResult: """ Train the model on `ingested_data` and returns a `ModelResult` object. This function is at the highest possible level of abstraction to train a model on a time-series. Parameters ---------- ingested_data : DataFrame DataFrame containing the historical time series value; it will be split in training and test parts. extra_regressors : DataFrame, optional, default None Additional time-series to passed to `train` in order to improve the performances. max_threads : int, optional, default 1 Maximum number of threads to use in the training phase. Returns ------- model_result : ModelResult `ModelResult` containing the results of the model, trained on ingested_data. 
Examples -------- First, create some training data: >>> dates = pd.date_range('2000-01-01', periods=30) # Last index is 2000-01-30 >>> ds = pd.DatetimeIndex(dates, freq="D") >>> a = np.arange(30, 60) >>> timeseries_dataframe = DataFrame(data={"a": a}, index=ds) Create the model, with default values: >>> param_config = {} >>> predictor = FBProphetModel(params=param_config) Launch the model: >>> model_output = predictor.launch_model(timeseries_dataframe) The result is an object of class `ModelResult`: it contains the best prediction... >>> model_output.best_prediction['yhat'] ds 2000-01-22 51.0 2000-01-23 52.0 2000-01-24 53.0 2000-01-25 54.0 2000-01-26 55.0 2000-01-27 56.0 2000-01-28 57.0 2000-01-29 58.0 2000-01-30 59.0 2000-01-31 60.0 2000-02-01 61.0 2000-02-02 62.0 2000-02-03 63.0 2000-02-04 64.0 2000-02-05 65.0 2000-02-06 66.0 2000-02-07 67.0 2000-02-08 68.0 2000-02-09 69.0 Name: yhat, dtype: float64 As well as the `characteristic` dictionary, which contains useful information on the model: >>> model_output.characteristics {'name': 'FBProphet', 'delta_training_percentage': 20, 'delta_training_values': 6, 'test_values': 3, 'transformation': <timexseries.data_prediction.transformation.Identity object at 0x7f29214b3a00>} The `results` attribute, instead, contains the results of the training on each of the tested sub-training-sets. 
>>> model_output.results [<timexseries.data_prediction.models.predictor.SingleResult at 0x7f28679e4a90>, <timexseries.data_prediction.models.predictor.SingleResult at 0x7f28679e48b0>, <timexseries.data_prediction.models.predictor.SingleResult at 0x7f286793a730>, <timexseries.data_prediction.models.predictor.SingleResult at 0x7f28679e4c10>, <timexseries.data_prediction.models.predictor.SingleResult at 0x7f2875cd2490>] Each `SingleResult` has an attribute `prediction`, which contains the prediction on the validation set (in this case, composed by the last 3 values of `timeseries_dataframe`) and `testing_performances` which recaps the performance, in terms of MAE, MSE, etc. of that `SingleResult` on the validation set. """ model_characteristics = self.model_characteristics #self.delta_training_values = int(round(len(ingested_data) * self.delta_training_percentage / 100)) #if self.test_values == -1: # self.test_values = int(round(len(ingested_data) * (self.test_percentage / 100))) self.freq = pd.infer_freq(ingested_data.index) # We need to pass ingested data both to compute_training and compute_best_prediction, so better use copy() # because, otherwise, we may have side effects. 
#train_ts = ingested_data.copy().iloc[:-self.test_values] #test_ts = ingested_data.copy().iloc[-self.test_values:] train_ts = ingested_data.copy() test_ts = ingested_data.copy() train_ts.iloc[:, 0] = self.transformation.apply(train_ts.iloc[:, 0]) model_training_results = self._compute_trainings(train_ts, test_ts, extra_regressors, max_threads) best_prediction = self._compute_best_prediction(ingested_data, model_training_results, extra_regressors) if extra_regressors is not None: model_characteristics["extra_regressors"] = ', '.join([*extra_regressors.columns]) model_characteristics["name"] = self.name #model_characteristics["delta_training_percentage"] = self.delta_training_percentage #model_characteristics["delta_training_values"] = self.delta_training_values #model_characteristics["test_values"] = self.test_values model_characteristics["transformation"] = self.transformation return ModelResult(results=model_training_results, characteristics=model_characteristics, best_prediction=best_prediction) def adjust_forecast(self, column_name: str, df: DataFrame): """ Check `s` for values below the minimum set by the user (if any) or above the maximum. Apply the rounding to integer, if the user specified it. Parameters ---------- column_name : str Name of the column. df : DataFrame Forecast to check. Returns ------- DataFrame Adjusted dataframe. 
""" if self.min_values is not None: min_value = None if "_all" in self.min_values: min_value = self.min_values["_all"] elif column_name in self.min_values: min_value = self.min_values[column_name] if min_value is not None: df.loc[:, 'yhat'] = df['yhat'].apply(lambda x: min_value if x < min_value else x) try: df.loc[:, 'yhat_lower'] = df['yhat_lower'].apply(lambda x: min_value if x < min_value else x) df.loc[:, 'yhat_upper'] = df['yhat_upper'].apply(lambda x: min_value if x < min_value else x) except KeyError: pass if self.max_values is not None: max_value = None if "_all" in self.max_values: max_value = self.max_values["_all"] elif column_name in self.max_values: max_value = self.max_values[column_name] if max_value is not None: df.loc[:, 'yhat'] = df['yhat'].apply(lambda x: max_value if x > max_value else x) try: df.loc[:, 'yhat_lower'] = df['yhat_lower'].apply(lambda x: max_value if x > max_value else x) df.loc[:, 'yhat_upper'] = df['yhat_upper'].apply(lambda x: max_value if x > max_value else x) except KeyError: pass if self.round_to_integer is not None: if "_all" in self.round_to_integer or column_name in self.round_to_integer: try: df = df.astype({"yhat": 'int', "yhat_lower": 'int', "yhat_upper": 'int'}) except KeyError: df = df.astype({"yhat": 'int'}) return df
Methods
def adjust_forecast(self, column_name: str, df: pandas.core.frame.DataFrame)
-
Check `df` for values below the minimum set by the user (if any) or above the maximum. Apply the rounding to integer, if the user specified it. Parameters
column_name
:str
- Name of the column.
df
:DataFrame
- Forecast to check.
Returns
DataFrame
- Adjusted dataframe.
Expand source code
def adjust_forecast(self, column_name: str, df: DataFrame):
    """
    Clamp the forecast in `df` to the user-configured minimum/maximum bound (if
    any) and, when requested, cast it to integers.

    Parameters
    ----------
    column_name : str
        Name of the column.
    df : DataFrame
        Forecast to check.

    Returns
    -------
    DataFrame
        Adjusted dataframe.
    """

    def _bound_for(bounds):
        # "_all" applies to every time-series and takes precedence over a
        # per-column entry; None means "no bound configured".
        if bounds is None:
            return None
        if "_all" in bounds:
            return bounds["_all"]
        if column_name in bounds:
            return bounds[column_name]
        return None

    lower = _bound_for(self.min_values)
    if lower is not None:
        df.loc[:, 'yhat'] = df['yhat'].apply(lambda v: lower if v < lower else v)
        try:
            df.loc[:, 'yhat_lower'] = df['yhat_lower'].apply(lambda v: lower if v < lower else v)
            df.loc[:, 'yhat_upper'] = df['yhat_upper'].apply(lambda v: lower if v < lower else v)
        except KeyError:
            # Confidence-interval columns are optional.
            pass

    upper = _bound_for(self.max_values)
    if upper is not None:
        df.loc[:, 'yhat'] = df['yhat'].apply(lambda v: upper if v > upper else v)
        try:
            df.loc[:, 'yhat_lower'] = df['yhat_lower'].apply(lambda v: upper if v > upper else v)
            df.loc[:, 'yhat_upper'] = df['yhat_upper'].apply(lambda v: upper if v > upper else v)
        except KeyError:
            pass

    if self.round_to_integer is not None:
        if "_all" in self.round_to_integer or column_name in self.round_to_integer:
            try:
                df = df.astype({"yhat": 'int', "yhat_lower": 'int', "yhat_upper": 'int'})
            except KeyError:
                # Only `yhat` is guaranteed to exist.
                df = df.astype({"yhat": 'int'})

    return df
def launch_model(self, ingested_data: pandas.core.frame.DataFrame, extra_regressors: pandas.core.frame.DataFrame = None, max_threads: int = 1) ‑> ModelResult
-
Train the model on
ingested_data
and returns aModelResult
object. This function is at the highest possible level of abstraction to train a model on a time-series.Parameters
ingested_data
:DataFrame
- DataFrame containing the historical time series value; it will be split in training and test parts.
extra_regressors
:DataFrame
, optional, defaultNone
- Additional time-series to be passed to
train
in order to improve the performances. max_threads
:int
, optional, default1
- Maximum number of threads to use in the training phase.
Returns
model_result
:ModelResult
ModelResult
containing the results of the model, trained on ingested_data.
Examples
First, create some training data:
>>> dates = pd.date_range('2000-01-01', periods=30) # Last index is 2000-01-30 >>> ds = pd.DatetimeIndex(dates, freq="D") >>> a = np.arange(30, 60) >>> timeseries_dataframe = DataFrame(data={"a": a}, index=ds)
Create the model, with default values:
>>> param_config = {} >>> predictor = FBProphetModel(params=param_config)
Launch the model:
>>> model_output = predictor.launch_model(timeseries_dataframe)
The result is an object of class
ModelResult
: it contains the best prediction…>>> model_output.best_prediction['yhat'] ds 2000-01-22 51.0 2000-01-23 52.0 2000-01-24 53.0 2000-01-25 54.0 2000-01-26 55.0 2000-01-27 56.0 2000-01-28 57.0 2000-01-29 58.0 2000-01-30 59.0 2000-01-31 60.0 2000-02-01 61.0 2000-02-02 62.0 2000-02-03 63.0 2000-02-04 64.0 2000-02-05 65.0 2000-02-06 66.0 2000-02-07 67.0 2000-02-08 68.0 2000-02-09 69.0 Name: yhat, dtype: float64
As well as the
characteristic
dictionary, which contains useful information on the model:>>> model_output.characteristics {'name': 'FBProphet', 'delta_training_percentage': 20, 'delta_training_values': 6, 'test_values': 3, 'transformation': <timexseries.data_prediction.transformation.Identity object at 0x7f29214b3a00>}
The
results
attribute, instead, contains the results of the training on each of the tested sub-training-sets.>>> model_output.results [<timexseries.data_prediction.models.predictor.SingleResult at 0x7f28679e4a90>, <timexseries.data_prediction.models.predictor.SingleResult at 0x7f28679e48b0>, <timexseries.data_prediction.models.predictor.SingleResult at 0x7f286793a730>, <timexseries.data_prediction.models.predictor.SingleResult at 0x7f28679e4c10>, <timexseries.data_prediction.models.predictor.SingleResult at 0x7f2875cd2490>]
Each
SingleResult
has an attributeprediction
, which contains the prediction on the validation set (in this case, composed by the last 3 values oftimeseries_dataframe
) andtesting_performances
which recaps the performance, in terms of MAE, MSE, etc. of thatSingleResult
on the validation set.Expand source code
def launch_model(self, ingested_data: DataFrame, extra_regressors: DataFrame = None, max_threads: int = 1) -> ModelResult:
    """
    Train the model on `ingested_data` and returns a `ModelResult` object.
    This function is at the highest possible level of abstraction to train a model
    on a time-series.

    Parameters
    ----------
    ingested_data : DataFrame
        DataFrame containing the historical time series value; it will be split in
        training and test parts.
    extra_regressors : DataFrame, optional, default None
        Additional time-series to be passed to `train` in order to improve the
        performances.
    max_threads : int, optional, default 1
        Maximum number of threads to use in the training phase.

    Returns
    -------
    model_result : ModelResult
        `ModelResult` containing the results of the model, trained on ingested_data.

    Examples
    --------
    >>> dates = pd.date_range('2000-01-01', periods=30)  # Last index is 2000-01-30
    >>> ds = pd.DatetimeIndex(dates, freq="D")
    >>> a = np.arange(30, 60)
    >>> timeseries_dataframe = DataFrame(data={"a": a}, index=ds)
    >>> param_config = {}
    >>> predictor = FBProphetModel(params=param_config)
    >>> model_output = predictor.launch_model(timeseries_dataframe)

    `model_output.characteristics` holds the human-readable model information and
    `model_output.results` the `SingleResult` obtained on each tested
    sub-training-set.
    """
    model_characteristics = self.model_characteristics

    self.freq = pd.infer_freq(ingested_data.index)

    # We need to pass ingested data both to _compute_trainings and
    # _compute_best_prediction, so better use copy() because, otherwise, we may
    # have side effects.
    train_ts = ingested_data.copy()
    test_ts = ingested_data.copy()

    train_ts.iloc[:, 0] = self.transformation.apply(train_ts.iloc[:, 0])

    # Fixed: the original call passed `extra_regressors` positionally, but the
    # declared signatures are _compute_trainings(train_ts, test_ts, max_threads)
    # and _compute_best_prediction(ingested_data, training_results) — the extra
    # argument raised TypeError. `train`/`predict` here take no regressors, so
    # dropping it preserves the intended behavior.
    model_training_results = self._compute_trainings(train_ts, test_ts, max_threads)
    best_prediction = self._compute_best_prediction(ingested_data, model_training_results)

    if extra_regressors is not None:
        model_characteristics["extra_regressors"] = ', '.join([*extra_regressors.columns])

    model_characteristics["name"] = self.name
    model_characteristics["transformation"] = self.transformation

    # NOTE(review): ModelResult in this module is declared with
    # (best_clustering, results, characteristics, cluster_centers); the
    # `best_prediction` keyword below is a leftover of the prediction package and
    # will raise TypeError — reconcile before use. TODO confirm intent.
    return ModelResult(results=model_training_results, characteristics=model_characteristics,
                       best_prediction=best_prediction)
def predict(self, future_dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
-
Return a DataFrame with the shape of
future_dataframe
, filled with predicted values.This function is used by
compute_training
;future_dataframe
has the length of the time-series used for training, plus the number of desired prediction points.Additional
extra_regressor
, which will contain additional time-series useful to improve the prediction in case of multivariate models, must have the same points offuture_dataframe
.Parameters
future_dataframe
:DataFrame
- DataFrame which will be filled with prediction values. This DataFrame should have the same index of the data
used for training (plus the number of predicted values), and a column named
yhat
, which corresponds to the prediction that should be computed.
Returns
forecast
:DataFrame
- DataFrame which contains the prediction computed by the model, in the column
yhat
.forecast
may contain additional columns, e.g.yhat_lower
,yhat_upper
etc.
Examples
We will use as example the
timexseries.data_prediction.models.prophet_predictor.FBProphetModel
, an instance ofPredictor
. If the model has been trained, as shown inpredict
example, we can create a forecast.First, create the future dataframe which will be filled:
>>> future_dates = pd.date_range('2000-01-31', periods=7) >>> future_ds = pd.DatetimeIndex(future_dates, freq="D") >>> future_df = DataFrame(columns=["yhat"], index=future_ds) >>> future_df yhat ds 2000-01-31 NaN 2000-02-01 NaN 2000-02-02 NaN 2000-02-03 NaN 2000-02-04 NaN 2000-02-05 NaN 2000-02-06 NaN
Now we can create the forecast:
>>> predictions = predictor.predict(future_dataframe=future_df) >>> predictions ds 2000-01-31 59.992592 2000-02-01 60.992592 2000-02-02 61.992592 2000-02-03 62.992592 2000-02-04 63.992592 2000-02-05 64.992592 2000-02-06 65.992592 Name: yhat, dtype: float64
Expand source code
def predict(self, future_dataframe: DataFrame) -> DataFrame:
    """
    Return a DataFrame with the shape of `future_dataframe`, filled with predicted values.

    This function is used by `compute_training`; `future_dataframe` has the length
    of the time-series used for training, plus the number of desired prediction
    points. Additional `extra_regressor`, which will contain additional time-series
    useful to improve the prediction in case of multivariate models, must have the
    same points of `future_dataframe`.

    Parameters
    ----------
    future_dataframe : DataFrame
        DataFrame which will be filled with prediction values. It should have the
        same index of the data used for training (plus the number of predicted
        values) and a column named `yhat`, which corresponds to the prediction
        that should be computed.

    Returns
    -------
    forecast : DataFrame
        DataFrame which contains the prediction computed by the model, in the
        column `yhat`. `forecast` may contain additional columns, e.g.
        `yhat_lower`, `yhat_upper` etc.

    Examples
    --------
    >>> future_dates = pd.date_range('2000-01-31', periods=7)
    >>> future_ds = pd.DatetimeIndex(future_dates, freq="D")
    >>> future_df = DataFrame(columns=["yhat"], index=future_ds)
    >>> predictions = predictor.predict(future_dataframe=future_df)
    """
    # Abstract hook: concrete models (e.g. FBProphetModel) override this.
    pass
def train(self, ingested_data: pandas.core.frame.DataFrame)
-
Train the model using all the columns of
ingested_data
. The predictor, after launchingtrain
, is ready to make predictions for the future.Note that this and
predict
do not split anything in training data/validation data; this is done with other functions, likelaunch_model
.Parameters
ingested_data
:DataFrame
- Training set. The entire time-series in the first column of
ingested_data
will be used for training.
Examples
We will use as example the
timexseries_clustering.data_prediction.models.prophet_predictor.FBProphetModel
, an instance ofPredictor
.>>> param_config = {} # This will make the predictor use default values... >>> predictor = FBProphetModel(params=param_config)
Create some training data.
>>> dates = pd.date_range('2000-01-01', periods=30) # Last index is 2000-01-30 >>> ds = pd.DatetimeIndex(dates, freq="D") >>> a = np.arange(30, 60) >>> training_dataframe = DataFrame(data={"a": a}, index=ds)
Train the model.
>>> predictor.train(training_dataframe)
Expand source code
def train(self, ingested_data: DataFrame):
    """
    Train the model using all the columns of `ingested_data`.

    The predictor, after launching `train`, is ready to make predictions for the
    future. Note that this and `predict` do not split anything in training
    data/validation data; this is done with other functions, like `launch_model`.

    Parameters
    ----------
    ingested_data : DataFrame
        Training set. The entire time-series in the first column of
        `ingested_data` will be used for training.

    Examples
    --------
    >>> param_config = {}  # This will make the predictor use default values...
    >>> predictor = FBProphetModel(params=param_config)
    >>> dates = pd.date_range('2000-01-01', periods=30)  # Last index is 2000-01-30
    >>> ds = pd.DatetimeIndex(dates, freq="D")
    >>> a = np.arange(30, 60)
    >>> training_dataframe = DataFrame(data={"a": a}, index=ds)
    >>> predictor.train(training_dataframe)
    """
    # Abstract hook: concrete models override this.
    pass
class ModelResult (best_clustering: pandas.core.frame.DataFrame, results: List[SingleResult], characteristics: dict, cluster_centers: pandas.core.frame.DataFrame)
-
Class to collect the global results of a model clustering of all the time-series.
Parameters
best_clustering
:DataFrame
Clustering obtained using the best model parameters and all the available time-series. These are the cluster indexes that users are most likely to want.
results
:[SingleResults]
- List of all the results obtained using all the possible model parameters set for this model and metric, on the time series.
This is useful to create plots which show how the performance vary changing the number of clusters (e.g.
timexseries.data_visualization.functions.performance_plot
). characteristics
:dict
- Model parameters. This dictionary collects human-readable characteristics of the model, e.g. the number of clusters used, the distance metric applied, etc.
cluster_centers
:DataFrame
- Cluster centers computed to obtain the best clustering of all the time-series.
Expand source code
class ModelResult:
    """
    Global results of a model's clustering of all the time-series.

    Parameters
    ----------
    best_clustering : DataFrame
        Clustering obtained using the best model parameters and _all_ the
        available time-series. These are the cluster indexes that users are most
        likely to want.
    results : [SingleResult]
        All the results obtained using every possible model-parameter set for
        this model and metric, on the time series. Useful to create plots which
        show how the performance varies with the number of clusters (e.g.
        `timexseries.data_visualization.functions.performance_plot`).
    characteristics : dict
        Model parameters: human-readable characteristics of the model, e.g. the
        number of clusters used, the distance metric applied, etc.
    cluster_centers : DataFrame
        Cluster centers computed to obtain the best clustering of all the
        time-series.
    """

    def __init__(self, best_clustering: DataFrame, results: List["SingleResult"],
                 characteristics: dict, cluster_centers: DataFrame):
        # Plain data holder: everything is stored as-is.
        self.best_clustering = best_clustering
        self.results = results
        self.characteristics = characteristics
        self.cluster_centers = cluster_centers
class SingleResult (characteristics: dict, performances: ValidationPerformance)
-
Class for the result of a model, trained on a specific set of model parameters (e.g. using a specific number of clusters and transformation).
Parameters
characteristics
:dict
- Characteristics obtained using these model parameters
performances
:ValidationPerformance
- Model performances (
timexseries_clustering.data_prediction.validation_performances.ValidationPerformance
), obtained using different number of clusters and transformation.
Expand source code
class SingleResult:
    """
    Result of a model trained on one specific set of model parameters (e.g. a
    specific number of clusters and transformation).

    Parameters
    ----------
    characteristics : dict
        Characteristics obtained using these model parameters.
    performances : ValidationPerformance
        Model performances
        (`timexseries_clustering.data_prediction.validation_performances.ValidationPerformance`),
        obtained using different numbers of clusters and transformations.
    """

    def __init__(self, characteristics: dict, performances: "ValidationPerformance"):
        # Plain data holder: everything is stored as-is.
        self.characteristics = characteristics
        self.performances = performances