  • Hands-on: generating YouTube-style recommendation candidates with the MovieLens dataset

    Overview

    I have written about NCE loss and the YouTube DNN recommender on this blog before, but mostly at the theory level, without hands-on experience. In this post I want to dig deeper into the YouTube DNN recommender and, along the way, consolidate how I use TensorFlow. The code here is only for my own study and practice; feedback on anything questionable is welcome.

    Step 1: build the features and samples

    import functools
    import gc
    import os
    import pickle as pkl

    import pandas as pd
    import tensorflow as tf

    # Network weights, initialized in inference_fn
    weights = {}
    # Column order of the training CSV, also used when serializing to TFRecords
    _CSV_COLUMNS = []
    # Maximum length of the label list
    MAX_NUM_TRUE = 100
    # movieId -> contiguous index
    movieid_index = {}
    # contiguous index -> movieId
    index_movieid = {}
    
    
    # Wraps feature and sample generation
    class FeatureSampleGenerators:
        def __init__(self):
            """
                Builds the training set.
                # Feature design
                - sequence of watched movie ids        label
                - sequence of preferred tag ids        tagIDs
                - sequence of preferred genre ids      genresIDs
                - user id                              userId

                # Sample design
                - Negative sampling (tf.nn.nce_loss() does not require explicit negatives):
                  randomly draw movies the user has not watched from the full catalog as
                  negatives, excluding movies released after the positive examples.
            """
            # Map from feature name to feature cardinality
            self.feature_info = {}
            # Maximum length of the label list
            self.max_num_true = 0
            # Number of classes (distinct movies), i.e. the NCE target classes
            self.n_classes = 0
            # Number of training samples
            self.num_train = 0
            data_root = "./ml-latest"
            all_columns = ["userId", "tagIDs", "genresIDs", "label"]
            global _CSV_COLUMNS
            _CSV_COLUMNS = all_columns
    
            self.data_root = data_root
            self.train_data = os.path.join(data_root, "train.csv")
            self.statistical_info = os.path.join(data_root, "statistical_info.pkl")
    
            if not os.path.exists(self.train_data) or not os.path.exists(self.statistical_info):
                # Build the training set
                trainDF = pd.DataFrame(columns=all_columns)

                ratingsDF = pd.read_csv(os.path.join(data_root, "ratings.csv"))
                print(ratingsDF.head(3))

                def concat_ws(movie_ids):
                    return "|".join(map(str, movie_ids))

                # Build each user's watched movieId sequence (ordered by timestamp) as the label
                watchMidsDF = ratingsDF.sort_values("timestamp").groupby("userId")["movieId"].agg(
                    concat_ws).reset_index()

                trainDF[["userId", "label"]] = watchMidsDF[["userId", "movieId"]]
                self.feature_info['userId'] = trainDF['userId'].max()
    
                genome_scores = pd.read_csv(os.path.join(data_root, "genome-scores.csv"))
                genome_tags = pd.read_csv(os.path.join(data_root, "genome-tags.csv"))
                mid_tagidDF = pd.merge(genome_scores[genome_scores["relevance"] > 0.3], genome_tags, on="tagId")[
                    ["movieId", "tagId", "tag"]]
                mid_tagidDF.index = mid_tagidDF["movieId"]
                mid_tagid = mid_tagidDF["tagId"].to_dict()
    
                def mid2tagid(mids):
                    tagids = [mid_tagid[int(mid)] for mid in mids.split("|") if mid.isdigit() and int(mid) in mid_tagid]
                    return "|".join(map(str, sorted(set(tagids))))
    
                # Build the preferred tag id sequence
                trainDF["tagIDs"] = trainDF["label"].fillna("").apply(mid2tagid)
                self.feature_info['tagIDs'] = mid_tagidDF['tagId'].max()
    
                moviesDF = pd.read_csv(os.path.join(data_root, "movies.csv"))
                moviesDF.index = moviesDF["movieId"]
                # Total number of classes (distinct movieIds)
                self.n_classes = moviesDF["movieId"].drop_duplicates().shape[0]
    
                genres_genresid = {}
                idx = 1
                for line in moviesDF["genres"].drop_duplicates().values:
                    for g in line.split("|"):
                        if g not in genres_genresid:
                            genres_genresid[g] = idx
                            idx += 1
                print("genre id mapping:", genres_genresid)
                mid_genres = moviesDF["genres"].to_dict()
    
                def mid2genresID(mids):
                    genresIDs = set()
                    for mid in mids.split("|"):
                        if mid.isdigit() and int(mid) in mid_genres:
                            for g in mid_genres[int(mid)].split("|"):
                                genresIDs.add(genres_genresid[g])
                    return "|".join(map(str, sorted(genresIDs)))
    
                # Build the preferred genre id sequence
                trainDF["genresIDs"] = trainDF["label"].fillna("").apply(mid2genresID)
                self.feature_info['genresIDs'] = max(genres_genresid.values())
    
                # Map movieId to a contiguous index space; labels must start at 0
                index = 0
                for mid in moviesDF[['movieId']].sort_values('movieId', ascending=True).drop_duplicates()["movieId"].values:
                    movieid_index[mid] = index
                    index_movieid[index] = mid
                    index += 1
                # print("movieid_index :",movieid_index)
                # print("index_movieid :", index_movieid)
    
                def mid2index(mids):
                    index_list = []
                    for mid in mids.split("|"):
                        if mid.isdigit() and int(mid) in movieid_index:
                            index_list.append(movieid_index[int(mid)])
                    return "|".join(map(str, index_list))
    
                trainDF["label"] = trainDF["label"].apply(mid2index)
                trainDF[all_columns].to_csv(index=False, path_or_buf=self.train_data, header=False)
                self.num_train = trainDF.shape[0]
                self.max_num_true = trainDF["label"].apply(lambda x: len(str(x).split("|"))).max()
                print("max_num_true,", self.max_num_true)
                statistical_info = {"feature_info": self.feature_info, "n_classes": self.n_classes,
                                    "num_train": self.num_train, "max_num_true": self.max_num_true}
                pkl.dump(statistical_info, open(self.statistical_info, "wb"))
                # Serialize to TFRecords
                print("starting serialize")
                self._serialize()
                print("ending serialize")
                del ratingsDF
                del genome_scores
                del genome_tags
                del moviesDF
                del trainDF
                gc.collect()
            else:
                statistical_info = pkl.load(open(self.statistical_info, "rb"))
                self.feature_info = statistical_info["feature_info"]
                self.n_classes = statistical_info["n_classes"]
                self.num_train = statistical_info["num_train"]
                self.max_num_true = statistical_info["max_num_true"]
                # Serialize to TFRecords
                print("starting serialize")
                self._serialize()
                print("ending serialize")
    
        def _serialize(self):
            """
            Serialize the CSV training data into TFRecords and persist it to disk.
            """
            max_num_true = min(MAX_NUM_TRUE, self.max_num_true)
            self.serialize_data = os.path.join(self.data_root, "train.tfr")
            writer = tf.python_io.TFRecordWriter(self.serialize_data)
            with open(self.train_data) as f:
                lines = f.readlines()
    
            for line in lines:
                feature = {}
                for ix, part in enumerate(line.split(",")):
                    # Fixed-length label: repeat the sequence until it fills max_num_true slots
                    if _CSV_COLUMNS[ix] == "label":
                        values = [int(s) for s in str(part).strip("\n").split("|") if s.isdigit()]
                        while len(values) < max_num_true:
                            values.extend(values)
                        values = values[:max_num_true]
                        feature[_CSV_COLUMNS[ix]] = tf.train.Feature(int64_list=tf.train.Int64List(value=values))
                    else:  # Variable-length features
                        values = [int(s) for s in str(part).strip("\n").split("|") if s.isdigit()]
                        feature[_CSV_COLUMNS[ix]] = tf.train.Feature(int64_list=tf.train.Int64List(value=values))
                example = tf.train.Example(features=tf.train.Features(feature=feature))
                writer.write(example.SerializeToString())
            writer.close()
    
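    To sanity-check what _serialize wrote, here is a minimal sketch of my own (assuming ./ml-latest/train.tfr exists) that reads back the first serialized Example and prints each feature:

    # Hypothetical sanity check, not part of the original pipeline (TF 1.x API).
    def peek_one_record(path="./ml-latest/train.tfr"):
        record = next(tf.python_io.tf_record_iterator(path))  # first serialized tf.train.Example
        example = tf.train.Example()
        example.ParseFromString(record)
        # Every feature is an Int64List: label is padded to a fixed length, the rest vary.
        for name, feat in example.features.feature.items():
            print(name, list(feat.int64_list.value)[:10])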

    Step 2: define the input function for the model

    def input_fn(data_files, batch_size, mode, feature_structure):
        """
        Build batches from a TFRecordDataset, for both training and prediction.
        """
        if mode not in ['train', 'eval']:
            raise ValueError("Un-supported mode for input function")
    
        # repeat(count): how many passes to make over the dataset;
        # passing None or -1 would repeat it indefinitely. Here we do one pass per train() call.
        repeat_count = 1
        dataset = tf.data.TFRecordDataset(data_files).repeat(repeat_count)

        # shuffle(buffer_size=N): keep a buffer of N records, draw elements from it at random
        # and refill it from the input stream, so batches are not read back in file order.
        if mode == 'train':
            dataset = dataset.shuffle(buffer_size=5 * batch_size)
    
        # Batch it up.
        dataset = dataset.batch(batch_size)
        # The simplest kind of iterator: it makes a single pass over the dataset
        iterator = dataset.make_one_shot_iterator()
        serialized_batch = iterator.get_next()
        decoded = tf.parse_example(serialized_batch, features=feature_structure)
        label_batch = decoded['label']
        decoded.pop('label')
        print("feature_batch ,", decoded)
        print("label_batch ,", label_batch)
        return decoded, label_batch
    
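    To see what input_fn actually yields, a throwaway-session sketch of my own; it assumes the serialized file from Step 1 and that the padded label length ended up being 100 (i.e. min(MAX_NUM_TRUE, max_num_true) == 100):

    # Hypothetical inspection of one batch; shapes depend on the serialized data.
    feature_structure = {
        "label": tf.FixedLenFeature([100], dtype=tf.int64),
        "userId": tf.VarLenFeature(tf.int64),
        "tagIDs": tf.VarLenFeature(tf.int64),
        "genresIDs": tf.VarLenFeature(tf.int64),
    }
    features, labels = input_fn("./ml-latest/train.tfr", batch_size=4,
                                mode="eval", feature_structure=feature_structure)
    with tf.Session() as sess:
        f_val, l_val = sess.run([features, labels])
        print(l_val.shape)          # (4, 100): padded label indices
        print(f_val["userId"])      # SparseTensorValue of a variable-length column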

    Step 3: define the network architecture

    def inference_fn(features, mode, **params):
        """
        Args:
            features: This is the first item returned from the `input_fn`
                 passed to `train`, `evaluate`, and `predict`. This should be a
                 `Tensor` of int32 type, where the int32 value is the index for `tf.nn.embedding_lookup`
            mode: Optional. Specifies if this training, evaluation or
                 prediction. See `ModeKeys`.
            params: Optional `dict` of hyperparameters.  Will receive what
                 is passed to Estimator in `params` parameter. This allows
                 to configure Estimators from hyper parameter tuning.
    
        Returns:
            logits
        """
        feature_info = params.get("feature_info", None)
        # feature_dim_size = params.get("feature_dim_size", None)
        hidden_units = params.get("hidden_units", [100])
        n_classes = params.get("n_classes")
    
        tf.set_random_seed(2019)
    
        feature_column = []
        # The TensorFlow feature-columns guide suggests an embedding size of roughly the
        # fourth root of the feature cardinality; a fixed size of 100 is used here for simplicity.
        embedding_size = 100
        feature_dim_size = len(feature_info) * embedding_size
    
        # Initialize weights
        cmb_size = hidden_units[-1]
        weights["deep_kernel_0"] = tf.Variable(tf.random_normal([feature_dim_size, hidden_units[0]]))
        weights["deep_bias_0"] = tf.Variable(tf.random_normal([hidden_units[0]]))
        weights["nce_kernel_0"] = tf.Variable(tf.zeros([cmb_size, n_classes]))
        weights["nce_bias_0"] = tf.Variable(tf.zeros([n_classes]))
    
        for feature in feature_info:
            feature_dim = feature_info[feature]
            # Why hash bucketing: it enlarges the parameter space of id features, giving the
            # model extra flexibility to learn compared with a single weight per value.
            # Reference: https://www.tensorflow.org/guide/feature_columns#bucketized_column
            hash_bucket_size = feature_dim + 100
            embedding_col = tf.feature_column.categorical_column_with_hash_bucket(
                feature, hash_bucket_size=hash_bucket_size, dtype=tf.int64)
            embedding_col = tf.feature_column.embedding_column(
                embedding_col, dimension=embedding_size, combiner="mean")
            feature_column.append(embedding_col)
    
        with tf.variable_scope('input_layer', values=(features,)):
            embedding_layer = tf.feature_column.input_layer(features=features, feature_columns=feature_column)
    
        with tf.variable_scope('hidden_layer', values=(embedding_layer,)):
            deep_layer = embedding_layer
            deep_layer = tf.add(tf.matmul(deep_layer, weights["deep_kernel_0"]), weights["deep_bias_0"])
            deep_layer = tf.nn.relu(deep_layer)
    
        with tf.variable_scope('logits', values=(deep_layer,)):
            logits = deep_layer
    
        return logits
    
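    As an aside, the fourth-root rule of thumb mentioned in the comment above can be made explicit. This small helper is my own illustration, not part of the model (the model simply fixes embedding_size = 100):

    import math

    # Heuristic from the TF feature-columns guidance: embedding size ~ cardinality ** 0.25.
    def suggested_embedding_size(cardinality):
        return int(math.ceil(cardinality ** 0.25))

    # e.g. a feature with ~60000 distinct values -> suggested_embedding_size(60000) == 16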

    Step 4: implement loss_fn

    def loss_fn(logits, labels, **params):
        """
        Official documentation of tf.nn.nce_loss:
        Computes and returns the noise-contrastive estimation training loss.
    
          See [Noise-contrastive estimation: A new estimation principle for
          unnormalized statistical
          models](http://www.jmlr.org/proceedings/papers/v9/gutmann10a/gutmann10a.pdf).
          Also see our [Candidate Sampling Algorithms
          Reference](https://www.tensorflow.org/extras/candidate_sampling.pdf)
    
          A common use case is to use this method for training, and calculate the full
          sigmoid loss for evaluation or inference. In this case, you must set
          `partition_strategy="div"` for the two losses to be consistent, as in the
          following example:
    
          ```python
          if mode == "train":
            loss = tf.nn.nce_loss(
                weights=weights,
                biases=biases,
                labels=labels,
                inputs=inputs,
                ...,
                partition_strategy="div")
          elif mode == "eval":
            logits = tf.matmul(inputs, tf.transpose(weights))
            logits = tf.nn.bias_add(logits, biases)
            labels_one_hot = tf.one_hot(labels, n_classes)
            loss = tf.nn.sigmoid_cross_entropy_with_logits(
                labels=labels_one_hot,
                logits=logits)
            loss = tf.reduce_sum(loss, axis=1)
          ```
    
          Note: By default this uses a log-uniform (Zipfian) distribution for sampling,
          so your labels must be sorted in order of decreasing frequency to achieve
          good results.  For more details, see
          @{tf.nn.log_uniform_candidate_sampler}.
    
          Note: In the case where `num_true` > 1, we assign to each target class
          the target probability 1 / `num_true` so that the target probabilities
          sum to 1 per-example.
    
          Note: It would be useful to allow a variable number of target classes per
          example.  We hope to provide this functionality in a future release.
          For now, if you have a variable number of target classes, you can pad them
          out to a constant number by either repeating them or by padding
          with an otherwise unused class.
    
          Args:
            weights: A `Tensor` of shape `[num_classes, dim]`, or a list of `Tensor`
                objects whose concatenation along dimension 0 has shape
                [num_classes, dim].  The (possibly-partitioned) class embeddings.
            biases: A `Tensor` of shape `[num_classes]`.  The class biases.
            labels: A `Tensor` of type `int64` and shape `[batch_size,
                num_true]`. The target classes.
            inputs: A `Tensor` of shape `[batch_size, dim]`.  The forward
                activations of the input network.
            num_sampled: An `int`.  The number of classes to randomly sample per batch.
            num_classes: An `int`. The number of possible classes.
            num_true: An `int`.  The number of target classes per training example.
            sampled_values: a tuple of (`sampled_candidates`, `true_expected_count`,
                `sampled_expected_count`) returned by a `*_candidate_sampler` function.
                (if None, we default to `log_uniform_candidate_sampler`)
            remove_accidental_hits:  A `bool`.  Whether to remove "accidental hits"
                where a sampled class equals one of the target classes.  If set to
                `True`, this is a "Sampled Logistic" loss instead of NCE, and we are
                learning to generate log-odds instead of log probabilities.  See
                our [Candidate Sampling Algorithms Reference]
                (https://www.tensorflow.org/extras/candidate_sampling.pdf).
                Default is False.
            partition_strategy: A string specifying the partitioning strategy, relevant
                if `len(weights) > 1`. Currently `"div"` and `"mod"` are supported.
                Default is `"mod"`. See `tf.nn.embedding_lookup` for more details.
            name: A name for the operation (optional).
    
          Returns:
            A `batch_size` 1-D tensor of per-example NCE losses.
          """
        global weights
        # Number of negative classes to sample: one seventh of the total number of classes
        neg_sample = int(params["n_classes"] // 7)
    
        print("labels ", labels)
        print("logits", logits)
        print("weight", weights["nce_kernel_0"])
    
        out_layer = tf.sigmoid(tf.matmul(logits, weights["nce_kernel_0"]) + weights["nce_bias_0"])
        if labels is not None:
            # tf.nn.nce_loss returns one loss per example; average it over the batch
            per_example_loss = tf.nn.nce_loss(weights=tf.transpose(weights["nce_kernel_0"]),
                                              biases=weights["nce_bias_0"],
                                              labels=labels,
                                              inputs=logits,
                                              num_sampled=neg_sample,
                                              num_classes=params["n_classes"],
                                              num_true=params["num_true"],
                                              remove_accidental_hits=False,
                                              partition_strategy="mod")
            loss = tf.reduce_mean(per_example_loss)
        else:
            loss = None
        return loss, out_layer
    
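    For reference, a standalone toy sketch of the tensor shapes tf.nn.nce_loss expects (the sizes below are made up; in the model above they come from the dataset):

    # Toy example: 8 samples, 16-dim hidden output, 1000 classes, 5 positives per sample.
    batch_size, dim, n_classes, num_true = 8, 16, 1000, 5
    inputs = tf.random_normal([batch_size, dim])                    # like `logits` above
    labels = tf.random_uniform([batch_size, num_true], maxval=n_classes, dtype=tf.int64)
    nce_w = tf.Variable(tf.random_normal([n_classes, dim]))         # note shape [num_classes, dim]
    nce_b = tf.Variable(tf.zeros([n_classes]))
    per_example = tf.nn.nce_loss(weights=nce_w, biases=nce_b, labels=labels, inputs=inputs,
                                 num_sampled=64, num_classes=n_classes, num_true=num_true)
    loss = tf.reduce_mean(per_example)                              # scalar training loss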

    Step 5: implement model_fn, which returns the EstimatorSpec used for training and prediction

    def model_fn(features, labels, mode, params, config):
        """
        Args:
            features: This is the first item returned from the `input_fn`
                 passed to `train`, `evaluate`, and `predict`. This should be a
                 single `Tensor` or `dict` of same.
            labels: This is the second item returned from the `input_fn`
                 passed to `train`, `evaluate`, and `predict`. This should be a
                 single `Tensor` or `dict` of same (for multi-head models). If
                 mode is `ModeKeys.PREDICT`, `labels=None` will be passed. If
                 the `model_fn`'s signature does not accept `mode`, the
                 `model_fn` must still be able to handle `labels=None`.
            mode: Optional. Specifies if this training, evaluation or
                 prediction. See `ModeKeys`.
            params: Optional `dict` of hyperparameters.  Will receive what
                 is passed to Estimator in `params` parameter. This allows
                 to configure Estimators from hyper parameter tuning.
            config: Optional configuration object. Will receive what is passed
                 to Estimator in `config` parameter, or the default `config`.
                 Allows updating things in your model_fn based on configuration
                 such as `num_ps_replicas`, or `model_dir`.
    
        Returns:
            EstimatorSpec
        """
        init_learning_rate = params["learning_rate"]
        learning_rate = init_learning_rate
    
        logits = inference_fn(
            features=features,
            mode=mode,
            **params
        )
    
        loss, out_layer = loss_fn(logits=logits, labels=labels, **params)
    
        if mode != tf.estimator.ModeKeys.TRAIN:
            probs = out_layer
            # Indices of the 30 highest-scoring classes
            top_k = tf.nn.top_k(probs, 30, name=None, sorted=True).indices
            # user_vector is the output of the last hidden layer
            if mode == tf.estimator.ModeKeys.PREDICT:
                predictions = {
                    'user_vector': logits,
                    'top_k': top_k,
                    'userId': tf.sparse_tensor_to_dense(features['userId'], default_value=0)
                }
                return tf.estimator.EstimatorSpec(
                    mode=mode,
                    predictions=predictions,
                )
            # EVAL mode: return the loss so that Estimator.evaluate() also works
            return tf.estimator.EstimatorSpec(mode=mode, loss=loss)
        else:
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
            train_op = optimizer.minimize(
                loss=loss, global_step=tf.train.get_global_step())
            tf.summary.scalar('loss', loss)
            return tf.estimator.EstimatorSpec(
                mode=mode,
                loss=loss,
                train_op=train_op
            )
    
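    At serving time, candidate generation reduces to a dot product between the user vector and the per-movie output embeddings (the columns of nce_kernel_0), which is exactly what out_layer and top_k compute in-graph. The NumPy sketch below restates that idea outside of TensorFlow; the arrays are placeholders for the learned weights:

    import numpy as np

    # user_vec: [cmb_size], item_emb: [cmb_size, n_classes], item_bias: [n_classes]
    def top_k_candidates(user_vec, item_emb, item_bias, k=30):
        scores = user_vec @ item_emb + item_bias   # one score per movie index
        return np.argsort(-scores)[:k]             # indices of the k best-scoring movies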

    Step 6: output the user vectors and top-k indices

    def main(batch_size=256, train_epochs=1):
        pfsg = FeatureSampleGenerators()
    
        n_classes = pfsg.n_classes
        max_feature_index = len(pfsg.feature_info)
        num_train = pfsg.num_train
        feature_info = pfsg.feature_info
        num_true = min(MAX_NUM_TRUE, pfsg.max_num_true)
        model_dir = "./model"
    
        print("n_classes=", n_classes)
        print("feature_info:", feature_info)
        print("max_feature_index=", max_feature_index)
        print("num_train=", num_train)
        print("num_true=", num_true)
    
        feature_structure = {
            "label": tf.FixedLenFeature([num_true], dtype=tf.int64),
        }
        for idx in range(max_feature_index):
            feature_structure[_CSV_COLUMNS[idx]] = tf.VarLenFeature(tf.int64)
    
        num_steps_per_epoch = num_train // batch_size
        total_steps = num_steps_per_epoch * train_epochs
    
        train_input_fn = functools.partial(
            input_fn,
            data_files=pfsg.serialize_data,
            batch_size=batch_size,
            mode="train",
            feature_structure=feature_structure
        )
    
        model_params = {
            "feature_info": feature_info,  # feature cardinalities
            "batch_size": batch_size,
            "hidden_units": [100],
            "batch_norm": False,
            "n_classes": n_classes,      # number of output classes
            "num_true": num_true,
            "learning_rate": 0.001,
            "num_steps_per_epoch": num_steps_per_epoch
        }
    
        # allow_soft_placement=True lets TF fall back to another device (GPU/CPU) automatically
        # intra_op_parallelism_threads=0 lets TF pick the optimal number of threads per op
        sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            intra_op_parallelism_threads=0)

        # save_checkpoints_steps: write a checkpoint every N steps
        # keep_checkpoint_max: keep at most this many recent checkpoints
        model_config = tf.estimator.RunConfig(
            save_checkpoints_steps=1000,
            keep_checkpoint_max=10,
            session_config=sess_config
        )
    
        model = tf.estimator.Estimator(
            model_fn=model_fn,
            model_dir=model_dir,
            config=model_config,
            params=model_params,
        )
    
        steps_trained = 0
        while steps_trained < total_steps:
            train_steps = min(num_steps_per_epoch, total_steps - steps_trained)
    
            model.train(input_fn=train_input_fn, steps=train_steps)
            steps_trained += train_steps
            print("Trained for {} steps, total {} so far.".format(train_steps, steps_trained))
    
        # Collect every user vector and its top-k label indices
        # (each index is mapped back to the corresponding movieId via index_movieid)
        test_input_fn = functools.partial(
            input_fn,
            data_files=pfsg.serialize_data,
            batch_size=batch_size,
            mode="eval",
            feature_structure=feature_structure,
        )
    
        res = model.predict(test_input_fn)
        result = {}
        userIds = []
        top_ks = []
        user_vectors = []
        for sample in res:
            userIds.append(sample['userId'][0])
            # Map each label index back to its movieId
            top_ks.append([index_movieid[int(index)] for index in sample['top_k']])
            user_vectors.append(sample['user_vector'])
        result['userId'] = userIds
        result['top_k'] = top_ks
        result['user_vector'] = user_vectors
        pkl.dump(result, open("./result.pkl", "wb"))
    
    if __name__ == "__main__":
        main()
    
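    Finally, a small sketch of my own for inspecting the dumped results after a run:

    import pickle as pkl

    with open("./result.pkl", "rb") as f:
        result = pkl.load(f)

    # Top-k recommended movieIds and the user-vector size for the first user in the dump.
    print("userId:", result["userId"][0])
    print("top_k movieIds:", result["top_k"][0])
    print("user_vector dim:", len(result["user_vector"][0]))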
    


  • Original article: https://www.cnblogs.com/arachis/p/DNN_NCE.html