zoukankan      html  css  js  c++  java
  • libin_job_01_reduce.py

    robby_map.py.bak   
    #!/bin/python
    #encoding=utf-8
    import sys
    import os
    import time

    MAP_COUNT=23

    def map_process(istream, ostream) :
    #    print "HERE"
        if (len(sys.argv)==3):
            time_stamp = sys.argv[1] 
            file_name = sys.argv[2]
        else:
            return 1
        file_rule = open(file_name, 'r')
        rule_list = []
        for line in file_rule:
            if(line.rstrip(' ').strip(' ')!=""):
                items = line.rstrip(' ').split(' ')
                rule_list.append(tuple(items))
        file_rule.close()    
        #print rule_list
        err_line_cnt  = 0
        line_cnt      = 0
        for line in istream :
            line_cnt += 1
            record = line.rstrip(' ').split(' ')
            if (len(record) < MAP_COUNT) : 
                err_line_cnt += 1
                continue
            userid = record[1]
            ideaid = record[4]
            wordid = record[3]
            price  = record[9]
            query  = record[11]
            pid    = record[16]
            channel = record[19]
           # print channel
           # print "GG"
            for rule in rule_list :
                rule_packid = rule[0]
                rule_userid = rule[1]
                if(userid == rule_userid) :
                    ostream.write('query %s %s %s %s %s '%(rule_packid, pid, userid, channel, price))
                    continue
                else :
                    pass
        print >> sys.stderr, "%s %s: totally %u lines processed with %u error lines" %
        (time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, err_line_cnt)
                    
    if __name__ == '__main__':
        #map_process(sys.stdin, sys.stdout)
        try :
            map_process(sys.stdin, sys.stdout)
        except Exception, e :                                                       
            print >> sys.stderr, "%s %s: failed to process file, [err_msg:%s]"
            % (time.asctime(time.localtime(time.time())), os.path.basename(__file__), e)
            sys.exit(-1)
        sys.exit(0)
    
    
    [work@yx-testing-ecom124.vm.baidu.com hadoop_prac]$ cat robby_reduce.py.bak
    #!/bin/python
    #encoding=utf-8
    #/***************************************************************************
    # *
    # * Copyright (c) 2013 Baidu.com, Inc. All Rights Reserved
    # *
    # **************************************************************************/
    #
    #/**
    #  * @file    libin_job_01_reduce.py
    #  * @author  chenlibin(chenlibin@baidu.com)
    #  * @date    2013-07-21
    #**/
    
    # input      rule,userid,'word',wordid,pid,shw,clk,price
    # output     rule,userid,pid,shw,clk,price
    # input ostream.write('query	%s	%s	%s	%s	%s	%s
    '%(rule,userid,pid,shw,clk,price))
    # output rule,userid,pid,shw,clk,price
    import sys
    import os
    import time
    
    MAPCOUNT   = 6
    
    MAP_TYPE   = 0
    MAP_PACKID = 1
    MAP_PID    = 2
    MAP_USERID = 3
    MAP_CHANNEL = 4
    MAP_PRICE  = 5
    #MAP_TIME   = 6
    
    PACKID    = 0
    PID     = 1
    USERID  = 2
    CHANNEL = 3
    PRICE   = 4
    #TIME    = 5
    
    dic = {}
    def output_cacheA(cache, ostream) :
        if cache[PACKID] is not None:
            ostream.write("%s	%s	%s	%d	#A
    "%(cache[PACKID], cache[USERID], cache[PID], cache[PRICE]))
            return True
        return False
    def output_cacheB(cache, ostream) :
        if cache[PACKID] is not None:
            ostream.write("%s	%s	%s	%d	%d	%d	%d	#B
    "%(cache[PACKID], cache[USERID], cache[PID], dic[201], dic[204], dic[225], dic[227]))
            return True
        return False
    def load_cacheA(record, cacheA) :
        cacheA[PACKID]      = record[MAP_PACKID]
        cacheA[USERID]    = record[MAP_USERID]
        cacheA[PID]       = record[MAP_PID]
        cacheA[PRICE]     = int(record[MAP_PRICE])
        #cacheA[TIME]     = record[MAP_TIME]
    def load_cacheB(record, cacheB):
        cacheB[PACKID]  = record[MAP_PACKID]
        cacheB[PID]       = record[MAP_PID]
        cacheB[USERID]    = record[MAP_USERID]
        cacheB[PRICE]     = int(record[MAP_PRICE])
        cacheB[CHANNEL]   = int(record[MAP_CHANNEL])
        dic[cacheB[CHANNEL]] = cacheB[PRICE]
        #cacheB[TIME]     = record[MAP_TIME]
    def reduce_process(istream, ostream) :
        cacheA = [None, None, None,0,0]
        cacheB = [None, None, None,0,0,0]
        line_cnt = 0
        err_line_cnt = 0
        #print "GOOD"    
        for line in istream :
            try :
                line_cnt += 1
                record = line.rstrip('
    ').split('	')
                if (len(record) < MAPCOUNT) :
                    err_line_cnt += 1
                    print "error line:%s"%(line)
                    continue
    #print "line:%s"%(record)
                if ((record[MAP_USERID]!=cacheB[USERID]) or (cacheB[PID]!=record[MAP_PID]) or (cacheB[CHANNEL]!=int(record[MAP_CHANNEL]))):
                    if ((cacheB[PID]!=record[MAP_PID]) or (record[MAP_USERID]!=cacheB[USERID])):
                        output_cacheA(cacheA,ostream)
                        output_cacheB(cacheB,ostream)
                        load_cacheA(record, cacheA)
                        dic.clear()
                        dic[201] = 0
                        dic[204] = 0
                        dic[225] = 0
                        dic[227] =0
                        load_cacheB(record, cacheB)
                    elif (int(record[MAP_CHANNEL])!=cacheB[CHANNEL]):
                        load_cacheB(record, cacheB)
                        cacheA[PRICE] += int(record[MAP_PRICE])
                else :
                    cacheA[PRICE] += int(record[MAP_PRICE])
                    cacheB[PRICE] = int(record[MAP_PRICE])
                    #dic[cacheB[CHANNEL]] += cacheB[PRICE]
                    dic[cacheB[CHANNEL]] += cacheB[PRICE]
    
    #print 'else %s' %(cacheA)
            except Exception, e :
                print >> sys.stderr, "%s	%s: failed in line#%d, [err_msg:%s]" % 
                    (time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, e)
        output_cacheA(cacheA,ostream)
        output_cacheB(cacheB,ostream)
        print >> sys.stderr, "%s	%s: totally %u lines processed with %u error lines" % 
            (time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, err_line_cnt)
    
    
    if __name__ == '__main__':
        try :
            reduce_process(sys.stdin, sys.stdout)
        except Exception, e :
            print >> sys.stderr, "%s	%s: failed to process file, [err_msg:%s]" 
                    % (time.asctime(time.localtime(time.time())), os.path.basename(__file__), e)
            sys.exit(-1)
    
        sys.exit(0)

  • 相关阅读:
    nao机器人使用手册
    突然感觉简单的东西是最美的
    window10教育版激活失败
    linux新分区无法新建文件夹
    看看CSDN的吃相
    游戏mod启动器原理
    显示器选购指南
    维修老电视
    快乐小丑
    这猫会关水龙头,所以你只要给猫猫打开就行——华中师范大学的胖猫
  • 原文地址:https://www.cnblogs.com/robbychan/p/3786650.html
Copyright © 2011-2022 走看看