#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import ray
from ray import tune
from copy import deepcopy
from zoo.automl.search.abstract import *
from zoo.automl.common.util import *
from zoo.automl.impute.impute import *
from ray.tune import Trainable
import ray.tune.track
from ray.tune.suggest.bayesopt import BayesOptSearch
class RayTuneSearchEngine(SearchEngine):
    """
    Search engine that drives hyper-parameter trials through Ray Tune.
    """

    def __init__(self,
                 logs_dir="",
                 resources_per_trial=None,
                 name="",
                 remote_dir=None,
                 ):
        """
        Constructor

        :param logs_dir: accepted by the signature but not stored or used here
        :param resources_per_trial: resources for each trial; handed to
            ``tune.run`` as ``resources_per_trial`` when the search is run
        :param name: experiment name handed to ``tune.run``
        :param remote_dir: remote directory forwarded to the train function;
            when it is not None the checkpoint is uploaded there
        """
        # settings supplied by the caller
        self.name = name
        self.remote_dir = remote_dir
        # NOTE: the attribute is spelled "trail" (sic); run() reads it under
        # exactly this name, so the spelling is kept.
        self.resources_per_trail = resources_per_trial

        # placeholders; train_func is filled in by compile(), trials by run()
        self.pipeline = None
        self.train_func = None
        self.trainable_class = None
        self.trials = None
def compile(self,
            input_df,
            model_create_func,
            search_space,
            recipe,
            feature_transformers=None,
            future_seq_len=1,
            validation_df=None,
            mc=False,
            metric="mse",
            metric_mode="min"):
    """
    Do necessary preparations for the engine: read the settings from the
    recipe, build the scheduler and the (optional) search algorithm, and
    create the train function that Ray Tune will execute for every trial.

    :param input_df: training data, forwarded to the train function
    :param model_create_func: forwarded to the train function as the model
    :param search_space: dict describing the search space; converted to a
        tune config with ``_prepare_tune_config``
    :param recipe: object providing ``runtime_params()`` (must contain
        ``'num_samples'``; the remaining entries become the stop criteria),
        ``search_algorithm()``, ``search_algorithm_params()``,
        ``fixed_params()``, ``scheduler_algorithm()`` and, for 'SkOpt',
        ``opt_params()``
    :param feature_transformers: forwarded to the train function
    :param future_seq_len: forwarded to the train function
    :param validation_df: validation data, forwarded to the train function
    :param mc: forwarded to the train function
    :param metric: name of the evaluation metric, forwarded to the train function
    :param metric_mode: "min" or "max", forwarded to the train function
    :return: None
    """
    # read everything the recipe has to offer
    runtime_params = recipe.runtime_params()
    num_samples = runtime_params['num_samples']
    stop_criteria = dict(runtime_params)
    algo_params = recipe.search_algorithm_params()
    algo_name = recipe.search_algorithm()
    fixed_params = recipe.fixed_params()
    scheduler_name = recipe.scheduler_algorithm()
    # 'num_samples' is an argument of tune.run, not a stop criterion
    stop_criteria.pop('num_samples')

    self.search_space = self._prepare_tune_config(search_space)
    self.stop_criteria = stop_criteria
    self.num_samples = num_samples

    # trial scheduler
    if scheduler_name == 'AsyncHyperBand':
        from ray.tune.schedulers import AsyncHyperBandScheduler
        scheduler = AsyncHyperBandScheduler(
            time_attr="training_iteration",
            metric="reward_metric",
            mode="max",
            max_t=50,
            grace_period=1,
            reduction_factor=3,
            brackets=3,
        )
    else:
        from ray.tune.schedulers import FIFOScheduler
        scheduler = FIFOScheduler()
    self.sched = scheduler

    # search algorithm; None means tune samples directly from the search space
    if algo_name == 'BayesOpt':
        searcher = BayesOptSearch(
            self.search_space,
            metric="reward_metric",
            mode="max",
            utility_kwargs=algo_params["utility_kwargs"]
        )
    elif algo_name == 'SkOpt':
        from skopt import Optimizer
        from ray.tune.suggest.skopt import SkOptSearch
        skopt_optimizer = Optimizer(recipe.opt_params())
        searcher = SkOptSearch(
            skopt_optimizer,
            list(self.search_space.keys()),
            metric="reward_metric",
            mode="max",
        )
    else:
        searcher = None
    self.search_algorithm = searcher

    self.fixed_params = fixed_params

    # Only the function-based path is prepared here; self.trainable_class is
    # not set by this method.
    train_func_args = dict(
        input_df=input_df,
        model_create_func=model_create_func,
        feature_transformers=feature_transformers,
        future_seq_len=future_seq_len,
        validation_df=validation_df,
        metric=metric,
        metric_mode=metric_mode,
        mc=mc,
        remote_dir=self.remote_dir,
    )
    self.train_func = self._prepare_train_func(**train_func_args)
