Overview
I have previously summarized NCE loss and YouTube DNN recommendation on this blog, but mostly at the theoretical level, without hands-on experience. With this post I want to dig deeper into YouTube DNN recommendation in practice, and also further consolidate how I use TensorFlow. The code here is only for my own study and practice; if anything looks off, discussion is welcome.
Step 1: Build the features and samples
import gc
import os
import functools
import pickle as pkl

import pandas as pd
import tensorflow as tf

# Network parameters, initialized in inference_fn
weights = {}
# Training-set header, also used by input_fn
_CSV_COLUMNS = []
# Maximum length of the label list
MAX_NUM_TRUE = 100
# movieId -> index mapping
movieid_index = {}
# index -> movieId mapping
index_movieid = {}
# Encapsulates sample and feature generation
class FeatureSampleGenerators:
def __init__(self):
"""
Builds the training set.
# Feature design
- label: sequence of movie ids the user has watched
- tagIDs: sequence of preferred tag ids
- genresIDs: sequence of preferred genre ids
- userId: user id
# Sample design
- Negative sampling (tf.nn.nce_loss() does not require explicitly defined negatives):
  randomly sample movies the user has never watched as negatives, excluding movies released after the positive ones.
"""
# Mapping from feature name to feature cardinality
self.feature_info = {}
# Maximum length of the label list
self.max_num_true = 0
# Number of classes / number of NCE target classes
self.n_classes = 0
# Number of training samples
self.num_train = 0
data_root = "./ml-latest"
all_columns = ["userId", "tagIDs", "genresIDs", "label"]
global _CSV_COLUMNS
_CSV_COLUMNS = all_columns
self.data_root = data_root
self.train_data = os.path.join(data_root, "train.csv")
self.statistical_info = os.path.join(data_root, "statistical_info.pkl")
if not os.path.exists(self.train_data) or not os.path.exists(self.statistical_info):
# Build the training set
trainDF = pd.DataFrame(columns=all_columns)
ratingsDF = pd.read_csv(os.path.join(data_root, "ratings.csv"))
print(ratingsDF.head(3))
def concat_ws(group):
return "|".join(map(str, group.movieId))
# Build each user's watched-movie id sequence as the label
watchMidsDF = ratingsDF.sort_values("timestamp")[["userId", "movieId"]].groupby("userId").agg(
concat_ws).reset_index()
trainDF[["userId", "label"]] = watchMidsDF[["userId", "movieId"]]
self.feature_info['userId'] = trainDF['userId'].max()
genome_scores = pd.read_csv(os.path.join(data_root, "genome-scores.csv"))
genome_tags = pd.read_csv(os.path.join(data_root, "genome-tags.csv"))
mid_tagidDF = pd.merge(genome_scores[genome_scores["relevance"] > 0.3], genome_tags, on="tagId")[
["movieId", "tagId", "tag"]]
mid_tagidDF.index = mid_tagidDF["movieId"]
mid_tagid = mid_tagidDF["tagId"].to_dict()
def mid2tagid(mids):
tagids = [mid_tagid[int(mid)] for mid in mids.split("|") if mid.isdigit() and int(mid) in mid_tagid]
return "|".join(map(str, sorted(set(tagids))))
# Build the preferred tag id sequence
trainDF["tagIDs"] = trainDF["label"].fillna("").apply(mid2tagid)
self.feature_info['tagIDs'] = mid_tagidDF['tagId'].max()
moviesDF = pd.read_csv(os.path.join(data_root, "movies.csv"))
moviesDF.index = moviesDF["movieId"]
# Count the total number of classes
self.n_classes = moviesDF["movieId"].drop_duplicates().shape[0]
genres_genresid = {}
idx = 1
for line in moviesDF["genres"].drop_duplicates().values:
for g in line.split("|"):
if g not in genres_genresid:
genres_genresid[g] = idx
idx += 1
print(u"genre-to-id mapping", genres_genresid)
mid_genres = moviesDF["genres"].to_dict()
def mid2genresID(mids):
genresIDs = set()
for mid in mids.split("|"):
if mid.isdigit() and int(mid) in mid_genres:
for g in mid_genres[int(mid)].split("|"):
genresIDs.add(genres_genresid[g])
return "|".join(map(str, sorted(genresIDs)))
# Build the preferred genre id sequence
trainDF["genresIDs"] = trainDF["label"].fillna("").apply(mid2genresID)
self.feature_info['genresIDs'] = max(genres_genresid.values())
# Map movieId to a contiguous index; label indices must start from 0
index = 0
for mid in moviesDF[['movieId']].sort_values('movieId', ascending=True).drop_duplicates()["movieId"].values:
movieid_index[mid] = index
index_movieid[index] = mid
index += 1
# print("movieid_index :",movieid_index)
# print("index_movieid :", index_movieid)
def mid2index(mids):
index_list = []
for mid in mids.split("|"):
if mid.isdigit() and int(mid) in movieid_index:
index_list.append(movieid_index[int(mid)])
return "|".join(map(str, index_list))
trainDF["label"] = trainDF["label"].apply(mid2index)
trainDF[all_columns].to_csv(index=False, path_or_buf=self.train_data, header=False)
self.num_train = trainDF.shape[0]
self.max_num_true = trainDF["label"].apply(lambda x: len(str(x).split("|"))).max()
print("max_num_true,", self.max_num_true)
statistical_info = {"feature_info": self.feature_info, "n_classes": self.n_classes,
"num_train": self.num_train, "max_num_true": self.max_num_true}
pkl.dump(statistical_info, open(self.statistical_info, "wb"))
# Serialize
print("starting serialize")
self._serialize()
print("ending serialize")
del ratingsDF
del genome_scores
del genome_tags
del moviesDF
del trainDF
gc.collect()
else:
statistical_info = pkl.load(open(self.statistical_info, "rb"))
self.feature_info = statistical_info["feature_info"]
self.n_classes = statistical_info["n_classes"]
self.num_train = statistical_info["num_train"]
self.max_num_true = statistical_info["max_num_true"]
# Serialize
print("starting serialize")
self._serialize()
print("ending serialize")
def _serialize(self):
"""
Serializes the CSV training data into TFRecords and persists it to disk.
"""
max_num_true = min(MAX_NUM_TRUE, self.max_num_true)
self.serialize_data = os.path.join(self.data_root, "train.tfr")
writer = tf.python_io.TFRecordWriter(self.serialize_data)
with open(self.train_data) as f:
lines = f.readlines()
for line in lines:
feature = {}
for ix, part in enumerate(line.split(",")):
# Handle the fixed-length label: repeat the id sequence until it fills the fixed length (see the short example after this method)
if _CSV_COLUMNS[ix] == "label":
values = [int(s) for s in str(part).strip("\n").split("|") if s.isdigit()]
while len(values) < max_num_true:
values.extend(values)
values = values[:max_num_true]
feature[_CSV_COLUMNS[ix]] = tf.train.Feature(int64_list=tf.train.Int64List(value=values))
else: # Handle variable-length features
values = [int(s) for s in str(part).strip("\n").split("|") if s.isdigit()]
feature[_CSV_COLUMNS[ix]] = tf.train.Feature(int64_list=tf.train.Int64List(value=values))
example = tf.train.Example(features=tf.train.Features(feature=feature))
writer.write(example.SerializeToString())
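To make the label-padding rule above concrete, here is a minimal standalone sketch; the values and max_num_true below are made-up numbers, not taken from the dataset.
# Hypothetical illustration of the padding rule used in _serialize:
# a label list shorter than max_num_true is repeated until it fills the fixed length.
values = [3, 7, 11]            # made-up label indices
max_num_true = 5               # made-up fixed length
while len(values) < max_num_true:
    values.extend(values)
values = values[:max_num_true]
print(values)                  # -> [3, 7, 11, 3, 7]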
Step 2: Define a custom data input function for the deep model
def input_fn(data_files, batch_size, mode, feature_structure):
"""
Generates batches from a TFRecordDataset for training and prediction.
"""
if mode not in ['train', 'eval']:
raise ValueError("Un-supported mode for input function")
# repeat(count): how many times to iterate over the dataset;
# passing None or -1 would repeat indefinitely, here we make a single pass per call
repeat_count = 1
dataset = tf.data.TFRecordDataset(data_files).repeat(repeat_count)
# For training, shuffle the records: shuffle(buffer_size=N) keeps a buffer of N
# records, draws elements from it at random, and refills it from the remaining
# records in order
if mode == 'train':
dataset = dataset.shuffle(buffer_size=5 * batch_size)
# Batch it up.
dataset = dataset.batch(batch_size)
# The simplest kind of iterator: makes a single pass over the dataset
iterator = dataset.make_one_shot_iterator()
serialized_batch = iterator.get_next()
decoded = tf.parse_example(serialized_batch, features=feature_structure)
label_batch = decoded['label']
decoded.pop('label')
print("feature_batch ,", decoded)
print("label_batch ,", label_batch)
return decoded, label_batch
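As a quick sanity check, input_fn can be driven directly in a session. The sketch below is only an assumption of how that might look: the file path and the fixed label length of 100 are placeholders and must match what _serialize actually produced.
# Hypothetical smoke test for input_fn (path and label length are assumptions).
feature_structure = {
    "label": tf.FixedLenFeature([100], dtype=tf.int64),  # must equal the serialized fixed length
    "userId": tf.VarLenFeature(tf.int64),
    "tagIDs": tf.VarLenFeature(tf.int64),
    "genresIDs": tf.VarLenFeature(tf.int64),
}
features, labels = input_fn(["./ml-latest/train.tfr"], batch_size=4,
                            mode="eval", feature_structure=feature_structure)
with tf.Session() as sess:
    f, l = sess.run([features, labels])
    print(l.shape)  # (4, 100): one row of label indices per example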
Step 3: Define the network structure
def inference_fn(features, mode, **params):
"""
Args:
features: This is the first item returned from the `input_fn`
passed to `train`, `evaluate`, and `predict`. This should be a
`Tensor` of int32 type, where the int32 value is the index for `tf.nn.embedding_lookup`
mode: Optional. Specifies if this is training, evaluation or
prediction. See `ModeKeys`.
params: Optional `dict` of hyperparameters. Will receive what
is passed to Estimator in `params` parameter. This allows
to configure Estimators from hyper parameter tuning.
Returns:
logits
"""
feature_info = params.get("feature_info", None)
# feature_dim_size = params.get("feature_dim_size", None)
hidden_units = params.get("hidden_units", [100])
n_classes = params.get("n_classes")
tf.set_random_seed(2019)
feature_column = []
# Rule of thumb from the TF docs: embedding_size ≈ the fourth root of the feature cardinality (see the sketch after this function); here it is simply fixed at 100
embedding_size = 100
feature_dim_size = len(feature_info) * embedding_size
# Initialize weights
cmb_size = hidden_units[-1]
weights["deep_kernel_0"] = tf.Variable(tf.random_normal([feature_dim_size, hidden_units[0]]))
weights["deep_bias_0"] = tf.Variable(tf.random_normal([hidden_units[0]]))
weights["nce_kernel_0"] = tf.Variable(tf.zeros([cmb_size, n_classes]))
weights["nce_bias_0"] = tf.Variable(tf.zeros([n_classes]))
for feature in feature_info:
feature_dim = feature_info[feature]
# Why hash bucketing: it enlarges the parameter space for id-type features; compared with a single weight, bucketing gives the model extra flexibility to learn from.
# Reference: https://www.tensorflow.org/guide/feature_columns#bucketized_column
hash_bucket_size = feature_dim + 100
embedding_col = tf.feature_column.categorical_column_with_hash_bucket(
feature, hash_bucket_size=hash_bucket_size, dtype=tf.int64)
embedding_col = tf.feature_column.embedding_column(
embedding_col, dimension=embedding_size, combiner="mean")
feature_column.append(embedding_col)
with tf.variable_scope('input_layer', values=(features,)):
embedding_layer = tf.feature_column.input_layer(features=features, feature_columns=feature_column)
with tf.variable_scope('hidden_layer', values=(embedding_layer,)):
deep_layer = embedding_layer
deep_layer = tf.add(tf.matmul(deep_layer, weights["deep_kernel_0"]), weights["deep_bias_0"])
deep_layer = tf.nn.relu(deep_layer)
with tf.variable_scope('logits', values=(deep_layer,)):
logits = deep_layer
return logits
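The embedding size above is simply fixed at 100. The rule of thumb mentioned in the comment (embedding dimension ≈ fourth root of the feature cardinality) would look roughly like this hypothetical helper, which is not used by inference_fn:
# Hypothetical helper for the "fourth root of the cardinality" rule of thumb.
def suggest_embedding_size(cardinality):
    return max(1, int(round(cardinality ** 0.25)))

print(suggest_embedding_size(1128))  # e.g. a vocabulary of 1128 tags -> about 6 dimensions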
Step 4: Implement loss_fn
def loss_fn(logits, labels, **params):
"""
Official documentation of tf.nn.nce_loss:
Computes and returns the noise-contrastive estimation training loss.
See [Noise-contrastive estimation: A new estimation principle for
unnormalized statistical
models](http://www.jmlr.org/proceedings/papers/v9/gutmann10a/gutmann10a.pdf).
Also see our [Candidate Sampling Algorithms
Reference](https://www.tensorflow.org/extras/candidate_sampling.pdf)
A common use case is to use this method for training, and calculate the full
sigmoid loss for evaluation or inference. In this case, you must set
`partition_strategy="div"` for the two losses to be consistent, as in the
following example:
```python
if mode == "train":
loss = tf.nn.nce_loss(
weights=weights,
biases=biases,
labels=labels,
inputs=inputs,
...,
partition_strategy="div")
elif mode == "eval":
logits = tf.matmul(inputs, tf.transpose(weights))
logits = tf.nn.bias_add(logits, biases)
labels_one_hot = tf.one_hot(labels, n_classes)
loss = tf.nn.sigmoid_cross_entropy_with_logits(
labels=labels_one_hot,
logits=logits)
loss = tf.reduce_sum(loss, axis=1)
```
Note: By default this uses a log-uniform (Zipfian) distribution for sampling,
so your labels must be sorted in order of decreasing frequency to achieve
good results. For more details, see
@{tf.nn.log_uniform_candidate_sampler}.
Note: In the case where `num_true` > 1, we assign to each target class
the target probability 1 / `num_true` so that the target probabilities
sum to 1 per-example.
Note: It would be useful to allow a variable number of target classes per
example. We hope to provide this functionality in a future release.
For now, if you have a variable number of target classes, you can pad them
out to a constant number by either repeating them or by padding
with an otherwise unused class.
Args:
weights: A `Tensor` of shape `[num_classes, dim]`, or a list of `Tensor`
objects whose concatenation along dimension 0 has shape
[num_classes, dim]. The (possibly-partitioned) class embeddings.
biases: A `Tensor` of shape `[num_classes]`. The class biases.
labels: A `Tensor` of type `int64` and shape `[batch_size,
num_true]`. The target classes.
inputs: A `Tensor` of shape `[batch_size, dim]`. The forward
activations of the input network.
num_sampled: An `int`. The number of classes to randomly sample per batch.
num_classes: An `int`. The number of possible classes.
num_true: An `int`. The number of target classes per training example.
sampled_values: a tuple of (`sampled_candidates`, `true_expected_count`,
`sampled_expected_count`) returned by a `*_candidate_sampler` function.
(if None, we default to `log_uniform_candidate_sampler`)
remove_accidental_hits: A `bool`. Whether to remove "accidental hits"
where a sampled class equals one of the target classes. If set to
`True`, this is a "Sampled Logistic" loss instead of NCE, and we are
learning to generate log-odds instead of log probabilities. See
our [Candidate Sampling Algorithms Reference]
(https://www.tensorflow.org/extras/candidate_sampling.pdf).
Default is False.
partition_strategy: A string specifying the partitioning strategy, relevant
if `len(weights) > 1`. Currently `"div"` and `"mod"` are supported.
Default is `"mod"`. See `tf.nn.embedding_lookup` for more details.
name: A name for the operation (optional).
Returns:
A `batch_size` 1-D tensor of per-example NCE losses.
"""
global weights
# Number of sampled negatives: 1/7 of the total number of classes
neg_sample = int(params["n_classes"] // 7)
print("labels ", labels)
print("logits", logits)
print("weight", weights["nce_kernel_0"])
out_layer = tf.sigmoid(tf.matmul(logits, weights["nce_kernel_0"]) + weights["nce_bias_0"])
if labels is not None:
loss = tf.reduce_mean(tf.nn.nce_loss(weights=tf.transpose(weights["nce_kernel_0"]),
biases=weights["nce_bias_0"],
labels=labels,
inputs=logits,
num_sampled=neg_sample,
num_classes=params["n_classes"],
num_true=params["num_true"],
remove_accidental_hits=False,
partition_strategy="mod"))
else:
loss = None
return loss, out_layer
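For reference, the tensor shapes flowing into tf.nn.nce_loss above are summarized below (comments only; the dimension names follow the variables used in inference_fn and loss_fn):
# Shape sketch for loss_fn (assuming the dimensions defined in inference_fn):
#   logits (nce `inputs`):                [batch_size, cmb_size]
#   weights["nce_kernel_0"]:              [cmb_size, n_classes], transposed to [n_classes, cmb_size] for nce_loss
#   weights["nce_bias_0"] (nce `biases`): [n_classes]
#   labels:                               [batch_size, num_true]
#   tf.nn.nce_loss output:                [batch_size] per-example losses, reduced to a scalar by tf.reduce_mean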
Step 5: Implement model_fn, which returns an EstimatorSpec for training and prediction
def model_fn(features, labels, mode, params, config):
"""
Args:
features: This is the first item returned from the `input_fn`
passed to `train`, `evaluate`, and `predict`. This should be a
single `Tensor` or `dict` of same.
labels: This is the second item returned from the `input_fn`
passed to `train`, `evaluate`, and `predict`. This should be a
single `Tensor` or `dict` of same (for multi-head models). If
mode is `ModeKeys.PREDICT`, `labels=None` will be passed. If
the `model_fn`'s signature does not accept `mode`, the
`model_fn` must still be able to handle `labels=None`.
mode: Optional. Specifies if this is training, evaluation or
prediction. See `ModeKeys`.
params: Optional `dict` of hyperparameters. Will receive what
is passed to Estimator in `params` parameter. This allows
to configure Estimators from hyper parameter tuning.
config: Optional configuration object. Will receive what is passed
to Estimator in `config` parameter, or the default `config`.
Allows updating things in your model_fn based on configuration
such as `num_ps_replicas`, or `model_dir`.
Returns:
EstimatorSpec
"""
init_learning_rate = params["learning_rate"]
learning_rate = init_learning_rate
logits = inference_fn(
features=features,
mode=mode,
**params
)
loss, out_layer = loss_fn(logits=logits, labels=labels, **params)
if mode != tf.estimator.ModeKeys.TRAIN:
probs = out_layer
# Output the top-k class indices
top_k = tf.nn.top_k(probs, 30, name=None, sorted=True).indices
# user_vector is the feature vector output by the last hidden layer
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {
'user_vector': logits,
'top_k': top_k,
'userId': tf.sparse_tensor_to_dense(features['userId'], default_value=0)
}
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
)
else:
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train_op = optimizer.minimize(
loss=loss, global_step=tf.train.get_global_step())
tf.summary.scalar('loss', loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=train_op
)
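Because user_vector is just the last hidden layer, the scores that top_k ranks in the graph can also be reproduced offline. The numpy sketch below is an assumption of how that could be done; u, W and b would have to be read from the trained checkpoint.
import numpy as np

# Hypothetical offline re-ranking of one user, mirroring out_layer in loss_fn:
#   u: a user_vector of shape [cmb_size]
#   W: the learned weights["nce_kernel_0"] of shape [cmb_size, n_classes]
#   b: the learned weights["nce_bias_0"] of shape [n_classes]
def offline_top_k(u, W, b, k=30):
    scores = 1.0 / (1.0 + np.exp(-(u.dot(W) + b)))  # same sigmoid scoring as out_layer
    return np.argsort(-scores)[:k]                  # indices of the k highest-scoring classes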
Step 6: Output user vectors and top-k indices
def main(batch_size=256, train_epochs=1):
pfsg = FeatureSampleGenerators()
n_classes = pfsg.n_classes
max_feature_index = len(pfsg.feature_info)
num_train = pfsg.num_train
feature_info = pfsg.feature_info
num_true = min(MAX_NUM_TRUE, pfsg.max_num_true)
model_dir = "./model"
print("n_classes=", n_classes)
print("feature_info:", feature_info)
print("max_feature_index=", max_feature_index)
print("num_train=", num_train)
print("num_true=", num_true)
feature_structure = {
"label": tf.FixedLenFeature([num_true], dtype=tf.int64),
}
for idx in range(max_feature_index):
feature_structure[_CSV_COLUMNS[idx]] = tf.VarLenFeature(tf.int64)
num_steps_per_epoch = num_train // batch_size
total_steps = num_steps_per_epoch * train_epochs
train_input_fn = functools.partial(
input_fn,
data_files=pfsg.serialize_data,
batch_size=batch_size,
mode="train",
feature_structure=feature_structure
)
model_params = {
"feature_info": feature_info, # feature cardinalities
"batch_size": batch_size,
"hidden_units": [100],
"batch_norm": False,
"n_classes": n_classes, # number of classes
"num_true": num_true,
"learning_rate": 0.001,
"num_steps_per_epoch": num_steps_per_epoch
}
# allow_soft_placement=True lets TensorFlow fall back to CPU (or another device) automatically when an op has no kernel on the requested device
# intra_op_parallelism_threads sets the number of threads used inside a single op; 0 means let TensorFlow choose the optimal number
sess_config = tf.ConfigProto(
allow_soft_placement=True,
intra_op_parallelism_threads=0)
# save_checkpoints_steps: how many steps between checkpoints
# keep_checkpoint_max: at most how many recent checkpoints to keep
model_config = tf.contrib.learn.RunConfig(
save_checkpoints_steps=1000,
keep_checkpoint_max=10,
session_config=sess_config
)
model = tf.estimator.Estimator(
model_fn=model_fn,
model_dir=model_dir,
config=model_config,
params=model_params,
)
steps_trained = 0
while steps_trained < total_steps:
train_steps = min(num_steps_per_epoch, total_steps - steps_trained)
model.train(input_fn=train_input_fn, steps=train_steps)
steps_trained += train_steps
print("Trained for {} steps, total {} so far.".format(train_steps, steps_trained))
# Collect every user vector and its top-k label indices (each label index is mapped back to its movieId via index_movieid)
test_input_fn = functools.partial(
input_fn,
data_files=pfsg.serialize_data,
batch_size=batch_size,
mode="eval",
feature_structure=feature_structure,
)
res = model.predict(test_input_fn)
result = {}
userIds = []
top_ks = []
user_vectors = []
for sample in res:
userIds.append(sample['userId'][0])
# Map each index back to its movieId
top_ks.append([index_movieid[int(index)] for index in sample['top_k'] ])
user_vectors.append(sample['user_vector'])
result['userId'] = userIds
result['top_k'] = top_ks
result['user_vector'] = user_vectors
pkl.dump(result, open("./result.pkl", "wb"))
if __name__ == "__main__":
main()
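Finally, the dumped result.pkl can be inspected afterwards with a few lines like the following (assuming the pickle was written in binary mode as above):
# Hypothetical inspection of result.pkl produced by main().
import pickle as pkl

with open("./result.pkl", "rb") as f:
    result = pkl.load(f)
print(result["userId"][:3])        # first few user ids
print(result["top_k"][:3])         # their top-30 recommended movieIds
print(len(result["user_vector"]))  # number of exported user vectors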