zoukankan      html  css  js  c++  java
  • mrjob 使用 mongoldb 数据源【转】

    最近做的事情是用mrjob写mapreduce程序,从mongo读取数据。我的做法很容易也很好懂,因为mrjob可以支持sys.stdin的读取,所以我考虑用一个python程序读mongo中的数据,然后同时让mrjob脚本接受输入,处理,输出。

    具体方式:

    readInMongoDB.py:

    #coding:UTF-8
    '''
    Created on 2014年5月28日
    
    @author: hao
    '''
    import pymongo
    pyconn = pymongo.Connection(host,port=27017)
    pycursor = pyconn.userid_cid_score.find().batch_size(30)
    for i in pycursor:
        userId = i['userId']
        cid = i['cid']
        score = i['score']
    #     temp = list()
    #     temp.append(userId)
    #     temp.append(cid)
    #     temp.append(score)
        print str(userId)+','+str(cid)+','+str(score)
    
     

    step1.py:

    #coding:UTF-8
    '''
    Created on 2014年5月27日
    
    @author: hao
    '''
    from mrjob.job import MRJob
    # from mrjob import protocol
    import pymongo
    import logging
    import simplejson as sj
    
    class step(MRJob):
        '''
        '''
    #     logging.c
        def parseMatrix(self, _, line):
            '''
            input one stdin for pymongo onetime search
            output contentId, (userId, rating)
            '''
            line = (str(line))
            line=line.split(',')
            userId = line[0]
    #         print userId
            cid = line[1]
    #         print cid
            score = float(line[2])
    #         print score
            yield cid, (userId, float(score))        
    
        
        def scoreCombine(self, cid, userRating):
            '''
            将对同一个内容的(用户,评分)拼到一个list里
            '''
            userRatings = list()
            for i in userRating:
                userRatings.append(i)
            yield cid, userRatings
            
        def userBehavior(self, cid, userRatings):
            '''        
            '''
            scoreList = list()
            for doc in userRatings:
                # 每个combiner结果
                for i in doc:
                    scoreList.append(i)
            for user1 in scoreList:
                for user2 in scoreList:
                    if user1[0] == user2[0]:
                        continue
                    yield (user1[0], user2[0]), (user1[1], user2[1])
        
        
        def steps(self):
            return [self.mr(mapper = self.parseMatrix,
                            reducer = self.scoreCombine),
                    self.mr(reducer = self.userBehavior),]
        
        
    if __name__=='__main__':
        
        fp = open('a.txt','w')
        fp.write('[')
        step.run()
        fp.write(']')
        fp.close()

    然后执行脚本  python readInMongoDB.py | python step1.py >> out.txt 

    这个方式在本地执行的非常好,没有任何问题(除开mrjob速度的问题,其实在本次应用中影响不大)

    原文:http://blog.csdn.net/whzhcahzxh/article/details/29587059

  • 相关阅读:
    Spring Boot 使用常见问题
    JPA 使用
    JPA 复杂查询
    Gitlab利用Webhook实现Push代码后的jenkins自动构建
    消息中间件的使用场景
    消息中间件资料整理
    js原生跨域--用script标签实现
    遍历类成员并赋值
    Spring boot 读取 application.properties 文件方法
    fastjson 进行json转实体类对象及一些常用操作
  • 原文地址:https://www.cnblogs.com/olivetree123/p/4939429.html
Copyright © 2011-2022 走看看