输入文件:20130712000000
格式:packageid ruleid pid userid shw clk price time
输入文件: ./cache_data/0-9
格式为:userid 客服id 运营单位。其中0文件里存放的是所有userid%10=0的数据,5文件里存放的是所有userid%10=5的数据,其他一样。
部分数据只有 userid 一列,此时客服id="",运营单位=""(脚本会把这类用户的运营单位记为 NULL)。
任务:生成以下格式的数据。
Packageid 运营单位 影响的用户数 消费额度
即根据包来获取包所影响的所有用户userid,并计算出这些用户所在的运营单位,然后按照运营单位为维度,来分别统计用户数和消费额
#[work@yx-testing-ecom124.vm.baidu.com 0724]$ cat deal_union_07.py
#!/bin/python
#encoding=utf-8
import sys
import os
import time
# Minimum number of space-separated fields an input record line must have;
# main_07 counts any shorter line as an error line.
MAP_COUNT = 8
def output_dic(flag, dic, f):
    """Write the per-operator totals accumulated for one package.

    One line of the form ``"<flag> <operator> <user_count> <total_price>"``
    is written to *f* for every key of *dic*, in the dict's iteration order.

    Args:
        flag: Package id the totals belong to. The empty string means that
            no package has been accumulated yet, in which case nothing is
            written.
        dic: Mapping of operator name to an indexable entry whose item 1 is
            the user count and item 2 is the summed price (item 0 is not
            read here).
        f: Object with a ``write(str)`` method, e.g. an open text file.

    Returns:
        True if *flag* is non-empty (the entries, if any, were written),
        False otherwise.
    """
    if flag == "":
        return False
    for key, stats in dic.items():
        f.write("%s %s %d %d\n" % (flag, key, stats[1], stats[2]))
    return True
def _load_operator_cache(cache_dir):
    """Load the ten ``userid -> operator`` shard files named 0..9.

    Shard ``i`` is read from ``cache_dir + str(i)``. Every line is split on
    single spaces; field 0 is the userid and field 2 the operator. A line
    with fewer than three fields, or with an empty operator field, maps the
    userid to ``"NULL"`` and is counted as a "no operator" line. After all
    shards are loaded, one summary line per shard that had such lines is
    written to stdout.

    Args:
        cache_dir: Path prefix of the shard files (the shard digit is
            appended directly, so a directory must end with a separator).

    Returns:
        A list of ten dicts; element ``i`` holds the mapping read from
        shard ``i``.

    Raises:
        IOError / OSError: If a shard file cannot be opened.
    """
    cache = []
    counts = []
    for i in range(10):
        shard = {}
        total = 0
        missing = 0
        with open(cache_dir + str(i), 'r') as fo:
            for line in fo:
                fields = line.rstrip('\n').split(' ')
                total += 1
                if len(fields) < 3 or fields[2] == "":
                    # userid-only line (or empty operator column).
                    missing += 1
                    shard[fields[0]] = "NULL"
                else:
                    shard[fields[0]] = fields[2]
        cache.append(shard)
        counts.append((total, missing))

    for i, (total, missing) in enumerate(counts):
        if missing > 0:
            sys.stdout.write(
                "The %s%d totally %d lines processed with %d error lines (No operator)\n"
                % (cache_dir, i, total, missing))
    return cache


def main_07(fname_2013="./20130712000000", cache_2013="./cache_data/"):
    """Aggregate user count and spend per (package, operator) into ``output_07``.

    The record file is read as a stream of space-separated lines
    ``packageid ruleid pid userid shw clk price time``. For each record the
    user's operator is looked up in the shard selected by the last digit of
    the userid. Totals are flushed through ``output_dic`` every time the
    packageid changes, so the records of one package must be contiguous in
    the input to end up in a single output row per (package, operator).

    Lines with fewer than ``MAP_COUNT`` fields are counted as error lines
    and skipped. Lines whose price or userid cannot be parsed, or whose
    userid is absent from the cache, are reported on stderr, counted as
    error lines and skipped. A final summary is written to stderr.

    Args:
        fname_2013: Path of the record file.
        cache_2013: Path prefix of the cache shard files ``0`` .. ``9``.
            When the script is started with exactly two command-line
            arguments, they override both parameters.

    Raises:
        IOError / OSError: If an input file cannot be opened or the output
            file cannot be created.
    """
    if len(sys.argv) == 3:
        fname_2013 = sys.argv[1]
        cache_2013 = sys.argv[2]

    cache = _load_operator_cache(cache_2013)

    line_cnt = 0
    err_line_cnt = 0
    # operator -> [set of userids seen, distinct user count, summed price];
    # items 1 and 2 are what output_dic reads.
    dic = {}
    flag = ""  # packageid currently being accumulated ("" = none yet)

    with open(fname_2013, 'r') as f_2013:
        with open("output_07", 'w') as f_output_07:
            for line in f_2013:
                line_cnt += 1
                record = line.rstrip('\n').split(' ')
                if len(record) < MAP_COUNT:
                    err_line_cnt += 1
                    continue
                try:
                    packid = record[0]
                    userid = record[3]
                    price = int(record[6])
                    adress = cache[int(userid[-1])][userid]
                except (ValueError, KeyError, IndexError) as e:
                    err_line_cnt += 1
                    sys.stderr.write(
                        "%s %s: failed in line#%d, [err_msg:%s]\n"
                        % (time.asctime(), os.path.basename(__file__), line_cnt, e))
                    continue

                if flag != packid:
                    # New package: emit the finished one and start over.
                    output_dic(flag, dic, f_output_07)
                    dic = {}
                    flag = packid

                entry = dic.get(adress)
                if entry is None:
                    dic[adress] = [set([userid]), 1, price]
                else:
                    # Count each user once per (package, operator), however
                    # the user's records are interleaved with others.
                    if userid not in entry[0]:
                        entry[0].add(userid)
                        entry[1] += 1
                    entry[2] += price

            output_dic(flag, dic, f_output_07)

    sys.stderr.write(
        "%s %s: totally %u lines processed with %u error lines\n"
        % (time.asctime(), "The file " + fname_2013, line_cnt, err_line_cnt))
if __name__ == '__main__':
    # Script entry point: run the aggregation and exit with status 0.
    # Any exception is reported on stderr and the process exits with -1.
    try:
        main_07()
    except Exception as e:
        sys.stderr.write(
            "%s %s: failed to process file, [err_msg:%s]\n"
            % (time.asctime(), os.path.basename(__file__), e))
        sys.exit(-1)
    sys.exit(0)
# data_2013 sample input (original data):
350 1 1 3 1 1 5 2013
350 1 1 2 1 1 20 2013
350 1 1 4 1 1 0 2013
350 1 1 2 1 1 0 2013
350 1 1 6 1 1 1 2013
350 1 1 1 1 1 2 2013
351 1 1 2 1 2 1 2013
351 1 1 1 1 1 0 2013
352 1 1 5 1 5 0 2013
Cache_data
文件1
1 9 a
11 9 b
文件2
2 9 a
文件3
3 9 b
文件4
4 9 c
文件5
5
文件6
16 9 d
6 9 b
cat output_07 最终结果
350 a 2 22
350 c 1 0
350 b 2 6
351 a 2 1
352 NULL 1 0