# Source code for zoo.automl.regression.time_sequence_predictor

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from zoo.automl.search.RayTuneSearchEngine import RayTuneSearchEngine
from zoo.automl.common.metrics import Evaluator
from zoo.automl.feature.time_sequence import TimeSequenceFeatureTransformer

from zoo.automl.model.time_sequence import TimeSequenceModel
from zoo.automl.pipeline.time_sequence import TimeSequencePipeline
from zoo.automl.common.util import *
from zoo.automl.config.recipe import *


class TimeSequencePredictor(object):
    """
    Trains a model that predicts a future time sequence from a past sequence.

    Past sequence length should be > 1. Future sequence length can be > 1.
    For example, predict the next 2 data points from the past 5 data points.
    The output has only one target value (a scalar) for each data point in
    the sequence. The input can have more than one feature (the target value
    plus several extra features).

    Example usage:
        tsp = TimeSequencePredictor()
        tsp.fit(input_df)
        result = tsp.predict(test_df)
    """

    def __init__(self,
                 name="automl",
                 logs_dir="~/zoo_automl_logs",
                 future_seq_len=1,
                 dt_col="datetime",
                 target_col="value",
                 extra_features_col=None,
                 drop_missing=True,
                 ):
        """
        Constructor of Time Sequence Predictor.

        :param name: name of this automl run. It is passed to the search engine
            and, in distributed mode, used as the last component of the
            remote results directory.
        :param logs_dir: where the automl tune log files are located
        :param future_seq_len: the future sequence length to be predicted
        :param dt_col: the datetime index column
        :param target_col: the target column (to be predicted)
        :param extra_features_col: None or a list of extra feature columns
        :param drop_missing: whether to drop missing values in the input
        """
        self.logs_dir = logs_dir
        # Set by fit(); evaluate()/predict() delegate to it.
        self.pipeline = None
        self.future_seq_len = future_seq_len
        self.dt_col = dt_col
        self.target_col = target_col
        self.extra_features_col = extra_features_col
        self.drop_missing = drop_missing
        self.name = name

    def fit(self,
            input_df,
            validation_df=None,
            metric="mse",
            recipe=None,
            mc=False,
            resources_per_trial=None,
            distributed=False,
            hdfs_url=None
            ):
        """
        Trains the model for time sequence prediction.

        If future sequence length > 1, use seq2seq model, else use vanilla
        LSTM model.

        :param input_df: The input time series data frame (or a list of data
            frames with the same columns). Example:
             datetime   value   "extra feature 1"   "extra feature 2"
             2019-01-01 1.9     1                   2
             2019-01-02 2.3     0                   2
        :param validation_df: validation data, in the same format as input_df
            (a data frame if input_df is a data frame, a list otherwise).
        :param metric: String. Metric used for train and validation.
            Available values are "mse", "mae" and "r2".
        :param recipe: a Recipe object. Various recipes cover different search
            spaces and stopping criteria. Default (None) is SmokeRecipe().
        :param mc: forwarded unchanged to the search engine's compile().
        :param resources_per_trial: Machine resources to allocate per trial,
            e.g. ``{"cpu": 64, "gpu": 8}``. Default (None) is ``{"cpu": 2}``.
        :param distributed: bool. Indicate if running in distributed mode.
            If true, models are uploaded to HDFS.
        :param hdfs_url: the hdfs url used to save files in distributed mode.
            If None, the default hdfs_url will be used.
        :return: the fitted pipeline (also stored on self.pipeline)
        :raises ValueError: if the input format, columns or metric are invalid
        """
        # Create defaults per call instead of sharing one object that was
        # built when the class was defined (mutable default argument pitfall).
        if recipe is None:
            recipe = SmokeRecipe()
        if resources_per_trial is None:
            resources_per_trial = {"cpu": 2}

        self._check_input(input_df, validation_df, metric)
        if distributed:
            if hdfs_url is not None:
                remote_dir = os.path.join(hdfs_url, "ray_results", self.name)
            else:
                remote_dir = os.path.join(os.sep, "ray_results", self.name)
            # Create the remote results directory only if it is not listed yet.
            if self.name not in get_remote_list(os.path.dirname(remote_dir)):
                cmd = "hadoop fs -mkdir -p {}".format(remote_dir)
                process(cmd)
        else:
            remote_dir = None

        self.pipeline = self._hp_search(
            input_df,
            validation_df=validation_df,
            metric=metric,
            recipe=recipe,
            mc=mc,
            resources_per_trial=resources_per_trial,
            remote_dir=remote_dir)
        return self.pipeline

    def evaluate(self,
                 input_df,
                 metric=None
                 ):
        """
        Evaluate the fitted pipeline on a list of metrics.

        :param input_df: The input time series data frame, Example:
             datetime   value   "extra feature 1"   "extra feature 2"
             2019-01-01 1.9     1                   2
             2019-01-02 2.3     0                   2
        :param metric: A list of metric name strings, validated by
            Evaluator.check_metric.
        :return: a list of metric evaluation results.
        :raises ValueError: if Evaluator.check_metric rejects the metric
        """
        if not Evaluator.check_metric(metric):
            # format() instead of "+" so a list (or None) metric does not
            # raise TypeError while building the error message.
            raise ValueError("metric {} is not supported".format(metric))
        return self.pipeline.evaluate(input_df, metric)

    def predict(self,
                input_df):
        """
        Predict future sequence from past sequence.

        :param input_df: The input time series data frame, Example:
             datetime   value   "extra feature 1"   "extra feature 2"
             2019-01-01 1.9     1                   2
             2019-01-02 2.3     0                   2
        :return: a data frame with 2 columns, the 1st is the datetime, which
            is the last datetime of the past sequence.
            values are the predicted future sequence values.
            Example :
            datetime    value_0     value_1   ...     value_2
            2019-01-03  2           3                 9
        """
        return self.pipeline.predict(input_df)

    def _check_input_format(self, input_df, name="input_df"):
        """
        Classify the input as a single data frame or a list of data frames.

        :param input_df: object to check
        :param name: name used in the error message
        :return: True if input_df is a list of data frames, False if it is a
            single data frame
        :raises ValueError: for any other type
        """
        if isinstance(input_df, list) and all(
                isinstance(d, pd.DataFrame) for d in input_df):
            return True
        if isinstance(input_df, pd.DataFrame):
            return False
        raise ValueError(
            "{} should be a data frame or a list of data frames".format(name))

    def _check_missing_col(self, input_df):
        """
        Ensure the datetime, target and extra feature columns are present.

        :raises ValueError: if extra_features_col is not None/list, or if any
            required column is missing from input_df
        """
        cols_list = [self.dt_col, self.target_col]
        if self.extra_features_col is not None:
            if not isinstance(self.extra_features_col, (list,)):
                raise ValueError(
                    "extra_features_col needs to be either None or a list")
            cols_list.extend(self.extra_features_col)

        missing_cols = set(cols_list) - set(input_df.columns)
        if len(missing_cols) != 0:
            raise ValueError("Missing Columns in the input data frame:" +
                             ','.join(list(missing_cols)))

    def _check_input(self, input_df, validation_df, metric):
        """
        Validate fit() inputs: formats, required columns and the fit metric.

        :raises ValueError: if input_df is an empty list, validation_df's
            format differs from input_df's, a required column is missing, or
            metric is not one of "mse", "mae", "r2"
        """
        input_is_list = self._check_input_format(input_df)
        # An empty list would pass the format check but fail later when
        # _hp_search reads input_df[0].
        if input_is_list and len(input_df) == 0:
            raise ValueError("input_df should not be an empty list")

        if validation_df is not None:
            val_is_list = self._check_input_format(validation_df,
                                                   name="validation_df")
            # Mixing formats would otherwise iterate a data frame's column
            # names (or read .columns of a list) and crash obscurely.
            if val_is_list != input_is_list:
                raise ValueError(
                    "validation_df should have the same format as input_df "
                    "(both data frames or both lists of data frames)")

        for d in (input_df if input_is_list else [input_df]):
            self._check_missing_col(d)
        if validation_df is not None:
            for val_d in (validation_df if input_is_list
                          else [validation_df]):
                self._check_missing_col(val_d)

        allowed_fit_metrics = ["mse", "mae", "r2"]
        if metric not in allowed_fit_metrics:
            raise ValueError("metric {} is not supported".format(metric))

    @staticmethod
    def _get_metric_mode(metric):
        """
        Return the optimisation direction for a metric.

        :return: "min" for error-like metrics, "max" for "r2"
        :raises ValueError: for an unknown metric
        """
        max_mode_metrics = ["r2"]
        min_mode_metrics = ["mse", "mae", "rmse", "logloss", "error"]
        if metric in min_mode_metrics:
            return "min"
        if metric in max_mode_metrics:
            return "max"
        # Previously this *returned* a (ValueError, msg) tuple, which was then
        # silently used as the search mode; raise instead.
        raise ValueError("metric {} is not supported".format(metric))

    def _hp_search(self,
                   input_df,
                   validation_df,
                   metric,
                   recipe,
                   mc,
                   resources_per_trial,
                   remote_dir):
        """
        Run the hyper-parameter search and build a pipeline from the best
        trial.

        :return: a TimeSequencePipeline restored from the best trial
        """
        ft = TimeSequenceFeatureTransformer(self.future_seq_len,
                                            self.dt_col,
                                            self.target_col,
                                            self.extra_features_col,
                                            self.drop_missing)
        # In list mode the feature list is derived from the first frame only
        # (presumably all frames share the same columns).
        if isinstance(input_df, list):
            feature_list = ft.get_feature_list(input_df[0])
        else:
            feature_list = ft.get_feature_list(input_df)

        def model_create_func():
            _model = TimeSequenceModel(
                check_optional_config=False,
                future_seq_len=self.future_seq_len)
            return _model

        # This instance is the one the best checkpoint is restored into.
        model = model_create_func()

        # prepare parameters for search engine
        search_space = recipe.search_space(feature_list)
        metric_mode = TimeSequencePredictor._get_metric_mode(metric)
        searcher = RayTuneSearchEngine(logs_dir=self.logs_dir,
                                       resources_per_trial=resources_per_trial,
                                       name=self.name,
                                       remote_dir=remote_dir,
                                       )
        # NOTE(review): a model *instance* is passed here (the function is
        # called) although the keyword is named model_create_func. Kept as-is
        # because RayTuneSearchEngine.compile is not visible from this file;
        # confirm which it expects.
        searcher.compile(input_df,
                         model_create_func=model_create_func(),
                         search_space=search_space,
                         recipe=recipe,
                         feature_transformers=ft,
                         future_seq_len=self.future_seq_len,
                         validation_df=validation_df,
                         metric=metric,
                         metric_mode=metric_mode,
                         mc=mc,
                         )
        analysis = searcher.run()

        pipeline = self._make_pipeline(analysis,
                                       metric_mode,
                                       feature_transformers=ft,
                                       model=model,
                                       remote_dir=remote_dir)
        return pipeline

    def _print_config(self, best_config):
        """Print each key/value pair of the best configuration."""
        print("The best configurations are:")
        for name, value in best_config.items():
            print(name, ":", value)

    def _make_pipeline(self, analysis, metric_mode, feature_transformers,
                       model, remote_dir):
        """
        Restore the best trial's checkpoint into feature_transformers/model
        and wrap them in a TimeSequencePipeline.

        :param analysis: the object returned by the search engine's run()
        :param metric_mode: "min" or "max", used to pick the best trial
        :param remote_dir: HDFS results dir, or None to restore locally
        :return: a TimeSequencePipeline
        """
        metric = "reward_metric"
        best_config = analysis.get_best_config(metric=metric, mode=metric_mode)
        best_logdir = analysis.get_best_logdir(metric=metric, mode=metric_mode)
        print("best log dir is ", best_logdir)
        dataframe = analysis.dataframe(metric=metric, mode=metric_mode)
        # NOTE(review): the checkpoint name comes from the first row of the
        # trial dataframe but is joined with the *best* trial's log dir. This
        # is only correct if every trial uses the same checkpoint file name.
        # Confirm.
        model_path = os.path.join(best_logdir, dataframe["checkpoint"].iloc[0])
        config = convert_bayes_configs(best_config).copy()
        self._print_config(config)
        if remote_dir is not None:
            all_config = restore_hdfs(model_path,
                                      remote_dir,
                                      feature_transformers,
                                      model)
        else:
            all_config = restore_zip(model_path,
                                     feature_transformers,
                                     model)
        return TimeSequencePipeline(name=self.name,
                                    feature_transformers=feature_transformers,
                                    model=model,
                                    config=all_config)