# robby_map.py.bak
#!/bin/python
#encoding=utf-8
import sys
import os
import time
# Minimum number of space-separated fields an input record must have;
# shorter lines are counted as error lines and skipped.
MAP_COUNT = 23


def _load_rules(file_name):
    """Read the rule file and index its pack ids by user id.

    Blank lines (empty or spaces only) are ignored.  Every other line is
    split on single spaces; field 0 is used as the pack id and field 1 as
    the user id.  Pack ids for one user keep their rule-file order so the
    mapper emits matches in the same order a linear scan would.

    Raises:
        ValueError: a non-blank rule line has fewer than two fields.
        IOError/OSError: the file cannot be opened or read.
    """
    packids_by_user = {}
    # "with" guarantees the handle is closed even if reading fails.
    with open(file_name, 'r') as file_rule:
        for line_no, line in enumerate(file_rule, 1):
            text = line.rstrip('\n')
            if text.strip(' ') == "":
                continue
            fields = text.split(' ')
            if len(fields) < 2:
                # Fail loudly on a broken rule file instead of hitting a
                # bare IndexError later while processing the first record.
                raise ValueError("rule file %s line %d: expected "
                                 "'<packid> <userid>', got %r"
                                 % (file_name, line_no, text))
            packids_by_user.setdefault(fields[1], []).append(fields[0])
    return packids_by_user


def map_process(istream, ostream):
    """Map step: emit one 'query' line per (record, matching rule) pair.

    Command-line arguments are read from sys.argv, which must hold exactly
    two arguments: a time stamp (accepted but not used here) and the path
    of the rule file.

    Each line of *istream* is split on single spaces.  Lines with fewer
    than MAP_COUNT fields are counted as error lines and skipped.  From a
    valid record the user id (field 1), price (field 9), pid (field 16)
    and channel (field 19) are taken; for every rule whose user id equals
    the record's user id this line is written to *ostream*:

        query <packid> <pid> <userid> <channel> <price>

    A one-line summary (lines processed / error lines) goes to stderr.

    Returns:
        1 if the argument count is wrong (nothing is read or written),
        otherwise None.
    """
    if len(sys.argv) != 3:
        sys.stderr.write("usage: %s <time_stamp> <rule_file>\n"
                         % os.path.basename(__file__))
        return 1
    packids_by_user = _load_rules(sys.argv[2])

    err_line_cnt = 0
    line_cnt = 0
    for line in istream:
        line_cnt += 1
        record = line.rstrip('\n').split(' ')
        if len(record) < MAP_COUNT:
            err_line_cnt += 1
            continue
        userid = record[1]
        price = record[9]
        pid = record[16]
        channel = record[19]
        # One dictionary lookup replaces a scan over every rule per record.
        for rule_packid in packids_by_user.get(userid, ()):
            ostream.write('query %s %s %s %s %s\n'
                          % (rule_packid, pid, userid, channel, price))

    sys.stderr.write(
        "%s %s: totally %u lines processed with %u error lines\n"
        % (time.asctime(time.localtime(time.time())),
           os.path.basename(__file__), line_cnt, err_line_cnt))
if __name__ == '__main__':
    # Script entry point: run the mapper over stdin/stdout.
    # Any exception is reported on stderr and turned into exit status -1.
    try:
        status = map_process(sys.stdin, sys.stdout)
    except Exception as e:
        sys.stderr.write(
            "%s %s: failed to process file, [err_msg:%s]\n"
            % (time.asctime(time.localtime(time.time())),
               os.path.basename(__file__), e))
        sys.exit(-1)
    # map_process returns 1 on a usage error and None on success; pass the
    # error status on instead of always exiting 0.
    sys.exit(status or 0)
