from collections import namedtuple, OrderedDict, defaultdict
from torch import nn
from recsys_dl.layers.sequence import SequencePoolingLayer
import torch
class SparseFeat(namedtuple('SparseFeat', ['name', 'vocabulary_size', 'embedding_dim', 'use_hash',
                                           'dtype', 'embedding_name', 'group_name'])):
    """Immutable description of a single-valued categorical (sparse) feature.

    Fields:
        name: feature name; also the value the instance hashes by.
        vocabulary_size: number of distinct ids of the feature.
        embedding_dim: embedding width, or the string ``"auto"`` to derive it
            as ``6 * int(vocabulary_size ** 0.25)``.
        use_hash: stored flag; hashing is not applied by default.
        dtype: dtype name stored alongside the feature.
        embedding_name: name of the embedding table; defaults to ``name``.
        group_name: name of the feature group the column belongs to.
    """

    # No per-instance __dict__: the tuple fields are the only state.
    __slots__ = ()

    # Hashing of the raw values is not used by default (use_hash=False).
    def __new__(cls, name, vocabulary_size, embedding_dim=4, use_hash=False, dtype='int32',
                embedding_name=None, group_name="default_group"):
        if embedding_name is None:
            embedding_name = name
        if embedding_dim == "auto":
            embedding_dim = 6 * int(pow(vocabulary_size, 0.25))
        return super(SparseFeat, cls).__new__(cls, name, vocabulary_size, embedding_dim, use_hash,
                                              dtype, embedding_name, group_name)

    def __hash__(self):
        # Hash by name only, instead of by the whole tuple.
        return self.name.__hash__()
class VarLenSparseFeat(namedtuple('VarLenSparseFeat',
                                  ['sparsefeat', 'maxlen', 'combiner', 'length_name'])):
    """Variable-length categorical sequence feature.

    Wraps an underlying sparse feature description and adds sequence
    information.

    Fields:
        sparsefeat: the wrapped sparse feature; the read-only properties
            below forward to it.
        maxlen: number of input columns reserved for the sequence.
        combiner: pooling mode name (default ``'mean'``).
        length_name: optional name of the input column holding the sequence
            length, or ``None``.
    """

    # No per-instance __dict__: the tuple fields are the only state.
    __slots__ = ()

    def __new__(cls, sparsefeat, maxlen, combiner='mean', length_name=None):
        return super(VarLenSparseFeat, cls).__new__(cls, sparsefeat, maxlen, combiner, length_name)

    @property
    def name(self):
        return self.sparsefeat.name

    @property
    def vocabulary_size(self):
        return self.sparsefeat.vocabulary_size

    @property
    def embedding_dim(self):
        return self.sparsefeat.embedding_dim

    @property
    def dtype(self):
        return self.sparsefeat.dtype

    @property
    def embedding_name(self):
        return self.sparsefeat.embedding_name

    @property
    def group_name(self):
        return self.sparsefeat.group_name

    def __hash__(self):
        # Hash by the wrapped feature's name only, instead of by the whole tuple.
        return self.name.__hash__()
class DenseFeat(namedtuple('DenseFeat', ['name', 'dimension', 'dtype'])):
    """Immutable description of a dense (numeric) feature.

    Fields:
        name: feature name; also the value the instance hashes by.
        dimension: number of input columns the feature occupies (default 1).
        dtype: dtype name stored alongside the feature (default ``'float32'``).
    """

    # No per-instance __dict__: the tuple fields are the only state.
    __slots__ = ()

    def __new__(cls, name, dimension=1, dtype='float32'):
        return super(DenseFeat, cls).__new__(cls, name, dimension, dtype)

    def __hash__(self):
        # Hash by name only, instead of by the whole tuple.
        return self.name.__hash__()
def create_embedding_matrix(sparse_columns, init_std=0.0001, linear=False, sparse=False, device='cpu'):
    """Build one embedding table per sparse column, keyed by ``embedding_name``.

    Args:
        sparse_columns: iterable of feature descriptions exposing
            ``embedding_name``, ``vocabulary_size`` and ``embedding_dim``.
        init_std: standard deviation of the zero-mean normal weight init.
        linear: when true every table has width 1 instead of ``embedding_dim``.
        sparse: forwarded to ``nn.Embedding(sparse=...)``.
        device: device the resulting ``nn.ModuleDict`` is moved to.

    Returns:
        The ``nn.ModuleDict`` of embedding tables, moved to ``device``.
    """
    tables = {}
    for column in sparse_columns:
        width = 1 if linear else column.embedding_dim
        tables[column.embedding_name] = nn.Embedding(column.vocabulary_size, width, sparse=sparse)

    embedding_dict = nn.ModuleDict(tables)
    # Re-initialise only after all tables exist, overriding the default init.
    for table in embedding_dict.values():
        nn.init.normal_(table.weight, mean=0, std=init_std)
    return embedding_dict.to(device)
