TensorFlow 高阶API
Dataset(tf.data)
源码定义:tensorflow/python/data/ops/dataset_ops.py
用户向导:Dataset Input Pipeline
Dataset可以用来表示输入管道元素集合(张量的嵌套结构)和“逻辑计划“对这些元素的转换操作。在Dataset中元素可以是向量,元组或字典等形式。
另外,Dataset需要配合另外一个类Iterator进行使用,Iterator对象是一个迭代器,可以对Dataset中的元素进行迭代提取。
api | 说明 |
---|---|
from_generator(generator,output_types,output_shapes=None) | Creates a Dataset whose elements are generated by generator. |
from_tensor_slices(tensors) | 创建dataset,其元素是给定张量的切片的元素 |
from_tensors(tensors) | 创建一个Dataset包含给定张量的单个元素。 |
interleave(map_func,cycle_length,block_length=1) | 对数据集进行map转换并合并插入结果 |
list_files(file_pattern,shuffle=None) | A dataset of all files matching a pattern. |
range(*args) | Creates a Dataset of a step-separated range of values. |
skip(count) | Creates a Dataset that skips count elements from this dataset. |
take(count) | Creates a Dataset with at most count elements from this dataset. |
zip(datasets) | Creates a Dataset by zipping together the given datasets. |
prefetch(buffer_size) | 创建dataset,在请求输入数据集之前从输入数据集中预提取元素 |
apply(transformation_func) | Apply a transformation function to this dataset. |
cache(filename='') | Caches the elements in this dataset. |
concatenate(dataset) | Creates a Dataset by concatenating given dataset with this dataset. |
filter(predicate) | Filters this dataset according to predicate. |
map(map_func,num_parallel_calls=None) | Maps map_func across this dataset. |
flat_map(map_func) | Maps map_func across this dataset and flattens the result. |
batch(batch_size) | 将数据集的连续元素合成批次 |
padded_batch(batch_size,padded_shapes,padding_values=None) | 将数据集的连续元素组合到填充批次中,此转换将输入数据集的多个连续元素组合为单个元素。 |
repeat(count=None) | Repeats this dataset count times. |
shard(num_shards,index) | 将Dataset分割成num_shards个子数据集。这个函数在分布式训练中非常有用,它允许每个设备读取唯一子集。 |
shuffle(buffer_size,seed=None,reshuffle_each_iteration=None) | 随机混洗数据集的元素。 |
make_initializable_iterator(shared_name=None) | Creates an Iterator for enumerating the elements of this dataset. |
make_one_shot_iterator() | 创建Iterator用于枚举此数据集的元素。(可自动初始化) |
Iterator
源码定义:tensorflow/python/data/ops/iterator_ops.py
用户向导:Dataset Input Pipeline > Iterating over datasets
api | 说明 |
---|---|
get_next(name=None) | In graph mode, you should typically call this method once and use its result as the input to another computation. |
Estimator(tf.estimator)
源码定义:tensorflow/python/estimator/estimator.py
用户向导:Regression Examples
Estimator对象包装(wraps)由model_fn指定的模型,model_fn由给定输入和其他一些参数,返回需要进行训练、计算,或预测的操作.
所有输出(检查点,事件文件等)都被写入model_dir或其子目录.如果model_dir未设置,则使用临时目录.
可以通过RunConfig对象(包含了有关执行环境的信息)传递config参数.它被传递给model_fn,如果model_fn有一个名为“config”的参数(和输入函数以相同的方式).Estimator使得模型可配置化,并且还使用其一些字段来控制内部,特别是关于检查点的控制.
该params参数包含hyperparameter,如果model_fn有一个名为“PARAMS”的参数,并且以相同的方式传递给输入函数,则将它传递给model_fn. Estimator只是沿着参数传递,并不检查它.因此,params的结构完全取决于开发人员.
不能在子类中重写任何Estimator方法(其构造函数强制执行此操作).子类应使用model_fn来配置基类,并且可以添加实现专门功能的方法.
api | 说明 |
---|---|
init(model_fn, model_dir=None, config=None, params=None) | Estimator的构造方法,返回EstimatorSpec实例 |
evaluate(input_fn,steps=None,hooks=None,checkpoint_path=None,name=None) | 返回一个包含按name为键的model_fn中指定的计算指标的词典。 |
export_savedmodel(export_dir_base, serving_input_receiver_fn) | 将推理图作为SavedModel导出到给定的目录中.该方法通过首先调用serving_input_receiver_fn来获取特征Tensors来构建一个新图,然后调用这个Estimator的model_fn来基于这些特征生成模型图.它在新的会话中将给定的检查点恢复到该图中.最后它会在给定的export_dir_base下面创建一个时间戳导出目录,并在其中写入一个SavedModel,其中包含从此会话保存的单个MetaGraphDef. |
get_variable_names() | 返回此模型中所有变量名称的列表. |
get_variable_value(name) | 返回由名称给出的变量的值. 返回值类型:Numpy数组 |
latest_checkpoint() | 查找model_dir中最新保存的检查点文件的文件名. |
predict(self,input_fn,predict_keys=None) | 对给定的features产生预测,返回predictions字典张量的计算值. |
train(input_fn,hooks=None,steps=None,max_steps=None,saving_listeners=None) | 训练给定训练数据input_fn的模型. |
FeatureColumns(tf.feature_column)
源码定义:tensorflow/python/feature_column/feature_column.py
用户向导:feature_columns
feature_columns2
Feature cloumns是原始数据和Estimator模型之间的桥梁,它们被用来把各种形式的原始数据转换为模型能够使用的格式
api | 返回值 | 用法 |
---|---|---|
tf.feature_column.bucketized_column(source_column,boundaries) | Represents discretized dense input. | Bucketized column用来把numeric column的值按照提供的边界(boundaries)离散化为多个值 |
tf.feature_column.categorical_column_with_hash_bucket(key,hash_bucket_size,dtype=tf.string) | Represents sparse feature where ids are set by hashing. | 用户指定类别的总数,通过hash的方式来得到最终的类别ID |
tf.feature_column.categorical_column_with_identity(key,num_buckets,default_value=None) | A _CategoricalColumn that returns identity values. | 与Bucketized column类似,Categorical identity column用单个唯一值表示bucket。 |
tf.feature_column.categorical_column_with_vocabulary_file(key,vocabulary_file) | A _CategoricalColumn with a vocabulary file. | 类别特征做one-hot编码,字典通过文件给出 |
tf.feature_column.categorical_column_with_vocabulary_list(key,vocabulary_list) | A _CategoricalColumn with in-memory vocabulary. | 类别特征做one-hot编码,字典通过list给出 |
tf.feature_column.crossed_column(keys,hash_bucket_size,hash_key=None) | Returns a column for performing crosses of categorical features. | 对keys的笛卡尔积执行hash操作,再把hash的结果对hash_bucket_size取模得到最终的结果 |
tf.feature_column.embedding_column(categorical_column,dimension,combiner='mean') | _DenseColumn that converts from sparse, categorical input. | 把原始特征映射为一个低维稠密的实数向量 |
tf.feature_column.indicator_column(categorical_column) | Represents multi-hot representation of given categorical column. | 把categorical column得到的稀疏tensor转换为one-hot或者multi-hot形式的稠密tensor |
tf.feature_column.input_layer(features,feature_columns) | Returns a dense Tensor as input layer based on given feature_columns. | |
tf.feature_column.linear_model(features,feature_columns) | Returns a linear prediction Tensor based on given feature_columns. | |
tf.feature_column.make_parse_example_spec(feature_columns) | Creates parsing spec dictionary from input feature_columns. | |
tf.feature_column.numeric_column(key,shape=(1,),default_value=None,dtype=tf.float32,normalizer_fn=None) | Represents real valued or numerical features. | |
tf.feature_column.shared_embedding_columns(categorical_columns,dimension,combiner='mean') | List of dense columns that convert from sparse, categorical input. | 把多个特征共享相同的embeding映射空间 |
tf.feature_column.weighted_categorical_column(categorical_column,weight_feature_key,dtype=tf.float32) | 给一个类别特征赋予一定的权重,给一个类别特征赋予一定的权重 |
tf.nn
激活函数op,loss部分抽象
源码定义:tensorflow/python/ops/gen_nn_ops.py
用户向导:Neural Network
api | 说明 |
---|---|
tf.nn.relu(...) | Computes rectified linear: max(features, 0). |
tf.nn.sigmoid(...) | Computes sigmoid of x element-wise. |
tf.nn.bias_add(...) | |
tf.nn.tanh(...) | Computes hyperbolic tangent of x element-wise. |
tf.nn.dropout() | |
tf.nn.softmax() | |
tf.nn.sigmoid_cross_entropy_with_logits(...) | Computes sigmoid cross entropy given logits. |
tf.nn.top_k(...) | Finds values and indices of the k largest entries for the last dimension. |
tf.nn.embedding_lookup(params,ids,partition_strategy='mod') | 按照ids查找params里面的vector然后输出 |
tf.nn.embedding_lookup_sparse(params,sp_ids,sp_weights) | Looks up ids in a list of embedding tensors,Computes embeddings for the given ids and weights. |
tf.nn.zero_fraction(value,name=None) | Returns the fraction of zeros in value. |
tf.nn.nce_loss() | 负采样的NCE loss实现 |
tf.layers
网络模块抽象
api | 说明 |
---|---|
average_pooling1d(...) | Average Pooling layer for 1D inputs. |
average_pooling2d(...) | Average pooling layer for 2D inputs (e.g. images). |
average_pooling3d(...) | Average pooling layer for 3D inputs (e.g. volumes). |
batch_normalization(...) | Functional interface for the batch normalization layer. |
conv1d(...) | Functional interface for 1D convolution layer (e.g. temporal convolution). |
conv2d(...) | Functional interface for the 2D convolution layer. |
conv2d_transpose(...) | Functional interface for transposed 2D convolution layer. |
conv3d(...) | Functional interface for the 3D convolution layer. |
conv3d_transpose(...) | Functional interface for transposed 3D convolution layer. |
dense(...) | Functional interface for the densely-connected layer. |
dropout(...) | Applies Dropout to the input. |
flatten(...) | Flattens an input tensor while preserving the batch axis (axis 0). |
max_pooling1d(...) | Max Pooling layer for 1D inputs. |
max_pooling2d(...) | Max pooling layer for 2D inputs (e.g. images). |
max_pooling3d(...) | Max pooling layer for 3D inputs (e.g. volumes). |
separable_conv1d(...) | Functional interface for the depthwise separable 1D convolution layer. |
separable_conv2d(...) | Functional interface for the depthwise separable 2D convolution layer. |
tf.train
tf.train provides a set of classes and functions that help train models.
用户向导:Training
api | 说明 |
---|---|
tf.train.Optimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
tf.train.GradientDescentOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
tf.train.AdadeltaOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
tf.train.AdagradOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
tf.train.AdamOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
tf.train.SyncReplicasOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
tf.train.SessionRunHook | Training Hooks,Hook to extend calls to MonitoredSession.run(). |
tf.train.SessionRunArgs | Training Hooks,Represents arguments to be added to a Session.run() call. |
tf.train.global_step | sess每执行完一个batch,就给global_step加1,用来统计目前执行的batch数 |
tf.train.basic_train_loop | |
tf.train.get_global_step() | 返回global_step作为name的tensor |
tf.train.get_or_create_global_step() | Returns and create (if necessary) the global step tensor. |
tf.train.write_graph | |
tf.train.string_input_producer | 输出字符串到一个输入管道队列 |
tf.train.Saver(max_to_keep=5) | 保存模型 |
tf.train.latest_checkpoint(checkpoint_dir,latest_filename=None) | Finds the filename of latest saved checkpoint file. |
tf.train.Int64List | Int64List |
tf.train.FloatList | FloatList |
tf.train.Features | Features |
tf.train.Example | Example |
tf.train.batch(tensors,batch_size) | Creates batches of tensors in tensors. |
tf.trainable_variables() | |
tf.train.Scaffold | Scaffold |
tf.linalg
TensorFlow 的线性代数库。
w3cschool TensorFlow官方文档-linalg
api | 说明 |
---|---|
adjoint(...) | 转置最后两个维度和共轭张量matrix. |
band_part(...) | 复制张量设置每个最内层矩阵中心带外的所有内容 |
cholesky(...) | 计算一个或多个方阵的Cholesky分解. |
cholesky_solve(...) | A X = RHS给出的Cholesky因子分解,求解线性方程组. |
det(...) | 计算一个或多个方阵的行列式. |
diag(...) | 返回具有给定批处理对角线值的批处理对角线张量. |
diag_part(...) | 返回批处理张量的批处理对角线部分. |
eigh(...) | 计算了一批自共轭矩阵的特征分解. |
eigvalsh(...) | 计算一个或多个自共轭矩阵的特征值. |
einsum(...) | 任意维度的张量之间的广义收缩. |
expm(...) | 计算一个或多个方阵的矩阵指数. |
eye(...) | 构造一个单位矩阵或批矩阵. |
inv(...) | 计算一个或多个平方可逆矩阵或它们的倒数 |
logdet(...) | 计算hermitian正定矩阵的行列式的对数. |
logm(...) | 计算一个或多个方阵的矩阵对数: |
lstsq(...) | 解决一个或多个线性最小二乘问题. |
norm(...) | 计算向量,矩阵和张量的范数.(不赞成的参数) |
qr(...) | 计算一个或多个矩阵的QR分解. |
set_diag(...) | 返回具有新批处理对角线值的批处理矩阵张量. |
slogdet(...) | 计算行列式的绝对值的符号和日志 |
solve(...) | 求解线性方程组. |
svd(...) | 计算一个或多个矩阵的奇异值分解. |
tensordot(...) | a和b沿指定轴的张量收缩. |
trace(...) | 计算张量x的轨迹. |
transpose(...) | 转置张量a的最后两个维度. |
triangular_solve(...) | 求解具有上三角矩阵或下三角矩阵的线性方程组. |
checkpoint(模型保存与恢复)
Estimator 将一个检查点保存到 model_dir 中后,每次调用 Estimator 的 train、eval 或 predict 方法时,都会发生下列情况:
a) Estimator 通过运行 model_fn() 构建模型图。(要详细了解 model_fn(),请参阅创建自定义 Estimator。)
b) Estimator 根据最近写入的检查点中存储的数据来初始化新模型的权重。
换言之,一旦存在检查点,TensorFlow 就会在您每次调用 train()、evaluate() 或 predict() 时重建模型。
Tensorflow Serving
官方例子
half_plus_two的例子
# Download the TensorFlow Serving Docker image and repo
docker pull tensorflow/serving
git clone https://github.com/tensorflow/serving
# Location of demo models
TESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"
# Start TensorFlow Serving container and open the REST API port
# docker run -t 表示是否允许伪TTY
# docker run --rm表示如果实例已经存在,则先remove掉该实例再重新启动新实例
# docker -p设置端口映射
# docker -v设置磁盘映射
# docker -e设置环境变量
docker run -t --rm -p 8501:8501
-v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two"
-e MODEL_NAME=half_plus_two
tensorflow/serving &
# Query the model using the predict API
curl -d '{"instances": [1.0, 2.0, 5.0]}'
-X POST http://localhost:8501/v1/models/half_plus_two:predict
# Returns => { "predictions": [2.5, 3.0, 4.5] }
serving镜像提供了两种调用方式:gRPC和HTTP请求。gRPC默认端口是8500,HTTP请求的默认端口是8501,serving镜像中的程序会自动加载镜像内/models下的模型,通过MODEL_NAME指定/models下的哪个模型。
创建自定义镜像
docker run --rm -p 8501:8501
--mount type=bind,source=${model_dir},target=/models/dpp_model
-e MODEL_NAME=dpp_model -t tensorflow/serving &
架构
TensorFlow Serving可抽象为一些组件构成,每个组件实现了不同的API任务,其中最重要的是Servable, Loader, Source, 和 Manager,我们看一下组件之间是如何交互的。
Source
TF Serving发现磁盘上的模型文件,该模型服务的生命周期就开始了。Source组件负责发现模型文件,找出需要加载的新模型。实际上,该组件监控文件系统,当发现一个新的模型版本,就为该模型创建一个Loader。
Loader
Loader需要知道模型的相关信息,包括如何加载模型如何估算模型需要的资源,包括需要请求的RAM、GPU内存。Loader带一个指针,连接到磁盘上存储的模型,其中包含加载模型需要的相关元数据。不过记住,Loader现在还不允许加载模型。
Manager
Manager收到待加载模型版本,开始模型服务流程。此处有两种可能性,第一种情况是模型首次推送部署,Manager先确保模型需要的资源可用,一旦获取相应的资源,Manager赋予Loader权限去加载模型。
第二种情况是为已上线模型部署一个新版本。Manager会先查询Version Policy插件,决定加载新模型的流程如何进行。
具体来说,当加载新模型时,可选择保持 (1) 可用性(the Availability Preserving Policy)或 (2) 资源(the Resource Preserving Policy)。
Servable
最后,当用户请求模型的句柄,Manager返回句柄给Servable。
Servables 是 TensorFlow Serving 中最核心的抽象,是客户端用于执行计算 (例如:查找或推断) 的底层对象。
部署服务
模型导出
将TensorFlow构建的模型用作服务,首先需要确保导出为正确的格式,可以采用TensorFlow提供的SavedModel类。TensorFlow Saver提供模型checkpoint磁盘文件的保存/恢复。事实上SavedModel封装了TensorFlow Saver,对于模型服务是一种标准的导出方法。
SignatureDefs定义了一组TensorFlow支持的计算签名,便于在计算图中找到适合的输入输出张量。简单的说,使用这些计算签名,可以准确指定特定的输入输出节点。目前有3个服务API: 分类、预测和回归。每个签名定义关联一个RPC API。分类SignatureDef用于分类RPC API,预测SignatureDef用于RPC API等等。对于分类SignatureDef,需要一个输入张量(接收数据)以及可能的输出张量: 类别或得分。回归SignatureDef需要一个输入张量以及另一个输出张量。最后预测SignatureDef需要一个可变长度的输入输出张量。
tf.saved_model API
with sess.graph.as_default() as graph:
builder = tf.saved_model.builder.SavedModelBuilder(saved_model_dir)
# 1.8版本,1.12之后调整为tf.saved_model.predict_signature_def
signature = tf.saved_model.signature_def_utils.predict_signature_def(inputs={'image': in_image},
outputs={'prediction': graph.get_tensor_by_name('final_result:0')},)
builder.add_meta_graph_and_variables(sess=sess,
tags=["serve"],
signature_def_map={'predict':signature,
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:signature
})
builder.save()
tf.estimator API
def export_savedmodel(self,
export_dir,
serving_input_receiver_fn=None,
as_text=False,
checkpoint_path=None):
if serving_input_receiver_fn is None and self.feature_cfgs is None:
raise ValueError('Both serving_input_receiver_fn and feature_cfgs are none.')
if serving_input_receiver_fn is None:
feature_spec = self.feature_cfgs.to_feature_spec(exclude_targets=True)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
# 自己封装的estimator
savedmodel_path = self.estimator.export_savedmodel(
export_dir_base=export_dir,
serving_input_receiver_fn=serving_input_receiver_fn,
as_text=as_text,
checkpoint_path=checkpoint_path
)
return savedmodel_path
其中self.feature_cfgs.to_feature_spec函数得到如下结果:
{'follow_seq': VarLenFeature(dtype=tf.int64), 'uid': VarLenFeature(dtype=tf.int64)}
说明了输入】
TensorFlow Serving ModelServer 用于发现新导出的模型,并启动 gRPC 用于提供模型服务。
API请求(predict)
TensorFlow ModelServer通过host:port接受下面这种RESTful API请求:
POST http://host:port/
URI: /v1/models/({MODEL_NAME}[/versions/){MODEL_VERSION}]
VERB: classify|regress|predict
其中“/versions/${MODEL_VERSION}”是可选的,如果省略,则使用最新的版本。
该API基本遵循gRPC版本的PredictionService API。
请求URL的示例:
http://host:port/v1/models/iris:classify
http://host:port/v1/models/mnist/versions/314:predict
请求格式
预测API的请求体必须是如下格式的JSON对象:
{
// (Optional) Serving signature to use.
// If unspecifed default serving signature is used.
"signature_name": <string>,
// Input Tensors in row ("instances") or columnar ("inputs") format.
// A request can have either of them but NOT both.
"instances": <value>|<(nested)list>|<list-of-objects>
"inputs": <value>|<(nested)list>|<object>
}
以行的形式说明输入的tensor
{
"instances": [
{
"tag": "foo",
"signal": [1, 2, 3, 4, 5],
"sensor": [[1, 2], [3, 4]]
},
{
"tag": "bar",
"signal": [3, 4, 1, 2, 5]],
"sensor": [[4, 5], [6, 8]]
}
]
}
以列的形式说明输入的tensor
{
"inputs": {
"tag": ["foo", "bar"],
"signal": [[1, 2, 3, 4, 5], [3, 4, 1, 2, 5]],
"sensor": [[[1, 2], [3, 4]], [[4, 5], [6, 8]]]
}
}
返回格式
行形式
{
"predictions": <value>|<(nested)list>|<list-of-objects>
}
如果模型的输出只包含一个命名的tensor,我们省略名字和predictions key map,直接使用标量或者值的list。如果模型输出多个命名的tensor,我们输出对象list,和上面提到的行形式输入类似。
列形式
{
"outputs": <value>|<(nested)list>|<object>
}
如果模型的输出只包含一个命名的tensor,我们省略名字和outputs key map,直接使用标量或者值的list。如果模型输出多个命名的tensor,我们输出对象,其每个key都和输出的tensor名对应,和上面提到的列形式输入类似。
API demo
$ curl -d '{"instances": [1.0,2.0,5.0]}' -X POST http://localhost:8501/v1/models/half_plus_three:predict
{
"predictions": [3.5, 4.0, 5.5]
}
[译]TensorFlow Serving RESTful API
源码理解
开发中遇到的其他问题记录
实现numpy中的切片赋值
# 初始化一个3*4的tensor和大小为3的list
cis = tf.zeros([3, 4)
cis_list = [tf.zeros([4]) for _ in range(3)]
# 替换list的一个元素
cis_list[k] = eis
# 重新堆叠成一个tensor,列切片赋值axis=1?
cis = tf.stack(cis_list)
附录
- tf.data.Dataset
- Tensorflow中的数据对象Dataset
- Dataset Input Pipeline
- 基于Tensorflow高阶API构建大规模分布式深度学习模型系列之特征工程Feature Columns
- w3cschool TensorFlow官方文档
- TensorFlow入门12 -- Checkpoints,保存和恢复Estimator创建的模型
- 官方文档
- TensorFlow-Serving:Flexible, High-Performance ML Serving
- 如何用TF Serving部署TensorFlow模型
- TensorFlow 学习笔记
- Client API(REST)
- tensorflow-serving源码理解
- docker部署tensorflow serving以及模型替换