zoukankan      html  css  js  c++  java
  • 利用Mrjob实现Weighted Slope One算法

    前面的文章"Weight Slope One算法"介绍了Weight Slope One算法,这个算法通常用于评分的预测,这种预测通常基于大数据,这篇文章将要讲述的就是hadoop下实现Weight Slpope One算法。Mrjob是python中的一个hadoop框架,为什么用python呢?因为使用python开发起来比较快,而且hadoop程序多是I/O密集型,所以用python比用java慢不了多少。用Mrjob编写和调试hadoop程序是非常简单和直观的,但是它内部一个序列化过程对性能产生的损害,因此现在用mrjob,还是感觉慢了些,希望以后它能修正这个问题。这里有一篇文章(点击这里)对python的hadoop框架做了个挺不错总结。关于mrjob怎么用的问题,这篇文章(点击这里)给出了介绍。总之mrjob就是入门快,开发快,调试快,运行慢。

    步入正题。通过前面的文章我们知道weight slope one算法假设项目之间的评分符合y=x+d模型。实现weigthedslopeone算法需要多个map-reduce过程,mrjob已经实现了这种chain map-reduce过程,上一次的map-reduce的输入直接变成下一次mapreduce的输出。下面分步介绍。

    输入文件:

    两个,均为为文本文件,第一个是训练样本,格式为:userId,movieId,score

    第二个文件为需要预测的数据,格式为: userId,movieId. 需要预测userId,对movieId的评分。

    为了说明方便,这里给出一个数据作为例子,数据如下:

    trainsmall.txt

    A   1   2
    A   2   3
    A   3   1
    B   1   3
    B   2   5
    B   3   3
    C   1   4
    C   3   2
    D   1   3
    D   2   4

    predictsmall.txt

    C   2
    D   3

    第一步:产生user-item数据

    输入: 训练数据,预测数据

    输出:

    key: userId

    value: 元组---(movieId,score),如果没有评分,那么score为-1

     def mapper1(self,_,line):
            item=line.split()
            if len(item)>2:
                yield item[0],(item[1],item[2])
            elif(len(item)==2):
                yield item[0],(item[1],-1)
            
    def reducer1(self,key,value):
            for v in value:
                yield key,v

    小样例这一步的输出:

    "A" ["1", "2"]
    "A" ["2", "3"]
    "A" ["3", "1"]
    "B" ["1", "3"]
    "B" ["2", "5"]
    "B" ["3", "3"]
    "C" ["1", "4"]
    "C" ["2", -1]
    "C" ["3", "2"]
    "D" ["1", "3"]
    "D" ["2", "4"]
    "D" ["3", -1]

    第二步:产生产生两个电影的评分差

    需要注意的是,只需要这两个电影必须来自同一个人,那么它们的评分差才是有意义的,因此上一步以userId作为key。

    这一步不需要map过程。

    输入:上一步的输出

    输出: 

    key: (movieA,movieB)    注意,movieA要大于movieB,否则会产生重复,浪费资源,并且他们来自同一个人。

    value: 如果movieA和movieB都已经评分,那么输出(userId,分数差,-1)

    其中-1只是一个标志,为了区分另一种输出。

    如果movieA,movieB中有一个是需要预测的(注意,最多有一个是需要预测的,可以看下面的代码),输出 (userId,对已经预测的那个电影的评分,1或者0)

    需要说明的是,为了节约空间和计算量,规定将movieA和movieB中较大的放在前面,这种顺序对于第一种输出没有影响,但是对于第二种输出必须做出区分,因此1表示第二个是需要预测的,0表示第一个是需要预测的。

    def reducer2(self,key,value):
            rateM={}
            preM=[]
            for movieid,rate in value:
                if rate==-1.0:
                    preM.append(movieid)
                else:
                    rateM[movieid]=rate
                    
            for i,itemi in enumerate(rateM.iterkeys()):
                for j,itemj in enumerate(rateM.iterkeys()):
                    if j<=i :continue
                    if itemi>itemj:
                        yield (itemi,itemj),(key,float(rateM[itemi])-float(rateM[itemj]),-1)
                    else:
                        yield (itemj,itemi),(key,float(rateM[itemj])-float(rateM[itemi]),-1)
                    
    
            for rm,rate in rateM.iteritems():
                for pm in preM:
                    if rm>pm:
                        yield (rm,pm),(key,rate,1)   # the second is the one to be predicted
                    else:
                        yield (pm,rm),(key,rate,0)   # the first is the one to be predicted

    样例的输出:

    ["3", "1"] ["A", -1.0, -1]
    ["2", "1"] ["A", 1.0, -1]
    ["3", "2"] ["A", -2.0, -1]
    ["3", "1"] ["B", 0.0, -1]
    ["2", "1"] ["B", 2.0, -1]
    ["3", "2"] ["B", -2.0, -1]
    ["3", "1"] ["C", -2.0, -1]
    ["2", "1"] ["C", "4", 0]
    ["3", "2"] ["C", "2", 1]
    ["2", "1"] ["D", 1.0, -1]
    ["3", "1"] ["D", "3", 0]
    ["3", "2"] ["D", "4", 0]

    第三步:得到预测分数和预测频率

    不需要map过程

    输入:上一步的输出

    输出:

    key:(userId,movieId)    表示userId需要预测的项目movieId

    value: (rate,fres)   表示userId对movieId的评分的一个预测

     def reducer3(self,key,value):
            total=0
            fre=0
            needPredict=[]
            for userId,diff,flag in value:
                if flag!=-1:
                    #yield key,(userId,diff,flag)
                    needPredict.append((userId,float(diff),flag))
                else:
                    total+=float(diff)
                    fre+=1
                    
            if fre>0 and len(needPredict)>0:
                avg=total/fre
                for userId,rate,flag in needPredict:
                    if float(flag)==0:
                        predictRate=avg+rate
                        mp=key[0]
                    else:
                        predictRate=rate-avg
                        mp=key[1]
                    yield (userId,mp),(predictRate,fre)

    样例输出:

    ["C", "2"] [5.333333333333333, 3]
    ["D", "3"] [2.0, 3]
    ["C", "2"] [4.0, 2]
    ["D", "3"] [2.0, 2]

    第四步:得到预测分数

    不需要map过程

    输入:上一次的输出

    输出:最终的预测

    def reducer4(self,key,value):
            totalFre=0
            totalRate=0
            for predictRate,fre in value:
                totalRate+=predictRate*fre
                totalFre+=fre
            yield key,totalRate/totalFre

    样例输出:

    ["C", "2"] 4.7999999999999998
    ["D", "3"] 2.0

    完整代码:

    from mrjob.job import MRJob 
    
    class PredictBySlopeOne(MRJob):
        # first step, generate user-item table, input is the train.txt and the predict predict.txt
        def mapper1(self,_,line):
            item=line.split()
            if len(item)>2:
                yield item[0],(item[1],item[2])
            elif(len(item)==2):
                yield item[0],(item[1],-1)
            
        def reducer1(self,key,value):
            for v in value:
                yield key,v  
                
        def reducer2(self,key,value):
            rateM={}
            preM=[]
            for movieid,rate in value:
                if rate==-1.0:
                    preM.append(movieid)
                else:
                    rateM[movieid]=rate
                    
            for i,itemi in enumerate(rateM.iterkeys()):
                for j,itemj in enumerate(rateM.iterkeys()):
                    if j<=i :continue
                    if itemi>itemj:
                        yield (itemi,itemj),(key,float(rateM[itemi])-float(rateM[itemj]),-1)
                    else:
                        yield (itemj,itemi),(key,float(rateM[itemj])-float(rateM[itemi]),-1)
                    
    
            for rm,rate in rateM.iteritems():
                for pm in preM:
                    if rm>pm:
                        yield (rm,pm),(key,rate,1)   # the second is the one to be predicted
                    else:
                        yield (pm,rm),(key,rate,0)   # the first is the one to be predicted
                        
        
        def reducer3(self,key,value):
            total=0
            fre=0
            needPredict=[]
            for userId,diff,flag in value:
                if flag!=-1:
                    #yield key,(userId,diff,flag)
                    needPredict.append((userId,float(diff),flag))
                else:
                    total+=float(diff)
                    fre+=1
                    
            if fre>0 and len(needPredict)>0:
                avg=total/fre
                for userId,rate,flag in needPredict:
                    if float(flag)==0:
                        predictRate=avg+rate
                        mp=key[0]
                    else:
                        predictRate=rate-avg
                        mp=key[1]
                    yield (userId,mp),(predictRate,fre)
                
                
        def reducer4(self,key,value):
            totalFre=0
            totalRate=0
            for predictRate,fre in value:
                totalRate+=predictRate*fre
                totalFre+=fre
            yield key,totalRate/totalFre
                
                              
    
        def steps(self):
            return [self.mr(mapper=self.mapper1,combiner=self.reducer1,reducer=self.reducer1),
                    self.mr(reducer=self.reducer2),
                    self.mr(reducer=self.reducer3),
                    self.mr(reducer=self.reducer4)]
    
    
    if __name__=="__main__":
        PredictBySlopeOne.run()
  • 相关阅读:
    java修饰符 protect public protected
    java中interface使用
    java中super的用法
    引用的一道JAVA题目
    java中==和equals的区别(转)
    2019PHP面试题最全面归纳总结
    (一)PHP基础知识考察点
    Linux常用命令大全(非常全!!!)
    MAMP mysql无法启动 总结(以后有发现再添加)
    win 安装composer (详细教程)
  • 原文地址:https://www.cnblogs.com/naniJser/p/3054832.html
Copyright © 2011-2022 走看看