这次lab主要主要是研究click-through rate (CTR)。数据集来自于Kaggle的Criteo Labs dataset。相关ipynb文件见我github。
作业分成5个部分:one-hot encoding处理特征;构造one-hot encoding dictionary;解析CTR数据并处理特征;用逻辑回归来预测CTR;通过feature hashing来减少特征维度。
Featurize categorical data using one-hot-encoding
One-hot-encoding
这部分我们要实现one-hot encoding。我们在用实际数据处理前,先用一个包含3个样本的数据集上练习一下。样本有三个特征:什么动物,什么颜色,吃什么。其中最后一个特征可选。第一个特征有三个值:bear, cat, mouse;第二个有两个:black, tabby;第三个有两个:mouse, salmon。我们首先第一步是把(featureID, category) 映射到从0开始的连续整数。
# Data for manual OHE
# Note: the first data point does not include any value for the optional third feature
sampleOne = [(0, 'mouse'), (1, 'black')]
sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sampleThree = [(0, 'bear'), (1, 'black'), (2, 'salmon')]
sampleDataRDD = sc.parallelize([sampleOne, sampleTwo, sampleThree])
# TODO: Replace <FILL IN> with appropriate code
sampleOHEDictManual = {}
sampleOHEDictManual[(0,'bear')] = 0
sampleOHEDictManual[(0,'cat')] = 1
sampleOHEDictManual[(0,'mouse')] = 2
sampleOHEDictManual[(1,'black')] = 3
sampleOHEDictManual[(1,'tabby')] = 4
sampleOHEDictManual[(2,'mouse')] = 5
sampleOHEDictManual[(2,'salmon')] = 6
Sparse vectors
在处理稀疏矩阵的时候,我们常用SparseVector,而不是numpy,因为能减少计算量和存储空间。我们这里就验证SparseVector和numpy的计算结果是一样的。我们看看SparseVector的用法:
pyspark.mllib.linalg.SparseVector(size, *args)[source]
size是指向量的长度,后面的参数一般是两个list,前者是存值的索引,后者是具体的值。比如
a = SparseVector(4, [1, 3], [3.0, 4.0])
a的实际样子是[0, 3.0, 0, 4.0]
import numpy as np
from pyspark.mllib.linalg import SparseVector
# TODO: Replace <FILL IN> with appropriate code
aDense = np.array([0., 3., 0., 4.])
aSparse = SparseVector(4,[1,3],[3,4])
bDense = np.array([0., 0., 0., 1.])
bSparse = SparseVector(4,{3:1})
w = np.array([0.4, 3.1, -1.4, -.5])
print aDense.dot(w)
print aSparse.dot(w)
print bDense.dot(w)
print bSparse.dot(w)
OHE features as sparse vectors
这里是把上面的特征手动转换为SparseVector
# TODO: Replace <FILL IN> with appropriate code
sampleOneOHEFeatManual = SparseVector(7,[2,3],[1,1])
sampleTwoOHEFeatManual = SparseVector(7,[1,4,5],[1,1,1])
sampleThreeOHEFeatManual = SparseVector(7,[0,3,6],[1,1,1])
Define a OHE function
这一步是通过上面我们定义的字典,用代码对数据进行OHE转换
# TODO: Replace <FILL IN> with appropriate code
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
"""Produce a one-hot-encoding from a list of features and an OHE dictionary.
Note:
You should ensure that the indices used to create a SparseVector are sorted.
Args:
rawFeats (list of (int, str)): The features corresponding to a single observation. Each
feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
OHEDict (dict): A mapping of (featureID, value) to unique integer.
numOHEFeats (int): The total number of unique OHE features (combinations of featureID and
value).
Returns:
SparseVector: A SparseVector of length numOHEFeats with indicies equal to the unique
identifiers for the (featureID, value) combinations that occur in the observation and
with values equal to 1.0.
"""
sparseIndex = np.sort(list(OHEDict[i] for i in rawFeats))
sparseValues = np.ones(len(rawFeats))
return SparseVector(numOHEFeats,sparseIndex,sparseValues)
# Calculate the number of features in sampleOHEDictManual
numSampleOHEFeats = len(sampleOHEDictManual)
# Run oneHotEnoding on sampleOne
sampleOneOHEFeat = oneHotEncoding(sampleOne,sampleOHEDictManual,numSampleOHEFeats)
print sampleOneOHEFeat
Apply OHE to a dataset
# TODO: Replace <FILL IN> with appropriate code
sampleOHEData = sampleDataRDD.map(lambda x : oneHotEncoding(x,sampleOHEDictManual,numSampleOHEFeats))
print sampleOHEData.collect()
Part 2 Construct an OHE dictionary
Pair RDD of (featureID, category)
我们现在通过代码来构造OHE字典。首先把所以的特征值放到一个RDD里,然后去重。
# TODO: Replace <FILL IN> with appropriate code
sampleDistinctFeats = (sampleDataRDD
.flatMap(lambda x: x).distinct())
OHE Dictionary from distinct features
我们现在要构造出OHE字典。主要是通过zipWithIndex和collectAsMap。前者是对RDD里的元素增加索引,后者是把RDD转换为Map。
# TODO: Replace <FILL IN> with appropriate code
sampleOHEDict = (sampleDistinctFeats
.zipWithIndex().collectAsMap())
print sampleOHEDict
Automated creation of an OHE dictionary
# TODO: Replace <FILL IN> with appropriate code
def createOneHotDict(inputData):
"""Creates a one-hot-encoder dictionary based on the input data.
Args:
inputData (RDD of lists of (int, str)): An RDD of observations where each observation is
made up of a list of (featureID, value) tuples.
Returns:
dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
unique integers.
"""
inputOHEDict = (inputData.flatMap(lambda x:x).distinct().zipWithIndex().collectAsMap())
return inputOHEDict
sampleOHEDictAuto = createOneHotDict(sampleDataRDD)
print sampleOHEDictAuto
Part 3 Parse CTR data and generate OHE features
Loading and splitting the data
首先要下载数据,这段代码在LAB1里面也看到过
# Run this code to view Criteo's agreement
from IPython.lib.display import IFrame
IFrame("http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/",
600, 350)
# TODO: Replace <FILL IN> with appropriate code
# Just replace <FILL IN> with the url for dac_sample.tar.gz
import glob
import os.path
import tarfile
import urllib
import urlparse
# Paste url, url should end with: dac_sample.tar.gz
url = '<FILL IN>'
url = url.strip()
baseDir = os.path.join('data')
inputPath = os.path.join('cs190', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)
inputDir = os.path.split(fileName)[0]
def extractTar(check = False):
# Find the zipped archive and extract the dataset
tars = glob.glob('dac_sample*.tar.gz*')
if check and len(tars) == 0:
return False
if len(tars) > 0:
try:
tarFile = tarfile.open(tars[0])
except tarfile.ReadError:
if not check:
print 'Unable to open tar.gz file. Check your URL.'
return False
tarFile.extract('dac_sample.txt', path=inputDir)
print 'Successfully extracted: dac_sample.txt'
return True
else:
print 'You need to retry the download with the correct url.'
print ('Alternatively, you can upload the dac_sample.tar.gz file to your Jupyter root ' +
'directory')
return False
if os.path.isfile(fileName):
print 'File is already available. Nothing to do.'
elif extractTar(check = True):
print 'tar.gz file was already available.'
elif not url.endswith('dac_sample.tar.gz'):
print 'Check your download url. Are you downloading the Sample dataset?'
else:
# Download the file and store it in the same directory as this notebook
try:
urllib.urlretrieve(url, os.path.basename(urlparse.urlsplit(url).path))
except IOError:
print 'Unable to download and store: {0}'.format(url)
extractTar()
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('cs190', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)
if os.path.isfile(fileName):
rawData = (sc
.textFile(fileName, 2)
.map(lambda x: x.replace(' ', ','))) # work with either ',' or ' ' separated data
print rawData.take(1)
读取完数据后,把数据集分成三份,然后缓存起来。
# TODO: Replace <FILL IN> with appropriate code
weights = [.8, .1, .1]
seed = 42
# Use randomSplit with weights and seed
rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights,seed)
# Cache the data
rawTrainData.cache()
rawValidationData.cache()
rawTestData.cache()
nTrain = rawTrainData.count()
nVal = rawValidationData.count()
nTest = rawTestData.count()
print nTrain, nVal, nTest, nTrain + nVal + nTest
print rawData.take(1)
Extract features
因为解析出来的特征并没有行列的信息,所以我们把特征处理成(列,值)的样子,其中列是这个特征在第几列,值是指本来的值,然后统计有多少个不同的(列,值)对。
# TODO: Replace <FILL IN> with appropriate code
def parsePoint(point):
"""Converts a comma separated string into a list of (featureID, value) tuples.
Note:
featureIDs should start at 0 and increase to the number of features - 1.
Args:
point (str): A comma separated string where the first value is the label and the rest
are features.
Returns:
list: A list of (featureID, value) tuples.
"""
featuresList = point.split(',')
return list((i,featuresList[i+1]) for i in range(len(featuresList)-1))
parsedTrainFeat = rawTrainData.map(parsePoint)
numCategories = (parsedTrainFeat
.flatMap(lambda x: x)
.distinct()
.map(lambda x: (x[0], 1))
.reduceByKey(lambda x, y: x + y)
.sortByKey()
.collect())
print numCategories[2][1]
Create an OHE dictionary from the dataset
我们现在处理成了和part 2里一样了,看看这个字典的大小。
# TODO: Replace <FILL IN> with appropriate code
ctrOHEDict = createOneHotDict(parsedTrainFeat)
numCtrOHEFeats = len(ctrOHEDict.keys())
print numCtrOHEFeats
print ctrOHEDict[(0, '')]
Apply OHE to the dataset
我们在上面的基础上,把特征的Label加进去,就是实现一个parsePoint加强版。
from pyspark.mllib.regression import LabeledPoint
# TODO: Replace <FILL IN> with appropriate code
def parseOHEPoint(point, OHEDict, numOHEFeats):
"""Obtain the label and feature vector for this raw observation.
Note:
You must use the function `oneHotEncoding` in this implementation or later portions
of this lab may not function as expected.
Args:
point (str): A comma separated string where the first value is the label and the rest
are features.
OHEDict (dict of (int, str) to int): Mapping of (featureID, value) to unique integer.
numOHEFeats (int): The number of unique features in the training dataset.
Returns:
LabeledPoint: Contains the label for the observation and the one-hot-encoding of the
raw features based on the provided OHE dictionary.
"""
pointList = point.split(',')
pointLabel = pointList[0]
pointFeaturesRaw = list((i,pointList[i+1]) for i in range(len(pointList)-1))
pointFeatures = oneHotEncoding(pointFeaturesRaw,OHEDict,numOHEFeats)
return LabeledPoint(pointLabel,pointFeatures)
OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHETrainData.cache()
print OHETrainData.take(1)
# Check that oneHotEncoding function was used in parseOHEPoint
backupOneHot = oneHotEncoding
oneHotEncoding = None
withOneHot = False
try: parseOHEPoint(rawTrainData.take(1)[0], ctrOHEDict, numCtrOHEFeats)
except TypeError: withOneHot = True
oneHotEncoding = backupOneHot
Handling unseen features
假如测试集和验证集里面有的特征没有出现在训练集里面,所以我们要更新oneHotEncoding()来对付之前没有出现过的特征值。
# TODO: Replace <FILL IN> with appropriate code
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
"""Produce a one-hot-encoding from a list of features and an OHE dictionary.
Note:
If a (featureID, value) tuple doesn't have a corresponding key in OHEDict it should be
ignored.
Args:
rawFeats (list of (int, str)): The features corresponding to a single observation. Each
feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
OHEDict (dict): A mapping of (featureID, value) to unique integer.
numOHEFeats (int): The total number of unique OHE features (combinations of featureID and
value).
Returns:
SparseVector: A SparseVector of length numOHEFeats with indicies equal to the unique
identifiers for the (featureID, value) combinations that occur in the observation and
with values equal to 1.0.
"""
crossList = list(OHEDict.get(i,'-1') for i in rawFeats)
sparseIndex = np.sort([elem for elem in crossList if elem != "-1"])
sparseValues = np.ones(len(sparseIndex))
return SparseVector(numOHEFeats,sparseIndex,sparseValues)
OHEValidationData = rawValidationData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHEValidationData.cache()
print OHEValidationData.take(1)
Part 4 CTR prediction and logloss evaluation
Logistic regression
前面把数据处理好了,现在需要训练我们的分类器了,这里用到的是逻辑回归。主要的思路是,用LogisticRegressionWithSGD训练,得到模型LogisticRegressionModel。
from pyspark.mllib.classification import LogisticRegressionWithSGD
# fixed hyperparameters
numIters = 50
stepSize = 10.
regParam = 1e-6
regType = 'l2'
includeIntercept = True
model0 = LogisticRegressionWithSGD.train(OHETrainData, iterations=numIters, step=stepSize,regParam=regParam, regType=regType, intercept=includeIntercept)
sortedWeights = sorted(model0.weights)
print sortedWeights[:5], model0.intercept
Log loss
# TODO: Replace <FILL IN> with appropriate code
from math import log
def computeLogLoss(p, y):
"""Calculates the value of log loss for a given probabilty and label.
Note:
log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it
and when p is 1 we need to subtract a small value (epsilon) from it.
Args:
p (float): A probabilty between 0 and 1.
y (int): A label. Takes on the values 0 and 1.
Returns:
float: The log loss value.
"""
epsilon = 10e-12
if p == 0 :
p += epsilon
if p == 1 :
p -= epsilon
if y == 1 :
return -log(p)
if y == 0 :
return -log(1-p)
Baseline log loss
现在我们要用上面写的loss function来计算训练集的Baseline Train Logloss。这里用标签的平均值。
# TODO: Replace <FILL IN> with appropriate code
# Note that our dataset has a very high click-through rate by design
# In practice click-through rate can be one to two orders of magnitude lower
classOneFracTrain = OHETrainData.map(lambda lp: lp.label).sum()/OHETrainData.count()
print classOneFracTrain
logLossTrBase = OHETrainData.map(lambda lp : computeLogLoss(classOneFracTrain,lp.label)).sum()/OHETrainData.count()
print 'Baseline Train Logloss = {0:.3f}
'.format(logLossTrBase)
Predicted probability
通过上面训练好的模型来计算概率
# TODO: Replace <FILL IN> with appropriate code
from math import exp # exp(-t) = e^-t
def getP(x, w, intercept):
"""Calculate the probability for an observation given a set of weights and intercept.
Note:
We'll bound our raw prediction between 20 and -20 for numerical purposes.
Args:
x (SparseVector): A vector with values of 1.0 for features that exist in this
observation and 0.0 otherwise.
w (DenseVector): A vector of weights (betas) for the model.
intercept (float): The model's intercept.
Returns:
float: A probability between 0 and 1.
"""
rawPrediction = 1 / (1 + exp(-x.dot(w)-intercept))
# Bound the raw prediction value
rawPrediction = min(rawPrediction, 20)
rawPrediction = max(rawPrediction, -20)
return rawPrediction
trainingPredictions = OHETrainData.map(lambda lp: getP(lp.features,model0.weights,model0.intercept))
print trainingPredictions.take(5)
valuate the model
# TODO: Replace <FILL IN> with appropriate code
def evaluateResults(model, data):
"""Calculates the log loss for the data given the model.
Args:
model (LogisticRegressionModel): A trained logistic regression model.
data (RDD of LabeledPoint): Labels and features for each observation.
Returns:
float: Log loss for the data.
"""
dataPrediction = data.map(lambda lp : (getP(lp.features,model.weights,model.intercept),lp.label))
logLoss = dataPrediction.map(lambda (x,y) : computeLogLoss(x,y)).sum() / dataPrediction.count()
return logLoss
logLossTrLR0 = evaluateResults(model0, OHETrainData)
print ('OHE Features Train Logloss:
Baseline = {0:.3f}
LogReg = {1:.3f}'
.format(logLossTrBase, logLossTrLR0))
Validation log loss
验证集上又计算一遍。。。
# TODO: Replace <FILL IN> with appropriate code
logLossValBase = OHEValidationData.map(lambda lp : computeLogLoss(classOneFracTrain,lp.label)).sum()/OHEValidationData.count()
logLossValLR0 = evaluateResults(model0, OHEValidationData)
print ('OHE Features Validation Logloss:
Baseline = {0:.3f}
LogReg = {1:.3f}'
.format(logLossValBase, logLossValLR0))
我们在这里通过改变不同的阈值来获得ROC曲线。
Part 5 Reduce feature dimension via feature hashing
上面的例子告诉我们,通过OHE,我们可以获得一个不错的准确率,但是特征的个数太多了,多达233K个。所以我们需要feature hashing。
Hash function
from collections import defaultdict
import hashlib
def hashFunction(numBuckets, rawFeats, printMapping=False):
"""Calculate a feature dictionary for an observation's features based on hashing.
Note:
Use printMapping=True for debug purposes and to better understand how the hashing works.
Args:
numBuckets (int): Number of buckets to use as features.
rawFeats (list of (int, str)): A list of features for an observation. Represented as
(featureID, value) tuples.
printMapping (bool, optional): If true, the mappings of featureString to index will be
printed.
Returns:
dict of int to float: The keys will be integers which represent the buckets that the
features have been hashed to. The value for a given key will contain the count of the
(featureID, value) tuples that have hashed to that key.
"""
mapping = {}
for ind, category in rawFeats:
featureString = category + str(ind)
mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
if(printMapping): print mapping
sparseFeatures = defaultdict(float)
for bucket in mapping.values():
sparseFeatures[bucket] += 1.0
return dict(sparseFeatures)
# Reminder of the sample values:
# sampleOne = [(0, 'mouse'), (1, 'black')]
# sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sampleThree = [(0, 'bear'), (1, 'black'), (2, 'salmon')]
# TODO: Replace <FILL IN> with appropriate code
# Use four buckets
sampOneFourBuckets = hashFunction(4, sampleOne, True)
sampTwoFourBuckets = hashFunction(4, sampleTwo, True)
sampThreeFourBuckets = hashFunction(4, sampleThree, True)
# Use one hundred buckets
sampOneHundredBuckets = hashFunction(100, sampleOne, True)
sampTwoHundredBuckets = hashFunction(100, sampleTwo, True)
sampThreeHundredBuckets = hashFunction(100, sampleThree, True)
print ' 4 Buckets 100 Buckets'
print 'SampleOne: {0} {1}'.format(sampOneFourBuckets, sampOneHundredBuckets)
print 'SampleTwo: {0} {1}'.format(sampTwoFourBuckets, sampTwoHundredBuckets)
print 'SampleThree: {0} {1}'.format(sampThreeFourBuckets, sampThreeHundredBuckets)
Creating hashed features
这几步和上面差不多
# TODO: Replace <FILL IN> with appropriate code
def parseHashPoint(point, numBuckets):
"""Create a LabeledPoint for this observation using hashing.
Args:
point (str): A comma separated string where the first value is the label and the rest are
features.
numBuckets: The number of buckets to hash to.
Returns:
LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed
features.
"""
pointList = point.split(',')
pointSize = len(pointList) -1
pointLabel = pointList[0]
pointFeatureRaw = list((i,pointList[i+1]) for i in range(pointSize ))
pointFeature = SparseVector(numBuckets,hashFunction(numBuckets , pointFeatureRaw, True))
return LabeledPoint(pointLabel,pointFeature)
numBucketsCTR = 2 ** 15
hashTrainData = rawTrainData.map(lambda x:parseHashPoint(x,numBucketsCTR))
hashTrainData.cache()
hashValidationData = rawValidationData.map(lambda x:parseHashPoint(x,numBucketsCTR))
hashValidationData.cache()
hashTestData = rawTestData.map(lambda x:parseHashPoint(x,numBucketsCTR))
hashTestData.cache()
print hashTrainData.take(1)
Sparsity
计算稀疏度。。定义在注释里面有
# TODO: Replace <FILL IN> with appropriate code
def computeSparsity(data, d, n):
"""Calculates the average sparsity for the features in an RDD of LabeledPoints.
Args:
data (RDD of LabeledPoint): The LabeledPoints to use in the sparsity calculation.
d (int): The total number of features.
n (int): The number of observations in the RDD.
Returns:
float: The average of the ratio of features in a point to total features.
"""
return data.map(lambda x: len(x.features.values)).sum()/float(d*n)
averageSparsityHash = computeSparsity(hashTrainData, numBucketsCTR, nTrain)
averageSparsityOHE = computeSparsity(OHETrainData, numCtrOHEFeats, nTrain)
print 'Average OHE Sparsity: {0:.7e}'.format(averageSparsityOHE)
print 'Average Hash Sparsity: {0:.7e}'.format(averageSparsityHash)
Logistic model with hashed features
numIters = 500
regType = 'l2'
includeIntercept = True
# Initialize variables using values from initial model training
bestModel = None
bestLogLoss = 1e10
# TODO: Replace <FILL IN> with appropriate code
stepSizes = (1,10)
regParams = (1e-6,1e-3)
for stepSize in stepSizes:
for regParam in regParams:
model = (LogisticRegressionWithSGD
.train(hashTrainData, numIters, stepSize, regParam=regParam, regType=regType,
intercept=includeIntercept))
logLossVa = evaluateResults(model, hashValidationData)
print (' stepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}'
.format(stepSize, regParam, logLossVa))
if (logLossVa < bestLogLoss):
bestModel = model
bestLogLoss = logLossVa
print ('Hashed Features Validation Logloss:
Baseline = {0:.3f}
LogReg = {1:.3f}'
.format(logLossValBase, bestLogLoss))
Evaluate on the test set
和上面一样。。。
# TODO: Replace <FILL IN> with appropriate code
# Log loss for the best model from (5d)
logLossTest = evaluateResults(bestModel, hashTestData)
# Log loss for the baseline model
logLossTestBaseline = hashTestData.map(lambda lp : computeLogLoss(classOneFracTrain,lp.label )).sum() / hashTestData.count()
print ('Hashed Features Test Log Loss:
Baseline = {0:.3f}
LogReg = {1:.3f}'
.format(logLossTestBaseline, logLossTest))