zoukankan      html  css  js  c++  java
  • 【推荐算法工程师技术栈系列】分布式&数据库--tensorflow

    TensorFlow 高阶API

    Dataset(tf.data)

    源码定义:tensorflow/python/data/ops/dataset_ops.py
    用户向导:Dataset Input Pipeline

    Dataset可以用来表示输入管道元素集合(张量的嵌套结构)和“逻辑计划“对这些元素的转换操作。在Dataset中元素可以是向量,元组或字典等形式。
    另外,Dataset需要配合另外一个类Iterator进行使用,Iterator对象是一个迭代器,可以对Dataset中的元素进行迭代提取。

    api 说明
    from_generator(generator,output_types,output_shapes=None) Creates a Dataset whose elements are generated by generator.
    from_tensor_slices(tensors) 创建dataset,其元素是给定张量的切片的元素
    from_tensors(tensors) 创建一个Dataset包含给定张量的单个元素。
    interleave(map_func,cycle_length,block_length=1) 对数据集进行map转换并合并插入结果
    list_files(file_pattern,shuffle=None) A dataset of all files matching a pattern.
    range(*args) Creates a Dataset of a step-separated range of values.
    skip(count) Creates a Dataset that skips count elements from this dataset.
    take(count) Creates a Dataset with at most count elements from this dataset.
    zip(datasets) Creates a Dataset by zipping together the given datasets.
    prefetch(buffer_size) 创建dataset,在请求输入数据集之前从输入数据集中预提取元素
    apply(transformation_func) Apply a transformation function to this dataset.
    cache(filename='') Caches the elements in this dataset.
    concatenate(dataset) Creates a Dataset by concatenating given dataset with this dataset.
    filter(predicate) Filters this dataset according to predicate.
    map(map_func,num_parallel_calls=None) Maps map_func across this dataset.
    flat_map(map_func) Maps map_func across this dataset and flattens the result.
    batch(batch_size) 将数据集的连续元素合成批次
    padded_batch(batch_size,padded_shapes,padding_values=None) 将数据集的连续元素组合到填充批次中,此转换将输入数据集的多个连续元素组合为单个元素。
    repeat(count=None) Repeats this dataset count times.
    shard(num_shards,index) 将Dataset分割成num_shards个子数据集。这个函数在分布式训练中非常有用,它允许每个设备读取唯一子集。
    shuffle(buffer_size,seed=None,reshuffle_each_iteration=None) 随机混洗数据集的元素。
    make_initializable_iterator(shared_name=None) Creates an Iterator for enumerating the elements of this dataset.
    make_one_shot_iterator() 创建Iterator用于枚举此数据集的元素。(可自动初始化)

    Iterator
    源码定义:tensorflow/python/data/ops/iterator_ops.py
    用户向导:Dataset Input Pipeline > Iterating over datasets

    api 说明
    get_next(name=None) In graph mode, you should typically call this method once and use its result as the input to another computation.

    Estimator(tf.estimator)

    源码定义:tensorflow/python/estimator/estimator.py
    用户向导:Regression Examples

    Estimator对象包装(wraps)由model_fn指定的模型,model_fn由给定输入和其他一些参数,返回需要进行训练、计算,或预测的操作.
    所有输出(检查点,事件文件等)都被写入model_dir或其子目录.如果model_dir未设置,则使用临时目录.
    可以通过RunConfig对象(包含了有关执行环境的信息)传递config参数.它被传递给model_fn,如果model_fn有一个名为“config”的参数(和输入函数以相同的方式).Estimator使得模型可配置化,并且还使用其一些字段来控制内部,特别是关于检查点的控制.
    该params参数包含hyperparameter,如果model_fn有一个名为“PARAMS”的参数,并且以相同的方式传递给输入函数,则将它传递给model_fn. Estimator只是沿着参数传递,并不检查它.因此,params的结构完全取决于开发人员.
    不能在子类中重写任何Estimator方法(其构造函数强制执行此操作).子类应使用model_fn来配置基类,并且可以添加实现专门功能的方法.

    api 说明
    init(model_fn, model_dir=None, config=None, params=None) Estimator的构造方法,返回EstimatorSpec实例
    evaluate(input_fn,steps=None,hooks=None,checkpoint_path=None,name=None) 返回一个包含按name为键的model_fn中指定的计算指标的词典。
    export_savedmodel(export_dir_base, serving_input_receiver_fn) 将推理图作为SavedModel导出到给定的目录中.该方法通过首先调用serving_input_receiver_fn来获取特征Tensors来构建一个新图,然后调用这个Estimator的model_fn来基于这些特征生成模型图.它在新的会话中将给定的检查点恢复到该图中.最后它会在给定的export_dir_base下面创建一个时间戳导出目录,并在其中写入一个SavedModel,其中包含从此会话保存的单个MetaGraphDef.
    get_variable_names() 返回此模型中所有变量名称的列表.
    get_variable_value(name) 返回由名称给出的变量的值. 返回值类型:Numpy数组
    latest_checkpoint() 查找model_dir中最新保存的检查点文件的文件名.
    predict(self,input_fn,predict_keys=None) 对给定的features产生预测,返回predictions字典张量的计算值.
    train(input_fn,hooks=None,steps=None,max_steps=None,saving_listeners=None) 训练给定训练数据input_fn的模型.

    FeatureColumns(tf.feature_column)

    源码定义:tensorflow/python/feature_column/feature_column.py
    用户向导:feature_columns
    feature_columns2

    Feature cloumns是原始数据和Estimator模型之间的桥梁,它们被用来把各种形式的原始数据转换为模型能够使用的格式

    api 返回值 用法
    tf.feature_column.bucketized_column(source_column,boundaries) Represents discretized dense input. Bucketized column用来把numeric column的值按照提供的边界(boundaries)离散化为多个值
    tf.feature_column.categorical_column_with_hash_bucket(key,hash_bucket_size,dtype=tf.string) Represents sparse feature where ids are set by hashing. 用户指定类别的总数,通过hash的方式来得到最终的类别ID
    tf.feature_column.categorical_column_with_identity(key,num_buckets,default_value=None) A _CategoricalColumn that returns identity values. 与Bucketized column类似,Categorical identity column用单个唯一值表示bucket。
    tf.feature_column.categorical_column_with_vocabulary_file(key,vocabulary_file) A _CategoricalColumn with a vocabulary file. 类别特征做one-hot编码,字典通过文件给出
    tf.feature_column.categorical_column_with_vocabulary_list(key,vocabulary_list) A _CategoricalColumn with in-memory vocabulary. 类别特征做one-hot编码,字典通过list给出
    tf.feature_column.crossed_column(keys,hash_bucket_size,hash_key=None) Returns a column for performing crosses of categorical features. 对keys的笛卡尔积执行hash操作,再把hash的结果对hash_bucket_size取模得到最终的结果
    tf.feature_column.embedding_column(categorical_column,dimension,combiner='mean') _DenseColumn that converts from sparse, categorical input. 把原始特征映射为一个低维稠密的实数向量
    tf.feature_column.indicator_column(categorical_column) Represents multi-hot representation of given categorical column. 把categorical column得到的稀疏tensor转换为one-hot或者multi-hot形式的稠密tensor
    tf.feature_column.input_layer(features,feature_columns) Returns a dense Tensor as input layer based on given feature_columns.
    tf.feature_column.linear_model(features,feature_columns) Returns a linear prediction Tensor based on given feature_columns.
    tf.feature_column.make_parse_example_spec(feature_columns) Creates parsing spec dictionary from input feature_columns.
    tf.feature_column.numeric_column(key,shape=(1,),default_value=None,dtype=tf.float32,normalizer_fn=None) Represents real valued or numerical features.
    tf.feature_column.shared_embedding_columns(categorical_columns,dimension,combiner='mean') List of dense columns that convert from sparse, categorical input. 把多个特征共享相同的embeding映射空间
    tf.feature_column.weighted_categorical_column(categorical_column,weight_feature_key,dtype=tf.float32) 给一个类别特征赋予一定的权重,给一个类别特征赋予一定的权重

    tf.nn

    激活函数op,loss部分抽象

    源码定义:tensorflow/python/ops/gen_nn_ops.py
    用户向导:Neural Network

    api 说明
    tf.nn.relu(...) Computes rectified linear: max(features, 0).
    tf.nn.sigmoid(...) Computes sigmoid of x element-wise.
    tf.nn.bias_add(...)
    tf.nn.tanh(...) Computes hyperbolic tangent of x element-wise.
    tf.nn.dropout()
    tf.nn.softmax()
    tf.nn.sigmoid_cross_entropy_with_logits(...) Computes sigmoid cross entropy given logits.
    tf.nn.top_k(...) Finds values and indices of the k largest entries for the last dimension.
    tf.nn.embedding_lookup(params,ids,partition_strategy='mod') 按照ids查找params里面的vector然后输出
    tf.nn.embedding_lookup_sparse(params,sp_ids,sp_weights) Looks up ids in a list of embedding tensors,Computes embeddings for the given ids and weights.
    tf.nn.zero_fraction(value,name=None) Returns the fraction of zeros in value.
    tf.nn.nce_loss() 负采样的NCE loss实现

    tf.layers

    网络模块抽象

    api 说明
    average_pooling1d(...) Average Pooling layer for 1D inputs.
    average_pooling2d(...) Average pooling layer for 2D inputs (e.g. images).
    average_pooling3d(...) Average pooling layer for 3D inputs (e.g. volumes).
    batch_normalization(...) Functional interface for the batch normalization layer.
    conv1d(...) Functional interface for 1D convolution layer (e.g. temporal convolution).
    conv2d(...) Functional interface for the 2D convolution layer.
    conv2d_transpose(...) Functional interface for transposed 2D convolution layer.
    conv3d(...) Functional interface for the 3D convolution layer.
    conv3d_transpose(...) Functional interface for transposed 3D convolution layer.
    dense(...) Functional interface for the densely-connected layer.
    dropout(...) Applies Dropout to the input.
    flatten(...) Flattens an input tensor while preserving the batch axis (axis 0).
    max_pooling1d(...) Max Pooling layer for 1D inputs.
    max_pooling2d(...) Max pooling layer for 2D inputs (e.g. images).
    max_pooling3d(...) Max pooling layer for 3D inputs (e.g. volumes).
    separable_conv1d(...) Functional interface for the depthwise separable 1D convolution layer.
    separable_conv2d(...) Functional interface for the depthwise separable 2D convolution layer.

    tf.train

    tf.train provides a set of classes and functions that help train models.

    用户向导:Training

    api 说明
    tf.train.Optimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
    tf.train.GradientDescentOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
    tf.train.AdadeltaOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
    tf.train.AdagradOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
    tf.train.AdamOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
    tf.train.SyncReplicasOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
    tf.train.SessionRunHook Training Hooks,Hook to extend calls to MonitoredSession.run().
    tf.train.SessionRunArgs Training Hooks,Represents arguments to be added to a Session.run() call.
    tf.train.global_step sess每执行完一个batch,就给global_step加1,用来统计目前执行的batch数
    tf.train.basic_train_loop
    tf.train.get_global_step() 返回global_step作为name的tensor
    tf.train.get_or_create_global_step() Returns and create (if necessary) the global step tensor.
    tf.train.write_graph
    tf.train.string_input_producer 输出字符串到一个输入管道队列
    tf.train.Saver(max_to_keep=5) 保存模型
    tf.train.latest_checkpoint(checkpoint_dir,latest_filename=None) Finds the filename of latest saved checkpoint file.
    tf.train.Int64List Int64List
    tf.train.FloatList FloatList
    tf.train.Features Features
    tf.train.Example Example
    tf.train.batch(tensors,batch_size) Creates batches of tensors in tensors.
    tf.trainable_variables()
    tf.train.Scaffold Scaffold

    tf.linalg

    TensorFlow 的线性代数库。
    w3cschool TensorFlow官方文档-linalg

    api 说明
    adjoint(...) 转置最后两个维度和共轭张量matrix.
    band_part(...) 复制张量设置每个最内层矩阵中心带外的所有内容
    cholesky(...) 计算一个或多个方阵的Cholesky分解.
    cholesky_solve(...) A X = RHS给出的Cholesky因子分解,求解线性方程组.
    det(...) 计算一个或多个方阵的行列式.
    diag(...) 返回具有给定批处理对角线值的批处理对角线张量.
    diag_part(...) 返回批处理张量的批处理对角线部分.
    eigh(...) 计算了一批自共轭矩阵的特征分解.
    eigvalsh(...) 计算一个或多个自共轭矩阵的特征值.
    einsum(...) 任意维度的张量之间的广义收缩.
    expm(...) 计算一个或多个方阵的矩阵指数.
    eye(...) 构造一个单位矩阵或批矩阵.
    inv(...) 计算一个或多个平方可逆矩阵或它们的倒数
    logdet(...) 计算hermitian正定矩阵的行列式的对数.
    logm(...) 计算一个或多个方阵的矩阵对数:
    lstsq(...) 解决一个或多个线性最小二乘问题.
    norm(...) 计算向量,矩阵和张量的范数.(不赞成的参数)
    qr(...) 计算一个或多个矩阵的QR分解.
    set_diag(...) 返回具有新批处理对角线值的批处理矩阵张量.
    slogdet(...) 计算行列式的绝对值的符号和日志
    solve(...) 求解线性方程组.
    svd(...) 计算一个或多个矩阵的奇异值分解.
    tensordot(...) a和b沿指定轴的张量收缩.
    trace(...) 计算张量x的轨迹.
    transpose(...) 转置张量a的最后两个维度.
    triangular_solve(...) 求解具有上三角矩阵或下三角矩阵的线性方程组.

    checkpoint(模型保存与恢复)

    Estimator 将一个检查点保存到 model_dir 中后,每次调用 Estimator 的 train、eval 或 predict 方法时,都会发生下列情况:
    a) Estimator 通过运行 model_fn() 构建模型图。(要详细了解 model_fn(),请参阅创建自定义 Estimator。)
    b) Estimator 根据最近写入的检查点中存储的数据来初始化新模型的权重。
    换言之,一旦存在检查点,TensorFlow 就会在您每次调用 train()、evaluate() 或 predict() 时重建模型。

    Tensorflow Serving

    官方例子

    half_plus_two的例子

    # Download the TensorFlow Serving Docker image and repo
    docker pull tensorflow/serving
    
    git clone https://github.com/tensorflow/serving
    # Location of demo models
    TESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"
    
    # Start TensorFlow Serving container and open the REST API port
    # docker run -t 表示是否允许伪TTY
    # docker run --rm表示如果实例已经存在,则先remove掉该实例再重新启动新实例
    # docker -p设置端口映射
    # docker -v设置磁盘映射
    # docker -e设置环境变量
    docker run -t --rm -p 8501:8501 
        -v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two" 
        -e MODEL_NAME=half_plus_two 
        tensorflow/serving &
    
    # Query the model using the predict API
    curl -d '{"instances": [1.0, 2.0, 5.0]}' 
        -X POST http://localhost:8501/v1/models/half_plus_two:predict
    
    # Returns => { "predictions": [2.5, 3.0, 4.5] }
    

    serving镜像提供了两种调用方式:gRPC和HTTP请求。gRPC默认端口是8500,HTTP请求的默认端口是8501,serving镜像中的程序会自动加载镜像内/models下的模型,通过MODEL_NAME指定/models下的哪个模型。

    创建自定义镜像

    docker run --rm -p 8501:8501 
        --mount type=bind,source=${model_dir},target=/models/dpp_model 
        -e MODEL_NAME=dpp_model -t tensorflow/serving &
    

    官方文档

    架构

    TensorFlow Serving可抽象为一些组件构成,每个组件实现了不同的API任务,其中最重要的是Servable, Loader, Source, 和 Manager,我们看一下组件之间是如何交互的。

    Source

    TF Serving发现磁盘上的模型文件,该模型服务的生命周期就开始了。Source组件负责发现模型文件,找出需要加载的新模型。实际上,该组件监控文件系统,当发现一个新的模型版本,就为该模型创建一个Loader。

    Loader

    Loader需要知道模型的相关信息,包括如何加载模型如何估算模型需要的资源,包括需要请求的RAM、GPU内存。Loader带一个指针,连接到磁盘上存储的模型,其中包含加载模型需要的相关元数据。不过记住,Loader现在还不允许加载模型。

    Manager

    Manager收到待加载模型版本,开始模型服务流程。此处有两种可能性,第一种情况是模型首次推送部署,Manager先确保模型需要的资源可用,一旦获取相应的资源,Manager赋予Loader权限去加载模型。
    第二种情况是为已上线模型部署一个新版本。Manager会先查询Version Policy插件,决定加载新模型的流程如何进行。
    具体来说,当加载新模型时,可选择保持 (1) 可用性(the Availability Preserving Policy)或 (2) 资源(the Resource Preserving Policy)。

    Servable

    最后,当用户请求模型的句柄,Manager返回句柄给Servable。

    Servables 是 TensorFlow Serving 中最核心的抽象,是客户端用于执行计算 (例如:查找或推断) 的底层对象。

    部署服务

    模型导出

    将TensorFlow构建的模型用作服务,首先需要确保导出为正确的格式,可以采用TensorFlow提供的SavedModel类。TensorFlow Saver提供模型checkpoint磁盘文件的保存/恢复。事实上SavedModel封装了TensorFlow Saver,对于模型服务是一种标准的导出方法。
    SignatureDefs定义了一组TensorFlow支持的计算签名,便于在计算图中找到适合的输入输出张量。简单的说,使用这些计算签名,可以准确指定特定的输入输出节点。目前有3个服务API: 分类、预测和回归。每个签名定义关联一个RPC API。分类SignatureDef用于分类RPC API,预测SignatureDef用于RPC API等等。对于分类SignatureDef,需要一个输入张量(接收数据)以及可能的输出张量: 类别或得分。回归SignatureDef需要一个输入张量以及另一个输出张量。最后预测SignatureDef需要一个可变长度的输入输出张量。

    提供tensorflow模型

    tf.saved_model API

    with sess.graph.as_default() as graph:
        builder = tf.saved_model.builder.SavedModelBuilder(saved_model_dir)
    	# 1.8版本,1.12之后调整为tf.saved_model.predict_signature_def
        signature = tf.saved_model.signature_def_utils.predict_signature_def(inputs={'image': in_image},
                                          outputs={'prediction': graph.get_tensor_by_name('final_result:0')},)
        builder.add_meta_graph_and_variables(sess=sess,
                                             tags=["serve"],
                                             signature_def_map={'predict':signature,
    		       			   tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:signature
    	})
        builder.save()
    

    tf.estimator API

    def export_savedmodel(self,
                          export_dir,
                          serving_input_receiver_fn=None,
                          as_text=False,
                          checkpoint_path=None):
        if serving_input_receiver_fn is None and self.feature_cfgs is None:
            raise ValueError('Both serving_input_receiver_fn and feature_cfgs are none.')
    
        if serving_input_receiver_fn is None:
            feature_spec = self.feature_cfgs.to_feature_spec(exclude_targets=True)
            serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
    	# 自己封装的estimator
        savedmodel_path = self.estimator.export_savedmodel(
            export_dir_base=export_dir,
            serving_input_receiver_fn=serving_input_receiver_fn,
            as_text=as_text,
            checkpoint_path=checkpoint_path
        )
        return savedmodel_path
    

    其中self.feature_cfgs.to_feature_spec函数得到如下结果:

    {'follow_seq': VarLenFeature(dtype=tf.int64), 'uid': VarLenFeature(dtype=tf.int64)}
    

    说明了输入】

    TensorFlow Serving ModelServer 用于发现新导出的模型,并启动 gRPC 用于提供模型服务。

    API请求(predict)

    TensorFlow ModelServer通过host:port接受下面这种RESTful API请求:
    POST http://host:port/:
    URI: /v1/models/({MODEL_NAME}[/versions/){MODEL_VERSION}]
    VERB: classify|regress|predict

    其中“/versions/${MODEL_VERSION}”是可选的,如果省略,则使用最新的版本。

    该API基本遵循gRPC版本的PredictionService API。

    请求URL的示例:

    http://host:port/v1/models/iris:classify
    http://host:port/v1/models/mnist/versions/314:predict
    

    请求格式

    预测API的请求体必须是如下格式的JSON对象:

    {
      // (Optional) Serving signature to use.
      // If unspecifed default serving signature is used.
      "signature_name": <string>,
    
      // Input Tensors in row ("instances") or columnar ("inputs") format.
      // A request can have either of them but NOT both.
      "instances": <value>|<(nested)list>|<list-of-objects>
      "inputs": <value>|<(nested)list>|<object>
    }
    

    以行的形式说明输入的tensor

    {
     "instances": [
       {
         "tag": "foo",
         "signal": [1, 2, 3, 4, 5],
         "sensor": [[1, 2], [3, 4]]
       },
       {
         "tag": "bar",
         "signal": [3, 4, 1, 2, 5]],
         "sensor": [[4, 5], [6, 8]]
       }
     ]
    }
    

    以列的形式说明输入的tensor

    {
     "inputs": {
       "tag": ["foo", "bar"],
       "signal": [[1, 2, 3, 4, 5], [3, 4, 1, 2, 5]],
       "sensor": [[[1, 2], [3, 4]], [[4, 5], [6, 8]]]
     }
    }
    

    返回格式

    行形式

    {
      "predictions": <value>|<(nested)list>|<list-of-objects>
    }
    

    如果模型的输出只包含一个命名的tensor,我们省略名字和predictions key map,直接使用标量或者值的list。如果模型输出多个命名的tensor,我们输出对象list,和上面提到的行形式输入类似。

    列形式

    {
      "outputs": <value>|<(nested)list>|<object>
    }
    

    如果模型的输出只包含一个命名的tensor,我们省略名字和outputs key map,直接使用标量或者值的list。如果模型输出多个命名的tensor,我们输出对象,其每个key都和输出的tensor名对应,和上面提到的列形式输入类似。

    API demo

    $ curl -d '{"instances": [1.0,2.0,5.0]}' -X POST http://localhost:8501/v1/models/half_plus_three:predict
    {
        "predictions": [3.5, 4.0, 5.5]
    }
    

    [译]TensorFlow Serving RESTful API

    源码理解

    tensorflow-serving源码理解

    开发中遇到的其他问题记录

    实现numpy中的切片赋值

    # 初始化一个3*4的tensor和大小为3的list
    cis = tf.zeros([3, 4)
    cis_list = [tf.zeros([4]) for _ in range(3)]
    # 替换list的一个元素
    cis_list[k] = eis
    # 重新堆叠成一个tensor,列切片赋值axis=1?
    cis = tf.stack(cis_list)
    

    附录

  • 相关阅读:
    HTML特殊字符编码对照表
    在Echarts 柱形图的单击事件中写入自定义的参数
    IIS7.5支持解析读取.json文件数据 -- 问题
    VS SVN
    WebApi 跨域问题解决方案:CORS
    SQL Server2012中的SequenceNumber尝试
    Oracle数据类型与.NET中的对应关系
    MongoDB 学习 --转
    MongoDB 基础
    CSS魔法堂:你真的懂text-align吗?
  • 原文地址:https://www.cnblogs.com/arachis/p/tensorflow.html
Copyright © 2011-2022 走看看