def run(self):
    """
    Run trials

    :return: trials result (the analysis object returned by ``tune.run``)
    """
    # function based
    if self.search_algorithm:
        # the search algorithm proposes the hyper-parameters, so only the
        # fixed parameters are passed as config
        specific = dict(config=self.fixed_params,
                        search_alg=self.search_algorithm)
    else:
        # no search algorithm: tune samples directly from the search space
        specific = dict(config=self.search_space)

    common = dict(
        name=self.name,
        stop=self.stop_criteria,
        num_samples=self.num_samples,
        scheduler=self.sched,
        resources_per_trial=self.resources_per_trail,
        verbose=1,
        reuse_actors=True,
    )
    analysis = tune.run(self.train_func, **common, **specific)

    # keep the trials so that get_best_trials() can rank them later
    self.trials = analysis.trials
    return analysis
[docs] def get_best_trials(self, k=1):
sorted_trials = RayTuneSearchEngine._get_sorted_trials(self.trials, metric="reward_metric")
best_trials = sorted_trials[:k]
return [self._make_trial_output(t) for t in best_trials]
def _make_trial_output(self, trial):
    """
    Wrap a finished trial into a ``TrialOutput``.

    :param trial: trial exposing ``config``, ``logdir`` and ``last_result``
    :return: TrialOutput holding the trial config and the path of the
        checkpoint named in the trial's last result, inside the trial logdir
    """
    checkpoint_name = trial.last_result["checkpoint"]
    checkpoint_path = os.path.join(trial.logdir, checkpoint_name)
    return TrialOutput(config=trial.config, model_path=checkpoint_path)