# Function that computes the feature dimension of the input data.
# It involves three classes: SparseFeat, VarLenSparseFeat and DenseFeat.
def compute_input_dim(features_columns, include_sparse=True, include_dense=True, feature_group=False):
    """Return the total input width contributed by the given feature columns.

    Args:
        features_columns: feature descriptions (SparseFeat, VarLenSparseFeat
            or DenseFeat instances); other types are ignored.
        include_sparse: add the sparse / variable-length contribution.
        include_dense: add the summed ``dimension`` of the dense columns.
        feature_group: when true the sparse contribution is the number of
            sparse columns rather than the sum of their ``embedding_dim``.

    Returns:
        The summed dimension as an int.
    """
    sparse_like = [col for col in features_columns
                   if isinstance(col, (SparseFeat, VarLenSparseFeat))]
    dense_only = [col for col in features_columns if isinstance(col, DenseFeat)]

    dense_width = sum(col.dimension for col in dense_only)
    if feature_group:
        sparse_width = len(sparse_like)
    else:
        sparse_width = sum(col.embedding_dim for col in sparse_like)

    total = 0
    if include_dense:
        total += dense_width
    if include_sparse:
        total += sparse_width
    return total
def build_input_features(feature_columns):
    """Map each feature name to its ``(start, end)`` column range in the input.

    Columns are laid out in the order given: a SparseFeat takes one column,
    a DenseFeat takes ``dimension`` columns and a VarLenSparseFeat takes
    ``maxlen`` columns, followed by one extra column for its ``length_name``
    when that is set and not already registered. A feature whose name has
    already been registered is skipped.

    Args:
        feature_columns: iterable of SparseFeat / DenseFeat / VarLenSparseFeat.

    Returns:
        OrderedDict from feature name to a half-open ``(start, end)`` tuple.

    Raises:
        TypeError: if a column is none of the three supported types.
    """
    features = OrderedDict()
    start = 0
    for feat in feature_columns:
        feat_name = feat.name
        if feat_name in features:
            continue
        if isinstance(feat, SparseFeat):
            features[feat_name] = (start, start + 1)
            start += 1
        elif isinstance(feat, DenseFeat):
            features[feat_name] = (start, start + feat.dimension)
            start += feat.dimension
        elif isinstance(feat, VarLenSparseFeat):
            features[feat_name] = (start, start + feat.maxlen)
            start += feat.maxlen
            if feat.length_name is not None and feat.length_name not in features:
                features[feat.length_name] = (start, start + 1)
                start += 1
        else:
            # Format into a single message; passing the type as a second
            # positional argument would render the error as a tuple.
            raise TypeError("Invalid feature column type, got {}".format(type(feat)))
    return features
def get_varlen_pooling_list(embedding_dict, features, feature_index, varlen_cols, device):
    """Embed and pool every variable-length column of ``features``.

    Args:
        embedding_dict: mapping from ``embedding_name`` to an embedding module.
        features: 2-D input whose columns are addressed through ``feature_index``.
        feature_index: mapping from feature name to ``(start, end)`` columns.
        varlen_cols: variable-length feature descriptions to process.
        device: forwarded to ``SequencePoolingLayer``.

    Returns:
        List with one pooled output per entry of ``varlen_cols``.
    """
    pooled = []
    for col in varlen_cols:
        lo, hi = feature_index[col.name][0], feature_index[col.name][1]
        ids = features[:, lo:hi].long()
        seq_emb = embedding_dict[col.embedding_name](ids)

        if col.length_name is not None:
            # An explicit length column accompanies the sequence.
            len_lo = feature_index[col.length_name][0]
            len_hi = feature_index[col.length_name][1]
            companion = features[:, len_lo:len_hi].long()
        else:
            # No length column: non-zero ids mark the valid positions.
            companion = ids != 0

        # NOTE(review): support_masking=True is passed in both cases, even
        # when the second input is a length tensor - confirm this is what
        # SequencePoolingLayer expects.
        pooler = SequencePoolingLayer(mode=col.combiner, support_masking=True, device=device)
        pooled.append(pooler([seq_emb, companion]))
    return pooled