# [work@yx-testing-ecom124.vm.baidu.com hadoop_prac]$ cat robby_reduce.py.bak
#!/bin/python
#encoding=utf-8
#/***************************************************************************
# *
# * Copyright (c) 2013 Baidu.com, Inc. All Rights Reserved
# *
# **************************************************************************/
#
#/**
# * @file libin_job_01_reduce.py
# * @author chenlibin(chenlibin@baidu.com)
# * @date 2013-07-21
#**/
# input rule,userid,'word',wordid,pid,shw,clk,price
# output rule,userid,pid,shw,clk,price
# input ostream.write('query %s %s %s %s %s %s\n'%(rule,userid,pid,shw,clk,price))
# output rule,userid,pid,shw,clk,price
import sys
import os
import time
# Minimum number of space-separated fields in a reducer input line;
# reduce_process skips (and counts as errors) lines with fewer fields.
MAPCOUNT = 6
# Column indices into a split input record.
# MAP_TYPE is not read by any code in this file; presumably it is the
# literal 'query' tag written by the mapper -- confirm against the mapper.
MAP_TYPE = 0
MAP_PACKID = 1
MAP_PID = 2
MAP_USERID = 3
MAP_CHANNEL = 4
MAP_PRICE = 5
#MAP_TIME = 6
# Slot indices into the cacheA / cacheB lists built in reduce_process.
# CHANNEL is only ever filled in for cacheB (see load_cacheB).
PACKID = 0
PID = 1
USERID = 2
CHANNEL = 3
PRICE = 4
#TIME = 5
# Module-level price totals keyed by integer channel id.  Reset by
# reduce_process for each new (userid, pid) group, updated by load_cacheB
# and reduce_process, and read by output_cacheB.
dic = {}
def output_cacheA(cache, ostream):
    """Write the "#A" line (overall price total) for the cached group.

    Args:
        cache: list indexed by PACKID / USERID / PID / PRICE; the PRICE
            slot must hold an integer.
        ostream: writable text stream.

    Returns:
        False without writing anything when the PACKID slot is None (no
        record has been loaded yet); True after writing

            <packid> <userid> <pid> <price> #A
    """
    if cache[PACKID] is None:
        return False
    ostream.write("%s %s %s %d #A\n"
                  % (cache[PACKID], cache[USERID], cache[PID], cache[PRICE]))
    return True
def output_cacheB(cache, ostream):
    """Write the "#B" line (per-channel price totals) for the cached group.

    The totals are read from the module-level ``dic`` for channels 201,
    204, 225 and 227, in that order.  A channel missing from ``dic`` is
    reported as 0 rather than raising KeyError.

    Args:
        cache: list indexed by PACKID / USERID / PID.
        ostream: writable text stream.

    Returns:
        False without writing anything when the PACKID slot is None (no
        record has been loaded yet); True after writing

            <packid> <userid> <pid> <p201> <p204> <p225> <p227> #B
    """
    if cache[PACKID] is None:
        return False
    ostream.write("%s %s %s %d %d %d %d #B\n"
                  % (cache[PACKID], cache[USERID], cache[PID],
                     dic.get(201, 0), dic.get(204, 0),
                     dic.get(225, 0), dic.get(227, 0)))
    return True
def load_cacheA(record, cacheA):
    """Start a new group in *cacheA* from a split input record.

    Copies the pack id, user id and pid from *record* and sets the PRICE
    slot to the record's price as an integer.

    The price is converted before any slot is written, so a non-numeric
    price raises ValueError and leaves *cacheA* untouched instead of
    pairing the new key with the previous group's total.

    Args:
        record: list of string fields indexed by the MAP_* constants.
        cacheA: list indexed by PACKID / USERID / PID / PRICE; modified
            in place.
    """
    price = int(record[MAP_PRICE])
    cacheA[PACKID] = record[MAP_PACKID]
    cacheA[USERID] = record[MAP_USERID]
    cacheA[PID] = record[MAP_PID]
    cacheA[PRICE] = price
def load_cacheB(record, cacheB):
    """Load a record into *cacheB* and add its price to its channel total.

    Copies the pack id, pid and user id from *record*, stores the price
    and channel as integers, and adds the price to ``dic[channel]``.

    Both numeric fields are converted before anything is written, so a
    malformed record raises ValueError without leaving *cacheB* or ``dic``
    half-updated.

    The channel total is accumulated, not overwritten.  This gives the
    same result when ``dic`` has just been reset for a new group, and it
    keeps the earlier sum if a channel shows up again later in the same
    group.

    Args:
        record: list of string fields indexed by the MAP_* constants.
        cacheB: list indexed by PACKID / PID / USERID / CHANNEL / PRICE;
            modified in place.
    """
    price = int(record[MAP_PRICE])
    channel = int(record[MAP_CHANNEL])
    cacheB[PACKID] = record[MAP_PACKID]
    cacheB[PID] = record[MAP_PID]
    cacheB[USERID] = record[MAP_USERID]
    cacheB[PRICE] = price
    cacheB[CHANNEL] = channel
    dic[channel] = dic.get(channel, 0) + price