@staticmethod
def _get_best_trial(trial_list, metric):
"""Retrieve the best trial."""
return max(trial_list, key=lambda trial: trial.last_result.get(metric, 0))
@staticmethod
def _get_sorted_trials(trial_list, metric):
return sorted(
trial_list,
key=lambda trial: trial.last_result.get(metric, 0),
reverse=True)
@staticmethod
def _get_best_result(trial_list, metric):
"""Retrieve the last result from the best trial."""
return {metric: RayTuneSearchEngine._get_best_trial(trial_list, metric).last_result[metric]}
[docs] def test_run(self):
def mock_reporter(**kwargs):
assert "reward_metric" in kwargs, "Did not report proper metric"
assert "checkpoint" in kwargs, "Accidentally removed `checkpoint`?"
raise GoodError("This works.")
try:
self.train_func({'out_units': 1,
'selected_features': ["MONTH(datetime)", "WEEKDAY(datetime)"]},
mock_reporter)
# self.train_func(self.search_space, mock_reporter)
except TypeError as e:
print("Forgot to modify function signature?")
raise e
except GoodError:
print("Works!")
return 1
raise Exception("Didn't call reporter...")
@staticmethod
def _is_validation_df_valid(validation_df):
df_not_empty = isinstance(validation_df, pd.DataFrame) and not validation_df.empty
df_list_not_empty = isinstance(validation_df, list) and validation_df \
and not all([d.empty for d in validation_df])
return validation_df is not None and not (df_not_empty or df_list_not_empty)
@staticmethod
def _prepare_train_func(input_df,
                        model_create_func,
                        feature_transformers,
                        future_seq_len,
                        metric,
                        metric_mode,
                        validation_df=None,
                        mc=False,
                        remote_dir=None,
                        ):
    """
    Prepare the train function for ray tune

    :param input_df: input dataframe
    :param model_create_func: object used as the trial model; its
        ``fit_eval`` method is called on every iteration
    :param feature_transformers: feature transformers
    :param future_seq_len: not used by this function
    :param metric: the rewarding metric, passed to ``fit_eval``
    :param metric_mode: the mode of rewarding metric. "min" or "max"
    :param validation_df: validation dataframe (or list of dataframes);
        ignored when None or empty
    :param mc: passed to ``fit_eval``
    :param remote_dir: when not None, each new best checkpoint is uploaded there
    :return: the train function
    """
    # share the big inputs through the object store instead of the closure
    input_df_id = ray.put(input_df)
    ft_id = ray.put(feature_transformers)

    df_not_empty = isinstance(validation_df, pd.DataFrame) and not validation_df.empty
    df_list_not_empty = isinstance(validation_df, list) and validation_df \
        and not all([d.empty for d in validation_df])
    is_val_df_valid = validation_df is not None and bool(df_not_empty or df_list_not_empty)
    if is_val_df_valid:
        validation_df_id = ray.put(validation_df)

    # Compare by value: `is "max"` tests object identity, which is not
    # guaranteed for equal strings.
    metric_op = 1 if metric_mode == "max" else -1
    ckpt_name = "best.ckpt"

    def train_func(config):
        # make a copy from global variables for trial to make changes
        trial_ft = deepcopy(ray.get(ft_id))
        # NOTE(review): the object is used as the model directly and is not
        # called, despite its name - confirm this is intended.
        trial_model = model_create_func

        # optional imputation, selected by name through the trial config
        imputer = None
        if "imputation" in config:
            if config["imputation"] == "LastFillImpute":
                imputer = LastFillImpute()
            elif config["imputation"] == "FillZeroImpute":
                imputer = FillZeroImpute()

        # handling input
        trial_input_df = deepcopy(ray.get(input_df_id))
        if imputer:
            trial_input_df = imputer.impute(trial_input_df)

        config = convert_bayes_configs(config).copy()
        (x_train, y_train) = trial_ft.fit_transform(trial_input_df, **config)

        # handling validation data
        validation_data = None
        if is_val_df_valid:
            trial_validation_df = deepcopy(ray.get(validation_df_id))
            validation_data = trial_ft.transform(trial_validation_df)

        # Start from -inf so that the first finite reward always produces a
        # checkpoint; a fixed floor such as -999 would skip saving for
        # rewards below it while the checkpoint name is still reported.
        best_reward_m = float("-inf")
        for i in range(1, 101):
            result = trial_model.fit_eval(x_train,
                                          y_train,
                                          validation_data=validation_data,
                                          mc=mc,
                                          metric=metric,
                                          **config)
            # the reward is always maximised; metric_op flips "min" metrics
            reward_m = metric_op * result
            if reward_m > best_reward_m:
                best_reward_m = reward_m
                save_zip(ckpt_name, trial_ft, trial_model, config)
                if remote_dir is not None:
                    upload_ppl_hdfs(remote_dir, ckpt_name)
            tune.track.log(training_iteration=i,
                           reward_metric=reward_m,
                           checkpoint=ckpt_name)

    return train_func
