#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
import sys
import functools
import logging
from pyspark.ml.linalg import DenseVector, SparseVector, VectorUDT
from bigdl.transform.vision.image import FeatureTransformer
from bigdl.util.common import get_node_and_core_number
from zoo.common.utils import callZooFunc
from zoo.common import Sample, JTensor
from zoo.common.nncontext import getOrCreateSparkContext
from zoo.feature.common import FeatureSet, SampleToMiniBatch, Preprocessing
from zoo.feature.image import ImagePreprocessing, ImageFeatureToSample
from zoo.util import nest
if sys.version >= '3':
long = int
unicode = str
def _to_tensor_structure(tensors):
if isinstance(tensors, tuple):
tensor_structure = TensorMeta(dtype=tensors[0], shape=tensors[1], name="input0")
elif isinstance(tensors, list):
tensor_structure = [TensorMeta(dtype=value[0], shape=value[1],
name="list_input_" + str(idx))
for (idx, value) in enumerate(tensors)]
elif isinstance(tensors, dict):
tensor_structure = {}
for key, value in tensors.items():
tensor_structure[key] = TensorMeta(dtype=value[0], shape=value[1], name=key)
else:
raise ValueError("In TFDataset.from_rdd, features and labels should be a tuple, "
"a list of tuples or a dict of tuples")
return tensor_structure
def _tensors_to_rdd(tensors, sc, splits):
import tensorflow as tf
if isinstance(tensors, np.ndarray):
tensors = (tensors,)
if isinstance(tensors, list):
for i in range(len(tensors)):
if tensors[i].dtype == np.dtype("float64"):
tensors[i] = np.float32(tensors[i])
data_list = _splits(tensors)
rdd = sc.parallelize(data_list, splits)
tensor_structure = [TensorMeta(tf.as_dtype(t.dtype),
shape=t.shape[1:],
name="input_%s" % i)
for i, t in enumerate(tensors)]
else:
flattened = nest.flatten(tensors)
for i in range(len(flattened)):
if flattened[i].dtype == np.dtype("float64"):
flattened[i] = np.float32(flattened[i])
data_list = _splits(flattened)
rdd = sc.parallelize(data_list, splits)
rdd = rdd.map(lambda x: nest.pack_sequence_as(tensors, x))
tensor_structure = nest.pack_sequence_as(tensors,
[TensorMeta(tf.as_dtype(t.dtype),
shape=t.shape[1:],
name="input_%s" % i)
for i, t in enumerate(flattened)])
return rdd, tensor_structure
def _splits(tensors):
data_list = []
data_size = tensors[0].shape[0]
for i in range(data_size):
sample = []
for j in range(len(tensors)):
sample.append(tensors[j][i])
data_list.append(sample)
return data_list
[docs]class MergeFeatureLabelImagePreprocessing(ImagePreprocessing):
def __init__(self, bigdl_type="float"):
super(MergeFeatureLabelImagePreprocessing, self).__init__(bigdl_type)
[docs]class TFDataset(object):
def __init__(self, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size=False):
"""
TFDataset represents a distributed collection of elements (backed by a RDD)
to be feed into Tensorflow graph.
:param tensor_structure: a nested structure of TensorMeta objects specifying the
name, shape and data type of each element in this TFDataset
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
"""
if batch_size > 0 and batch_per_thread > 0:
raise ValueError("bath_size and batch_per_thread should not be set simultaneously")
self.has_batch = True
node_num, core_num = get_node_and_core_number()
self.total_core_num = node_num * core_num
self.node_num = node_num
self.core_num = core_num
if batch_size > 0:
if batch_size % self.total_core_num != 0:
raise ValueError("batch_size should be a multiple " +
"of total core number, but got batch_size: " +
"%s where total core number is %s" % (batch_size,
self.total_core_num))
if batch_size <= 0 and batch_per_thread <= 0:
batch_per_thread = 1
batch_size = self.total_core_num
self.has_batch = False
self.batch_size = batch_size
self.batch_per_thread = batch_per_thread
self.hard_code_batch_size = hard_code_batch_size
self.tensor_structure = tensor_structure
if not self.hard_code_batch_size:
self.output_shapes = nest.pack_sequence_as(
self.tensor_structure, [[None] + list(t.shape)
if t is not None else None
for t in nest.flatten(self.tensor_structure)])
else:
if self.batch_per_thread > 0:
self.output_shapes = nest.pack_sequence_as(
self.tensor_structure, [[self.batch_per_thread] + t.shape
if t is not None else None
for t in nest.flatten(self.tensor_structure)])
else:
self.output_shapes = nest.pack_sequence_as(
self.tensor_structure, [[self.batch_size // self.total_core_num] + list(t.shape)
if t is not None else None
for t in nest.flatten(self.tensor_structure)])
self.input_names = nest.pack_sequence_as(
self.tensor_structure, [t.name
if t is not None else None
for t in nest.flatten(self.tensor_structure)])
self._tensors = None
def _create_placeholders(self):
import tensorflow as tf
if not self.hard_code_batch_size:
tensors = nest.pack_sequence_as(
self.tensor_structure, [tf.placeholder(name=t.name,
dtype=t.dtype,
shape=[None] + list(t.shape))
for t in nest.flatten(self.tensor_structure)])
else:
if self.batch_per_thread > 0:
tensors = nest.pack_sequence_as(
self.tensor_structure,
[tf.placeholder(name=t.name,
dtype=t.dtype,
shape=[self.batch_per_thread] + list(t.shape))
for t in nest.flatten(self.tensor_structure)])
else:
tensors = nest.pack_sequence_as(
self.tensor_structure,
[tf.placeholder(name=t.name,
dtype=t.dtype,
shape=[self.batch_size // self.total_core_num] + list(t.shape))
for t in nest.flatten(self.tensor_structure)])
for tensor in nest.flatten(tensors):
tf.get_default_graph().clear_collection(tensor.name)
tf.add_to_collection(tensor.name, self)
self._original_tensors = tensors
self._tensors = tensors
if not self.has_batch:
self._tensors = nest.pack_sequence_as(self.tensor_structure,
[t[0] for t in nest.flatten(tensors)])
return tensors
@property
def tensors(self):
"""
a nested structure of TensorFlow tensor object in TensorFlow graph.
The elements of this dataset will be fed into these tensors on each iteration.
:return: the nested structure of TensorFlow tensor object
"""
if self._tensors is None:
self._create_placeholders()
return self._tensors
@property
def feature_tensors(self):
if self._tensors is None:
self._create_placeholders()
if not isinstance(self._tensors, tuple):
raise ValueError("To use feature_tensors, " +
"the element in TFDataset must be a tuple of two components. " +
"Please use TFDataset.from_rdd(rdd, features=..., labels=...). ")
return self._tensors[0]
@property
def label_tensors(self):
if self._tensors is None:
self._create_placeholders()
if not isinstance(self._tensors, tuple):
raise ValueError("To use label_tensors, " +
"the element in TFDataset must be a tuple of two components. " +
"Please use TFDataset.from_rdd(rdd, features=..., labels=...). ")
return self._tensors[1]
@staticmethod
def _to_tensor_structure(features, labels):
feature_structure = _to_tensor_structure(features)
if labels is not None:
label_structure = _to_tensor_structure(labels)
tensor_structure = (feature_structure, label_structure)
else:
tensor_structure = (feature_structure,)
return tensor_structure
[docs] def get_prediction_data(self):
"""
:return: an object that can be used for TFNet.predict
e.g. an RDD of Sample or a ImageSet
"""
assert self.batch_per_thread > 0, "batch_per_thread must be set when used in prediction"
return self._get_prediction_data()
[docs] def get_evaluation_data(self):
"""
:return: an object that can be used for TFNet.evaluate,
e.g. an RDD of Sample or a ImageSet
"""
assert self.batch_per_thread > 0, "batch_per_thread must be set when used in evaluation"
return self._get_evaluation_data()
[docs] def get_training_data(self):
"""
:return: an object that can be used to create a BigDL optimizer,
e.g. an RDD of Sample or a DataSet
"""
assert self.batch_size > 0, "batch_size must be set when used in training"
return self._get_training_data()
[docs] def get_validation_data(self):
"""
:return: an object that can be used to set validation in a BigDL optimizer,
e.g. an RDD of Sample or a DataSet
"""
assert self.batch_size > 0, "batch_size must be set when used in training"
return self._get_validation_data()
def _get_prediction_data(self):
raise NotImplementedError
def _get_evaluation_data(self):
raise NotImplementedError
def _get_training_data(self):
raise NotImplementedError
def _get_validation_data(self):
raise NotImplementedError
[docs] def get_num_partitions(self):
"""
:return: the num of partitions of the underlying RDD
"""
raise NotImplementedError
[docs] @staticmethod
def from_rdd(*args, **kwargs):
"""
Create a TFDataset from a rdd.
For training and evaluation, both `features` and `labels` arguments should be specified.
The element of the rdd should be a tuple of two, (features, labels), each has the
same structure of numpy.ndarrays of the argument `features`, `labels`.
E.g. if `features` is [(tf.float32, [10]), (tf.float32, [20])],
and `labels` is {"label1":(tf.float32, [10]), "label2": (tf.float32, [20])}
then a valid element of the rdd could be
(
[np.zeros(dtype=float, shape=(10,), np.zeros(dtype=float, shape=(10,)))],
{"label1": np.zeros(dtype=float, shape=(10,)),
"label2":np.zeros(dtype=float, shape=(10,))))}
)
If `labels` is not specified,
then the above element should be changed to
[np.zeros(dtype=float, shape=(10,), np.zeros(dtype=float, shape=(10,)))]
For inference, `labels` can be not specified.
The element of the rdd should be some ndarrays of the same structure of the `features`
argument.
A note on the legacy api: if you are using `names`, `shapes`, `types` arguments,
each element of the rdd should be a list of numpy.ndarray.
:param rdd: a rdd containing the numpy.ndarrays to be used
for training/evaluation/inference
:param features: the structure of input features, should one the following:
- a tuple (dtype, shape), e.g. (tf.float32, [28, 28, 1])
- a list of such tuple [(dtype1, shape1), (dtype2, shape2)],
e.g. [(tf.float32, [10]), (tf.float32, [20])],
- a dict of such tuple, mapping string names to tuple {"name": (dtype, shape},
e.g. {"input1":(tf.float32, [10]), "input2": (tf.float32, [20])}
:param labels: the structure of input labels, format is the same as features
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param val_rdd: validation data with the same structure of rdd
:param sequential_order: whether to iterate the elements in the Dataset
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:return: a TFDataset
"""
return TFNdarrayDataset.from_rdd(*args, **kwargs)
[docs] @staticmethod
def from_ndarrays(*args, **kwargs):
"""
Create a TFDataset from a nested structure of numpy ndarrays. Each element
in the resulting TFDataset has the same structure of the argument tensors and
is created by indexing on the first dimension of each ndarray in the tensors
argument.
This method is equivalent to sc.parallize the tensors and call TFDataset.from_rdd
:param tensors: the numpy ndarrays
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param val_tensors: the numpy ndarrays used for validation during training
:param sequential_order: whether to iterate the elements in the Dataset
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:return: a TFDataset
"""
return TFNdarrayDataset.from_ndarrays(*args, **kwargs)
[docs] @staticmethod
def from_image_set(image_set, image, label=None,
batch_size=-1, batch_per_thread=-1,
hard_code_batch_size=False,
validation_image_set=None,
sequential_order=False,
shuffle=True):
"""
Create a TFDataset from a ImagetSet. Each ImageFeature in the ImageSet should
already has the "sample" field, i.e. the result of ImageSetToSample transformer
:param image_set: the ImageSet used to create this TFDataset
:param image: a tuple of two, the first element is the type of image, the second element
is the shape of this element, i.e. (tf.float32, [224, 224, 3]))
:param label: a tuple of two, the first element is the type of label, the second element
is the shape of this element, i.e. (tf.int32, [1]))
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param validation_image_set: the ImageSet used for validation during training
:param sequential_order: whether to iterate the elements in the Dataset
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:return: a TFDataset
"""
tensor_structure = TFDataset._to_tensor_structure(image, label)
return TFImageDataset(image_set, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size,
validation_image_set,
sequential_order=sequential_order, shuffle=shuffle)
[docs] @staticmethod
def from_text_set(text_set, text, label=None,
batch_size=-1, batch_per_thread=-1,
hard_code_batch_size=False, validation_image_set=None,
sequential_order=False, shuffle=True):
"""
Create a TFDataset from a TextSet. The TextSet must be transformed to Sample, i.e.
the result of TextFeatureToSample transformer.
:param text_set: the TextSet used to create this TFDataset
:param text: a tuple of two, the first element is the type of this input feature,
the second element is the shape of this element, i.e. (tf.float32, [10, 100, 4])).
text can also be nested structure of this tuple of two.
:param label: a tuple of two, the first element is the type of label, the second element
is the shape of this element, i.e. (tf.int32, [1])). label can also be nested structure of
this tuple of two.
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param validation_image_set: The TextSet used for validation during training
:param sequential_order: whether to iterate the elements in the Dataset
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:return: a TFDataset
"""
tensor_structure = TFDataset._to_tensor_structure(text, label)
return TFTextDataset(text_set, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size,
validation_image_set,
sequential_order=sequential_order, shuffle=shuffle)
[docs] @staticmethod
def from_tfrecord_file(sc, file_path, batch_size=-1, batch_per_thread=-1,
hard_code_batch_size=False, validation_file_path=None,
sequential_order=False, shuffle=True):
"""
Create a TFDataset from tfrecord files.
:param sc: The SparkContext
:param file_path: comma seperated tfrecord file(s) path
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param validation_file_path: The tfrecord files used for validation
:param sequential_order: whether to iterate the elements in the Dataset
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:return: a TFDataset
"""
input_format_class = "org.tensorflow.hadoop.io.TFRecordFileInputFormat"
key_class = "org.apache.hadoop.io.BytesWritable"
value_class = "org.apache.hadoop.io.NullWritable"
bytes_rdd = sc.newAPIHadoopFile(file_path, input_format_class,
keyClass=key_class,
valueClass=value_class)
bytes_rdd = bytes_rdd.map(lambda record: bytearray(record[0]))
validation_bytes_rdd = None
if validation_file_path is not None:
validation_bytes_rdd = sc.newAPIHadoopFile(validation_file_path,
input_format_class,
keyClass=key_class,
valueClass=value_class)
validation_bytes_rdd = validation_bytes_rdd.map(lambda record: bytearray(record[0]))
return TFBytesDataset(bytes_rdd, batch_size, batch_per_thread,
hard_code_batch_size, validation_bytes_rdd,
sequential_order=sequential_order, shuffle=shuffle)
[docs] @staticmethod
def from_feature_set(dataset, features, labels=None, batch_size=-1, batch_per_thread=-1,
hard_code_batch_size=False, validation_dataset=None):
"""
Create a TFDataset from a FeatureSet. Currently, the element in this Feature set must be a
Sample, i.e. the result of ImageFeatureToSample transformer
:param dataset: the feature set used to create this TFDataset
:param features: a tuple of two, the first element is the type of this input feature,
the second element is the shape of this element, i.e. (tf.float32, [224, 224, 3])).
text can also be nested structure of this tuple of two.
:param labels: a tuple of two, the first element is the type of label, the second element
is the shape of this element, i.e. (tf.int32, [1])). label can also be nested structure of
this tuple of two.
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param validation_dataset: The FeatureSet used for validation during training
:return: a TFDataset
"""
tensor_structure = TFDataset._to_tensor_structure(features, labels)
return TFFeatureDataset(dataset, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size,
validation_dataset)
[docs] @staticmethod
def from_string_rdd(string_rdd, batch_size=-1, batch_per_thread=-1,
hard_code_batch_size=False, validation_string_rdd=None):
"""
Create a TFDataset from a RDD of strings. Each element is the RDD should be a single string.
The returning TFDataset's feature_tensors has only one Tensor. the type of the Tensor
is tf.string, and the shape is (None,). The returning don't have label_tensors. If the
dataset is used for training, the label should be encoded in the string.
:param string_rdd: the RDD of strings
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param validation_string_rdd: the RDD of strings to be used in validation
:return: a TFDataset
"""
string_rdd = string_rdd.map(lambda x: bytearray(x, "utf-8"))
if validation_string_rdd is not None:
validation_string_rdd = validation_string_rdd.map(lambda x: bytearray(x, "utf-8"))
return TFBytesDataset(string_rdd, batch_size, batch_per_thread,
hard_code_batch_size, validation_string_rdd)
[docs] @staticmethod
def from_bytes_rdd(bytes_rdd, batch_size=-1, batch_per_thread=-1,
hard_code_batch_size=False, validation_bytes_rdd=None):
"""
Create a TFDataset from a RDD of bytes. Each element is the RDD should be a bytes object.
The returning TFDataset's feature_tensors has only one Tensor. the type of the Tensor
is tf.string, and the shape is (None,). The returning don't have label_tensors. If the
dataset is used for training, the label should be encoded in the bytes.
:param bytes_rdd: the RDD of bytes
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param validation_bytes_rdd: the RDD of bytes to be used in validation
:return: a TFDataset
"""
return TFBytesDataset(bytes_rdd, batch_size, batch_per_thread,
hard_code_batch_size, validation_bytes_rdd)
[docs] @staticmethod
def from_tf_data_dataset(dataset, batch_size=-1,
batch_per_thread=-1, hard_code_batch_size=False,
validation_dataset=None,
sequential_order=False,
shuffle=True,
remove_checking=False, batch_outside=False,
inter_threads=None, intra_threads=None):
"""
Create a TFDataset from a tf.data.Dataset.
The recommended way to create the dataset is to reading files in a shared file
system (e.g. HDFS) that is accessible from every executor of this Spark Application.
If the dataset is created by reading files in the local file system, then the
files must exist in every executor in the exact same path. The path should be
absolute path and relative path is not supported.
A few kinds of dataset is not supported for now:
1. dataset created from tf.data.Dataset.from_generators
2. dataset with Dataset.batch operation.
3. dataset with Dataset.repeat operation
4. dataset contains tf.py_func, tf.py_function or tf.numpy_function
:param dataset: the tf.data.Dataset
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param validation_dataset: the dataset used for validation
:return: a TFDataset
"""
return TFDataDataset(dataset, batch_size, batch_per_thread,
hard_code_batch_size, validation_dataset,
sequential_order, shuffle, remove_checking, batch_outside,
inter_threads, intra_threads)
[docs] @staticmethod
def from_dataframe(df, feature_cols, labels_cols=None, batch_size=-1,
batch_per_thread=-1, hard_code_batch_size=False,
validation_df=None,
sequential_order=False, shuffle=True):
"""
Create a TFDataset from a pyspark.sql.DataFrame.
:param df: the DataFrame for the dataset
:param feature_cols: a list of string, indicating which columns are used as features.
Currently supported types are FloatType, DoubleType, IntegerType,
LongType, ArrayType (value should be numbers), DenseVector
and SparseVector. For ArrayType, DenseVector and SparseVector,
the sizes are assume to the same.
:param labels_cols: a list of string, indicating which columns are used as labels.
Currently supported types are FloatType, DoubleType, IntegerType,
LongType, ArrayType (value should be numbers), DenseVector
and SparseVector. For ArrayType, DenseVector and SparseVector,
the sizes are assume to the same.
:param batch_size: the batch size, used for training, should be a multiple of
total core num
:param batch_per_thread: the batch size for each thread, used for inference or evaluation
:param hard_code_batch_size: whether to hard code the batch_size into tensorflow graph,
if True, the static size of the first dimension of the resulting tensors is
batch_size/total_core_num (training) or batch_per_thread for inference; if False,
it is None.
:param validation_df: the DataFrame used for validation
:return: a TFDataset
"""
return DataFrameDataset(df, feature_cols, labels_cols, batch_size,
batch_per_thread, hard_code_batch_size, validation_df,
sequential_order, shuffle)
[docs]class TFDataDataset(TFDataset):
[docs] def get_num_partitions(self):
# only called in inference case
return self.total_core_num
@staticmethod
def _assert_not_batched(dataset):
from tensorflow.python.data.ops import dataset_ops
if isinstance(dataset, dataset_ops.DatasetV1Adapter):
TFDataDataset._assert_not_batched(dataset._dataset)
elif isinstance(dataset, dataset_ops.BatchDataset):
raise ValueError("Dataset should not be batched,"
"please use a dataset without the batch operation")
else:
for dt in dataset._inputs():
TFDataDataset._assert_not_batched(dt)
[docs] @staticmethod
def check_rules(dataset, rules, is_training):
from tensorflow.python.data.ops import dataset_ops
if isinstance(dataset, dataset_ops.DatasetV1Adapter):
TFDataDataset.check_rules(dataset._dataset, rules, is_training)
else:
for rule, message in rules:
assert not rule(dataset, is_training), message
else:
for dt in dataset._inputs():
TFDataDataset.check_rules(dt, rules, is_training)
def __init__(self, tf_data_dataset, batch_size,
batch_per_thread, hard_code_batch_size=False,
validation_dataset=None,
sequential_order=False, shuffle=True,
remove_checking=False, batch_outside=False,
inter_threads=None, intra_threads=None):
from tensorflow.python.data.ops import dataset_ops
import tensorflow as tf
# rule 1: we assume that the dataset user passed is not batched
if not batch_outside:
rules = [(
lambda dataset, is_training: isinstance(dataset, dataset_ops.BatchDataset),
"Dataset should not be batched, please use a dataset without the batch operation")]
else:
rules = []
rules += [
(
lambda dataset, is_training: isinstance(dataset, dataset_ops.RepeatDataset),
"Dataset should not be repeated, please use a dataset without the repeat operation")
]
if not remove_checking:
TFDataDataset.check_rules(tf_data_dataset, rules, True)
if validation_dataset is not None:
TFDataDataset.check_rules(validation_dataset, rules, False)
py_func_ops = {"PyFunc", "PyFuncStateless", "EagerPyFunc"}
for node in tf.get_default_graph().as_graph_def().node:
op_type = node.op
if op_type in py_func_ops:
raise ValueError("tf.py_func, tf.py_function, tf.numpy_function and" +
" Dataset.from_generators are not supported in TFPark")
if shuffle:
from tensorflow.python.keras.engine import training_utils
training_utils.verify_dataset_shuffled(tf_data_dataset)
flatten_shapes = nest.flatten(tf.compat.v1.data.get_output_shapes(tf_data_dataset))
if batch_outside:
flatten_shapes = [shape[1:] for shape in flatten_shapes]
flatten_types = nest.flatten(tf.compat.v1.data.get_output_types(tf_data_dataset))
flatten_tensor_structure = [TensorMeta(dtype=flatten_types[i],
shape=list(flatten_shapes[i]),
name="zoo_input_{}".format(i))
for i in range(len(flatten_shapes))]
structure = tf.compat.v1.data.get_output_types(tf_data_dataset)
if isinstance(structure, tf.DType):
structure = (structure,)
tensor_structure = nest.pack_sequence_as(structure,
flatten_tensor_structure)
super(TFDataDataset, self).__init__(tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size)
self.intra_threads = intra_threads
self.inter_threads = inter_threads
if intra_threads is None:
self.intra_threads = self.core_num
if inter_threads is None:
self.inter_threads = 1
if self.batch_size > 0 and self.has_batch:
# training case
self._per_partition_batch_size = self.batch_size // self.node_num
self._shard_num = self.node_num
self.drop_remainder = True
else:
# inference case
self._per_partition_batch_size = self.batch_per_thread
self._shard_num = self.total_core_num
if hard_code_batch_size:
self.drop_remainder = True
logging.warning("hard_code_batch_size is set to true, so we"
" must drop remainder elements in the dataset"
" to avoid outputting small batches, the dropped"
" elements will not get processed. You can "
"pad your dataset so that the total number "
"of elements is divisible by the total batch size"
" to avoid this.")
else:
self.drop_remainder = False
if self.hard_code_batch_size:
self.drop_remainder = True
if not batch_outside:
tf_data_dataset = tf_data_dataset.batch(self._per_partition_batch_size,
drop_remainder=self.drop_remainder)
if validation_dataset is not None and not batch_outside:
drop_remainder = self.hard_code_batch_size
validation_dataset = validation_dataset.batch(self._per_partition_batch_size,
drop_remainder=drop_remainder)
shard_index = tf.placeholder(dtype=tf.int64, shape=())
from tensorflow.python.distribute.input_ops import auto_shard_dataset
tf_data_dataset = auto_shard_dataset(tf_data_dataset, self._shard_num, shard_index)
if validation_dataset is not None:
validation_dataset = auto_shard_dataset(validation_dataset, self._shard_num,
shard_index)
self.shard_index = shard_index
self.train_dataset = tf_data_dataset
self.train_iterator = tf.compat.v1.data.make_initializable_iterator(self.train_dataset)
self.train_next_ops = nest.flatten(self.train_iterator.get_next())
self.output_types = [t.as_datatype_enum
for t in nest.flatten(
tf.compat.v1.data.get_output_types(self.train_dataset))]
self.validation_dataset = validation_dataset
self.validation_iterator = None
self.validation_next_ops = None
self._train_init_op_name = self.train_iterator.initializer.name
self._train_output_names = [op.name for op in self.train_next_ops]
if validation_dataset is not None:
self.validation_iterator = tf.compat.v1.data.make_initializable_iterator(
self.validation_dataset)
self.validation_next_ops = nest.flatten(self.validation_iterator.get_next())
self._val_init_op_name = self.validation_iterator.initializer.name
self._val_output_names = [op.name for op in self.validation_next_ops]
self.table_init_name = tf.tables_initializer().name
self.sequential_order = sequential_order
self.shuffle = shuffle
self.graph = self.train_next_ops[0].graph
self.graph_def = bytearray(self.graph.as_graph_def().SerializeToString())
def _get_prediction_data(self):
jvalue = callZooFunc("float", "createMiniBatchRDDFromTFDataset",
self.graph_def, self._train_init_op_name, self.table_init_name,
self._train_output_names,
self.output_types, self.shard_index.name)
rdd = jvalue.value().toJavaRDD()
return rdd
def _get_evaluation_data(self):
jvalue = callZooFunc("float", "createMiniBatchRDDFromTFDatasetEval",
self.graph_def, self._train_init_op_name, self.table_init_name,
self._train_output_names,
self.output_types, self.shard_index.name)
rdd = jvalue.value().toJavaRDD()
return rdd
def _get_training_data(self):
jvalue = callZooFunc("float", "createTFDataFeatureSet",
self.graph_def, self._train_init_op_name, self.table_init_name,
self._train_output_names, self.output_types, self.shard_index.name,
self.inter_threads, self.intra_threads)
return FeatureSet(jvalue=jvalue)
def _get_validation_data(self):
if self.validation_dataset is not None:
jvalue = callZooFunc("float", "createTFDataFeatureSet",
self.graph_def, self._val_init_op_name, self.table_init_name,
self._val_output_names,
self.output_types, self.shard_index.name, self.inter_threads,
self.intra_threads)
return FeatureSet(jvalue=jvalue)
return None
[docs]class TFFeatureDataset(TFDataset):
def __init__(self, dataset, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size=False, validation_dataset=None):
super(TFFeatureDataset, self).__init__(tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size)
self.dataset = dataset
self.validation_dataset = validation_dataset
def _get_prediction_data(self):
raise Exception("TFFeatureDataset is only supported in training")
def _get_evaluation_data(self):
raise Exception("TFFeatureDataset is only supported in training")
def _get_training_data(self):
fs = self.dataset.transform(MergeFeatureLabelFeatureTransformer())
fs = fs.transform(SampleToMiniBatch(self.batch_size))
return fs
def _get_validation_data(self):
if self.validation_dataset is not None:
fs = self.validation_dataset.transform(
MergeFeatureLabelFeatureTransformer())
fs = fs.transform(SampleToMiniBatch(self.batch_size))
return fs
return None
[docs] def get_num_partitions(self):
raise Exception("TFFeatureDataset is only supported in training")
[docs]class TFBytesDataset(TFDataset):
[docs] def get_num_partitions(self):
return self.train_rdd.getNumPartitions()
def __init__(self, string_rdd, batch_size,
batch_per_thread, hard_code_batch_size=False,
validation_string_rdd=None, sequential_order=False, shuffle=True):
import tensorflow as tf
tensor_structure = (TensorMeta(dtype=tf.string, shape=(), name="input"),)
super(TFBytesDataset, self).__init__(tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size)
self.train_rdd = string_rdd
self.validation_rdd = validation_string_rdd
self.sequential_order = sequential_order
self.shuffle = shuffle
def _get_prediction_data(self):
jvalue = callZooFunc("float", "createMiniBatchRDDFromStringRDD",
self.train_rdd,
self.batch_per_thread)
rdd = jvalue.value().toJavaRDD()
return rdd
def _get_evaluation_data(self):
jvalue = callZooFunc("float", "createMiniBatchRDDFromStringRDD",
self.train_rdd,
self.batch_per_thread)
rdd = jvalue.value().toJavaRDD()
return rdd
def _get_training_data(self):
jvalue = callZooFunc("float", "createMiniBatchFeatureSetFromStringRDD",
self.train_rdd,
self.batch_size, self.sequential_order, self.shuffle)
fs = FeatureSet(jvalue)
return fs
def _get_validation_data(self):
if self.validation_rdd is not None:
jvalue = callZooFunc("float", "createMiniBatchFeatureSetFromStringRDD",
self.validation_rdd,
self.batch_size, self.sequential_order, self.shuffle)
fs = FeatureSet(jvalue)
return fs
return None
[docs]class TFTextDataset(TFDataset):
def __init__(self, text_set, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size=False,
validation_text_set=None, sequential_order=False, shuffle=True):
super(TFTextDataset, self).__init__(tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size)
self.text_set = text_set
self.validation_text_set = validation_text_set
self.sequential_order = sequential_order
self.shuffle = shuffle
def _get_prediction_data(self):
rdd = self.text_set.get_samples().map(
lambda sample: Sample.from_jtensor(features=sample.features,
labels=JTensor.from_ndarray(np.array([0.0]))))
rdd_wrapper = callZooFunc("float", "zooRDDSampleToMiniBatch", rdd, self.batch_per_thread)
return rdd_wrapper.value().toJavaRDD()
def _get_evaluation_data(self):
rdd = self.text_set.get_samples()
rdd_wrapper = callZooFunc("float", "zooRDDSampleToMiniBatch", rdd, self.batch_per_thread)
return rdd_wrapper.value().toJavaRDD()
def _get_training_data(self):
sample_rdd = self.text_set.get_samples().map(
lambda sample: Sample.from_jtensor(features=sample.features + sample.labels,
labels=JTensor.from_ndarray(np.array([0.0]))))
fs = FeatureSet.sample_rdd(sample_rdd,
sequential_order=self.sequential_order,
shuffle=self.shuffle)
fs = fs.transform(SampleToMiniBatch(self.batch_size))
return fs
def _get_validation_data(self):
if self.validation_text_set is not None:
sample_rdd = self.validation_text_set.get_samples().map(
lambda sample: Sample.from_jtensor(features=sample.features + sample.labels,
labels=JTensor.from_ndarray(np.array([0.0]))))
fs = FeatureSet.sample_rdd(sample_rdd,
sequential_order=self.sequential_order,
shuffle=self.shuffle)
fs = fs.transform(SampleToMiniBatch(self.batch_size))
return fs
return None
[docs] def get_num_partitions(self):
return self.text_set.get_samples().getNumPartitions()
[docs]class TFImageDataset(TFDataset):
def __init__(self, image_set, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size=False,
validation_image_set=None,
sequential_order=False, shuffle=True):
super(TFImageDataset, self).__init__(tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size)
self.image_set = image_set
self.validation_image_set = validation_image_set
self.sequential_order = sequential_order
self.shuffle = shuffle
def _get_prediction_data(self):
return self.image_set
def _get_evaluation_data(self):
return self.image_set.to_image_frame() \
.transform(MergeFeatureLabelImagePreprocessing())
def _get_training_data(self):
fs = FeatureSet.image_set(self.image_set,
sequential_order=self.sequential_order,
shuffle=self.shuffle)
fs = fs.transform(MergeFeatureLabelImagePreprocessing())
fs = fs.transform(ImageFeatureToSample())
fs = fs.transform(SampleToMiniBatch(self.batch_size))
return fs
def _get_validation_data(self):
if self.validation_image_set is not None:
fs = FeatureSet.image_set(self.validation_image_set,
sequential_order=self.sequential_order,
shuffle=self.shuffle)
fs = fs.transform(MergeFeatureLabelImagePreprocessing())
fs = fs.transform(ImageFeatureToSample())
fs = fs.transform(SampleToMiniBatch(self.batch_size))
return fs
return None
[docs] def get_num_partitions(self):
return self.image_set.get_image().getNumPartitions()
[docs]class TFParkSampleToMiniBatch(Preprocessing):
"""
a Transformer that converts Feature to (Feature, None).
"""
def __init__(self,
batch_size,
drop_remainder,
bigdl_type="float"):
super(TFParkSampleToMiniBatch, self).__init__(bigdl_type, batch_size, drop_remainder)
[docs]class TFNdarrayDataset(TFDataset):
def __init__(self, rdd, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size=False,
val_rdd=None, sequential_order=True, shuffle=False):
super(TFNdarrayDataset, self).__init__(tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size)
self.val_rdd = val_rdd
self.rdd = rdd
self.sequential_order = sequential_order
self.shuffle = shuffle
if self.hard_code_batch_size:
logging.warning("hard_code_batch_size is set to true, so we"
" must drop remainder elements in the dataset"
" to avoid outputting small batches, the dropped"
" elements will not get processed. You can "
"pad your dataset so that the total number "
"of elements is divisible by the total batch size"
" to avoid this.")
def _get_prediction_data(self):
rdd = self.rdd.map(lambda t: Sample.from_ndarray(nest.flatten(t), np.array([0.0])))
rdd_wrapper = callZooFunc("float", "zooRDDSampleToMiniBatch", rdd, self.batch_per_thread,
self.hard_code_batch_size)
return rdd_wrapper.value().toJavaRDD()
def _get_evaluation_data(self):
rdd = self.rdd.map(lambda t: Sample.from_ndarray(nest.flatten(t), np.array([0.0])))
rdd_wrapper = callZooFunc("float", "zooRDDSampleToMiniBatch", rdd, self.batch_per_thread,
self.hard_code_batch_size)
return rdd_wrapper.value().toJavaRDD()
def _get_training_data(self):
sample_rdd = self.rdd.map(
lambda t: Sample.from_ndarray(nest.flatten(t), np.array([0.0])))
fs = FeatureSet.sample_rdd(sample_rdd,
sequential_order=self.sequential_order,
shuffle=self.shuffle)
# for training there won't be any remainder, the input to SampleToMiniBatch
# will loop indefinitely
fs = fs.transform(TFParkSampleToMiniBatch(self.batch_size, drop_remainder=False))
return fs
def _get_validation_data(self):
if self.val_rdd is not None:
sample_rdd = self.val_rdd.map(
lambda t: Sample.from_ndarray(nest.flatten(t), np.array([0.0])))
fs = FeatureSet.sample_rdd(sample_rdd,
sequential_order=self.sequential_order,
shuffle=self.shuffle)
fs = fs.transform(TFParkSampleToMiniBatch(self.batch_size, self.hard_code_batch_size))
return fs
return None
[docs] def get_num_partitions(self):
return self.rdd.getNumPartitions()
[docs] @staticmethod
def from_rdd(rdd, names=None, shapes=None, types=None,
batch_size=-1, batch_per_thread=-1,
hard_code_batch_size=False, val_rdd=None,
features=None, labels=None,
sequential_order=False,
shuffle=True):
import tensorflow as tf
if features is not None:
feature_structure = _to_tensor_structure(features)
if labels is not None:
label_structure = _to_tensor_structure(labels)
tensor_structure = (feature_structure, label_structure)
else:
tensor_structure = (feature_structure,)
return TFNdarrayDataset(rdd, tensor_structure,
batch_size, batch_per_thread,
hard_code_batch_size, val_rdd,
sequential_order=sequential_order,
shuffle=shuffle)
if names is not None or shapes is not None or types is not None:
if not names:
names = ["features", "labels"]
if not shapes:
shapes = [None] * len(names)
if not types:
types = [tf.float32] * len(names)
tensor_structure = []
for i in range(len(names)):
tensor_structure.append(TensorMeta(types[i], name=names[i], shape=shapes[i]))
else:
tensor_structure = [TensorMeta(dtype=tf.float32), TensorMeta(dtype=tf.float32)]
return TFNdarrayDataset(rdd, tensor_structure,
batch_size, batch_per_thread,
hard_code_batch_size, val_rdd,
sequential_order=sequential_order, shuffle=shuffle)
[docs] @staticmethod
def from_ndarrays(tensors, batch_size=-1, batch_per_thread=-1,
hard_code_batch_size=False, val_tensors=None,
sequential_order=False, shuffle=True):
sc = getOrCreateSparkContext()
node_num, core_num = get_node_and_core_number()
total_core_num = node_num * core_num
rdd, tensor_structure = _tensors_to_rdd(tensors, sc, total_core_num)
val_rdd = None
if val_tensors is not None:
val_rdd, _ = _tensors_to_rdd(val_tensors, sc, total_core_num)
return TFNdarrayDataset(rdd, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size,
val_rdd, sequential_order=sequential_order, shuffle=shuffle)
[docs]class DataFrameDataset(TFNdarrayDataset):
[docs] @staticmethod
def df_datatype_to_tf(dtype):
import tensorflow as tf
import pyspark.sql.types as df_types
if isinstance(dtype, df_types.FloatType):
return (tf.float32, ())
if isinstance(dtype, df_types.IntegerType):
return (tf.int32, ())
if isinstance(dtype, df_types.LongType):
return (tf.int64, ())
if isinstance(dtype, df_types.DoubleType):
return (tf.float64, ())
if isinstance(dtype, df_types.ArrayType):
return (tf.float32, (None,))
if isinstance(dtype, VectorUDT):
return (tf.float32, (None,))
return None
[docs] @staticmethod
def is_scalar_type(dtype):
import pyspark.sql.types as df_types
if isinstance(dtype, df_types.FloatType):
return True
if isinstance(dtype, df_types.IntegerType):
return True
if isinstance(dtype, df_types.LongType):
return True
if isinstance(dtype, df_types.DoubleType):
return True
return False
def __init__(self, df, feature_cols, labels_cols=None, batch_size=-1,
batch_per_thread=-1, hard_code_batch_size=False,
validation_df=None,
sequential_order=False, shuffle=True):
assert isinstance(feature_cols, list), "feature_cols should be a list"
if labels_cols is not None:
assert isinstance(labels_cols, list), "label_cols should be a list"
import pyspark
assert isinstance(df, pyspark.sql.DataFrame)
if labels_cols is None:
labels_cols = []
selected_df = df.select(*(feature_cols + labels_cols))
schema = selected_df.schema
feature_meta = []
for feature_col in feature_cols:
field = schema[feature_col]
name = field.name
data_type = field.dataType
if DataFrameDataset.df_datatype_to_tf(data_type) is None:
raise ValueError(
"data type {} of col {} is not supported for now".format(data_type, name))
tf_type, tf_shape = DataFrameDataset.df_datatype_to_tf(data_type)
feature_meta.append(TensorMeta(tf_type, name=name, shape=tf_shape))
if labels_cols:
label_meta = []
for label_col in labels_cols:
field = schema[label_col]
name = field.name
data_type = field.dataType
if DataFrameDataset.df_datatype_to_tf(data_type) is None:
raise ValueError(
"data type {} of col {} is not supported for now".format(data_type, name))
tf_type, tf_shape = DataFrameDataset.df_datatype_to_tf(data_type)
label_meta.append(TensorMeta(tf_type, name=name, shape=tf_shape))
tensor_structure = (feature_meta, label_meta)
else:
tensor_structure = (feature_meta,)
def convert(row):
def convert_for_cols(row, cols):
import pyspark.sql.types as df_types
result = []
for name in cols:
feature_type = schema[name].dataType
if DataFrameDataset.is_scalar_type(feature_type):
result.append(np.array(row[name]))
elif isinstance(feature_type, df_types.ArrayType):
result.append(np.array(row[name]))
elif isinstance(row[name], DenseVector):
result.append(row[name].values)
else:
assert isinstance(row[name], SparseVector), \
"unsupported field {}, data {}".format(schema[name], row[name])
result.append(row[name].toArray())
if len(result) == 1:
return result[0]
return result
features = convert_for_cols(row, feature_cols)
if labels_cols:
labels = convert_for_cols(row, labels_cols)
return (features, labels)
else:
return (features,)
rdd = selected_df.rdd.map(lambda row: convert(row))
val_rdd = validation_df.rdd.map(lambda row: convert(row)) if validation_df else None
super(DataFrameDataset, self).__init__(rdd, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size,
val_rdd, sequential_order, shuffle)