def input_from_feature_columns(X, feature_columns, embedding_dict, support_dense=True, device='cpu'):
    """Split the input ``X`` into embedded sparse parts and raw dense parts.

    Args:
        X: 2-D input whose columns follow ``build_input_features(feature_columns)``.
        feature_columns: SparseFeat / DenseFeat / VarLenSparseFeat descriptions.
        embedding_dict: mapping from ``embedding_name`` to an embedding module.
        support_dense: when false, any DenseFeat column is rejected.
        device: forwarded to the variable-length pooling step.

    Returns:
        Tuple ``(embeddings, dense_values)``: the sparse embeddings followed
        by the pooled variable-length embeddings, and the list of dense slices.

    Raises:
        ValueError: if dense columns are present while ``support_dense`` is false.
    """
    sparse_cols = [col for col in feature_columns if isinstance(col, SparseFeat)]
    dense_cols = [col for col in feature_columns if isinstance(col, DenseFeat)]
    varlen_cols = [col for col in feature_columns if isinstance(col, VarLenSparseFeat)]

    feature_index = build_input_features(feature_columns)

    if dense_cols and not support_dense:
        raise ValueError("DenseFeat is not supported in dnn_feature_columns")

    def _slice_of(col):
        # Columns of X that belong to this feature.
        bounds = feature_index[col.name]
        return X[:, bounds[0]: bounds[1]]

    sparse_embedding = [
        embedding_dict[col.embedding_name](_slice_of(col).long())
        for col in sparse_cols
    ]
    varlen_embedding = get_varlen_pooling_list(embedding_dict, X, feature_index, varlen_cols, device)
    dense_value = [_slice_of(col) for col in dense_cols]

    return sparse_embedding + varlen_embedding, dense_value
def combined_dnn_input(sparse_embedding_list, dense_value_list):
    """Concatenate embeddings and dense values into one 2-D DNN input.

    Each non-empty list is concatenated along its last axis and flattened
    from axis 1 onward; when both are present the sparse part comes first.

    Args:
        sparse_embedding_list: list of embedding tensors (may be empty).
        dense_value_list: list of dense value tensors (may be empty).

    Returns:
        The flattened, concatenated tensor.

    Raises:
        NotImplementedError: if both lists are empty.
    """
    def _merge(tensors):
        return torch.flatten(torch.cat(tensors, dim=-1), start_dim=1)

    has_sparse = len(sparse_embedding_list) > 0
    has_dense = len(dense_value_list) > 0

    if not (has_sparse or has_dense):
        raise NotImplementedError
    if not has_dense:
        return _merge(sparse_embedding_list)
    if not has_sparse:
        return _merge(dense_value_list)

    sparse_dnn_input = _merge(sparse_embedding_list)
    dense_dnn_input = _merge(dense_value_list)
    return torch.cat([sparse_dnn_input, dense_dnn_input], dim=-1)
def get_feature_names(feature_columns):
    """Return the input feature names in the order of their column layout.

    The names are the keys produced by ``build_input_features`` for the
    given columns.
    """
    return list(build_input_features(feature_columns))
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from torch import nn
import torch.utils.data as Data
import numpy as np
from recsys_dl.dataProc import SparseFeat, DenseFeat, get_feature_names
from recsys_dl.models import *
from recsys_dl.utils import *
# Demo hyper-parameters: batch size, Adam learning rate and epoch count.
BATCH_SIZE, LEARNING_RATE, num_epochs = 256, 0.01, 10