@staticmethod
def _prepare_trainable_class(input_df,
                             feature_transformers,
                             future_seq_len,
                             metric,
                             metric_mode,
                             validation_df=None,
                             mc=False,
                             remote_dir=None
                             ):
    """
    Prepare the Trainable class for ray tune (class-based counterpart of
    the train function).

    :param input_df: input dataframe
    :param feature_transformers: feature transformers
    :param future_seq_len: passed to ``TimeSequenceModel``
    :param metric: not used by this function
    :param metric_mode: the mode of rewarding metric. "min" or "max"
    :param validation_df: validation dataframe (or list of dataframes);
        ignored when None or empty
    :param mc: not used by this function
    :param remote_dir: when not None, checkpoints are uploaded to it on save
        and restored from it on restore
    :return: the Trainable subclass
    """
    # share the big inputs through the object store instead of the closure
    input_df_id = ray.put(input_df)
    ft_id = ray.put(feature_transformers)

    df_not_empty = isinstance(validation_df, pd.DataFrame) and not validation_df.empty
    df_list_not_empty = isinstance(validation_df, list) and validation_df \
        and not all([d.empty for d in validation_df])
    is_val_df_valid = validation_df is not None and bool(df_not_empty or df_list_not_empty)
    if is_val_df_valid:
        validation_df_id = ray.put(validation_df)

    class TrainableClass(Trainable):

        def _setup(self, config):
            # private copies so that a trial can modify them freely
            self.trial_ft = deepcopy(ray.get(ft_id))
            self.trial_model = TimeSequenceModel(check_optional_config=False,
                                                 future_seq_len=future_seq_len)

            # handling input
            trial_input_df = deepcopy(ray.get(input_df_id))
            self.config = convert_bayes_configs(config).copy()
            (self.x_train, self.y_train) = self.trial_ft.fit_transform(trial_input_df,
                                                                       **self.config)

            # handling validation data
            self.validation_data = None
            if is_val_df_valid:
                trial_validation_df = deepcopy(ray.get(validation_df_id))
                self.validation_data = self.trial_ft.transform(trial_validation_df)

            # Start from -inf so that the first finite reward is always
            # saved; a fixed floor such as -999 would skip rewards below it.
            self.best_reward_m = float("-inf")
            self.reward_m = float("-inf")
            self.ckpt_name = "pipeline.ckpt"
            # Compare by value: `is "max"` tests object identity, which is
            # not guaranteed for equal strings.
            self.metric_op = 1 if metric_mode == "max" else -1

        def _train(self):
            result = self.trial_model.fit_eval(self.x_train, self.y_train,
                                               validation_data=self.validation_data,
                                               **self.config)
            # the reward is always maximised; metric_op flips "min" metrics
            self.reward_m = self.metric_op * result
            return {"reward_metric": self.reward_m, "checkpoint": self.ckpt_name}

        def _save(self, checkpoint_dir):
            ckpt_name = self.ckpt_name
            # save in the working dir (without "checkpoint_{}".format(training_iteration))
            path = os.path.join(checkpoint_dir, "..", ckpt_name)
            # only overwrite the stored pipeline when this iteration is the best so far
            if self.reward_m > self.best_reward_m:
                self.best_reward_m = self.reward_m
                print("****this reward is", self.reward_m)
                print("*********saving checkpoint")
                save_zip(ckpt_name, self.trial_ft, self.trial_model, self.config)
                if remote_dir is not None:
                    upload_ppl_hdfs(remote_dir, ckpt_name)
            return path

        def _restore(self, checkpoint_path):
            if remote_dir is not None:
                restore_hdfs(checkpoint_path, remote_dir, self.trial_ft, self.trial_model)
            else:
                restore_zip(checkpoint_path, self.trial_ft, self.trial_model)

    return TrainableClass
def _prepare_tune_config(self, space):
    """
    Translate a search space into a Ray Tune config.

    :param space: dict mapping parameter names to ``RandomSample`` objects,
        ``GridSearch`` objects or plain values
    :return: new dict with the same keys, where ``RandomSample`` entries
        become ``tune.sample_from`` and ``GridSearch`` entries become
        ``tune.grid_search``; other values are kept as they are
    """
    def to_tune(value):
        if isinstance(value, RandomSample):
            return tune.sample_from(value.func)
        if isinstance(value, GridSearch):
            return tune.grid_search(value.values)
        return value

    return {key: to_tune(value) for key, value in space.items()}