def reduce_process(istream, ostream):
    """Reduce step: sum prices per (userid, pid) group and per channel.

    Each line of *istream* is split on single spaces and must have at
    least MAPCOUNT fields.  Consecutive records with the same user id and
    pid form one group.  When the group changes, and once more at end of
    input, two lines are written to *ostream* for the finished group via
    output_cacheA ("#A", overall total) and output_cacheB ("#B", totals
    for channels 201/204/225/227).

    Diagnostics go to stderr only; *ostream* carries nothing but result
    lines.  A line that is too short, has a non-numeric price/channel, or
    otherwise raises while being processed is logged, counted as an error
    line and skipped; numeric fields are validated before any running
    total is touched.

    NOTE(review): the pack id is not part of the group key, so records
    that share user id and pid but differ in pack id are merged under the
    first pack id seen -- confirm this is intended.
    """
    cacheA = [None, None, None, 0, 0]
    cacheB = [None, None, None, 0, 0, 0]
    line_cnt = 0
    err_line_cnt = 0
    for line in istream:
        line_cnt += 1
        try:
            record = line.rstrip('\n').split(' ')
            if len(record) < MAPCOUNT:
                err_line_cnt += 1
                # stderr, not stdout: stdout is the reducer's data stream.
                sys.stderr.write("error line:%s\n" % (line))
                continue

            # Validate both numeric fields up front so a malformed record
            # cannot flush or partially overwrite the current group.
            price = int(record[MAP_PRICE])
            channel = int(record[MAP_CHANNEL])

            same_group = (record[MAP_USERID] == cacheB[USERID]
                          and record[MAP_PID] == cacheB[PID])
            if not same_group:
                # New (userid, pid): flush the finished group, then start
                # the new one with zeroed totals for the reported channels.
                output_cacheA(cacheA, ostream)
                output_cacheB(cacheB, ostream)
                load_cacheA(record, cacheA)
                dic.clear()
                for reported_channel in (201, 204, 225, 227):
                    dic[reported_channel] = 0
                load_cacheB(record, cacheB)
            elif channel != cacheB[CHANNEL]:
                # Same group, different channel: load_cacheB records the
                # price under the new channel.
                load_cacheB(record, cacheB)
                cacheA[PRICE] += price
            else:
                # Same group and channel: extend both running totals.
                cacheA[PRICE] += price
                cacheB[PRICE] = price
                dic[cacheB[CHANNEL]] += price
        except Exception as e:
            # Per-line best effort: log, count and keep going.
            err_line_cnt += 1
            sys.stderr.write(
                "%s %s: failed in line#%d, [err_msg:%s]\n"
                % (time.asctime(time.localtime(time.time())),
                   os.path.basename(__file__), line_cnt, e))

    # Flush the last group (no-op when nothing was ever loaded).
    output_cacheA(cacheA, ostream)
    output_cacheB(cacheB, ostream)
    sys.stderr.write(
        "%s %s: totally %u lines processed with %u error lines\n"
        % (time.asctime(time.localtime(time.time())),
           os.path.basename(__file__), line_cnt, err_line_cnt))
if __name__ == '__main__':
    # Script entry point: run the reducer over stdin/stdout.
    # Any exception escaping reduce_process is reported on stderr and
    # turned into exit status -1; otherwise the script exits 0.
    try:
        reduce_process(sys.stdin, sys.stdout)
    except Exception as e:
        sys.stderr.write(
            "%s %s: failed to process file, [err_msg:%s]\n"
            % (time.asctime(time.localtime(time.time())),
               os.path.basename(__file__), e))
        sys.exit(-1)
    sys.exit(0)