# Smoke test: preprocess a Criteo sample, build an xDeepFM model and run a
# single forward pass on the first training batch.
if __name__ == '__main__':
    # build_input_features is used below but is not among the names imported
    # explicitly at the top of this script, so bring it into scope here.
    from recsys_dl.dataProc import build_input_features

    data = pd.read_csv('./criteo_sample.txt').sample(100)
    sparse_features = ['C' + str(i) for i in range(1, 27)]
    dense_features = ['I' + str(i) for i in range(1, 14)]

    # Fill missing values: '-1' for categorical columns, 0 for numeric ones.
    data[sparse_features] = data[sparse_features].fillna('-1', )
    data[dense_features] = data[dense_features].fillna(0, )
    target = ['label']

    # Label-encode every categorical column and scale numeric columns to [0, 1].
    for feat in sparse_features:
        lbe = LabelEncoder()
        data[feat] = lbe.fit_transform(data[feat])
    mms = MinMaxScaler(feature_range=(0, 1))
    data[dense_features] = mms.fit_transform(data[dense_features])

    sparse_cols = [SparseFeat(feat, data[feat].nunique()) for feat in sparse_features]
    dense_cols = [DenseFeat(feat, 1, ) for feat in dense_features]
    feature_names = get_feature_names(sparse_cols + dense_cols)

    train, test = train_test_split(data, test_size=0.2, random_state=2020)
    train_model_input = {name: train[name] for name in feature_names}
    test_model_input = {name: test[name] for name in feature_names}

    # Order the per-feature columns by the model's input layout and make
    # every one of them 2-D so they can be concatenated column-wise.
    feature_index = build_input_features(sparse_cols + dense_cols)
    if isinstance(train_model_input, dict):
        train_model_input = [train_model_input[feature] for feature in feature_index]
    train_model_input = [
        np.expand_dims(column, axis=1) if len(column.shape) == 1 else column
        for column in train_model_input
    ]

    train_tensor_data = Data.TensorDataset(
        torch.from_numpy(np.concatenate(train_model_input, axis=-1)),
        torch.from_numpy(train[target].values))
    train_iter = Data.DataLoader(dataset=train_tensor_data, batch_size=BATCH_SIZE, shuffle=True)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = xDeepFM(sparse_cols=sparse_cols, dense_cols=dense_cols, device=device)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    loss = nn.CrossEntropyLoss()

    # Only the first batch is pushed through the model; no training happens.
    for X, y in train_iter:
        x = X.to(device).float()
        pred = model(x)
        print(pred, y)
        break
# print(model)
import torch
from torch import nn
from recsys_dl.dataProc import build_input_features
from recsys_dl.models.linear import Linear
from recsys_dl.layers.core import PredictionLayer
class MLR(nn.Module):
    """Mixture of linear models: softmax-gated combination of region experts.

    ``region_num`` linear models over ``region_cols`` produce gating scores
    (softmax across regions); ``region_num`` linear models over ``base_cols``
    produce per-region outputs passed through ``PredictionLayer``. The output
    is the gate-weighted sum, plus an optional bias model over ``bias_cols``.

    Args:
        region_cols: feature columns fed to the gating models.
        base_cols: feature columns fed to the per-region models; falls back
            to ``region_cols`` when ``None`` or empty.
        bias_cols: feature columns for the optional bias model; ``None`` is
            treated as no bias columns.
        region_num: number of regions.
        l2_reg, init_std, seed: stored on the instance; not otherwise used
            in this class.
        device: device the module is moved to.
    """

    def __init__(self, region_cols, base_cols=None, bias_cols=None, region_num=4,
                 l2_reg=1e-5, init_std=0.0001, seed=1024, device='cpu'):
        super(MLR, self).__init__()
        self.l2_reg = l2_reg
        self.init_std = init_std
        self.seed = seed
        self.device = device
        self.region_num = region_num

        self.region_cols = region_cols
        self.base_cols = base_cols
        self.bias_cols = bias_cols
        if base_cols is None or len(base_cols) == 0:
            self.base_cols = region_cols
        if bias_cols is None:
            self.bias_cols = []

        # One shared column layout covering every feature any sub-model reads.
        self.feature_index = build_input_features(self.region_cols + self.base_cols + self.bias_cols)

        self.region_model = nn.ModuleList([Linear(self.region_cols, self.feature_index) for _ in
                                           range(self.region_num)])
        self.base_model = nn.ModuleList([Linear(self.base_cols, self.feature_index) for _ in
                                         range(self.region_num)])
        if self.bias_cols is not None and len(self.bias_cols) > 0:
            self.bias_model = nn.Sequential(Linear(self.bias_cols, self.feature_index))

        self.out = PredictionLayer()
        self.to(self.device)

    def forward(self, X):
        """Return the gate-weighted prediction for the input batch ``X``."""
        region_input = torch.cat([self.region_model[i](X) for i in range(self.region_num)], dim=-1)
        region_out = nn.Softmax(dim=-1)(region_input)

        linear_input = torch.cat([self.base_model[i](X) for i in range(self.region_num)], dim=-1)
        base_out = self.out(linear_input)

        final_logit = torch.sum(region_out * base_out, dim=-1, keepdim=True)

        # The bias columns are stored as ``bias_cols``; the bias model only
        # exists when that list is non-empty.
        if self.bias_cols is not None and len(self.bias_cols) > 0:
            bias_score = self.bias_model(X)
            final_logit += bias_score
        return final_logit