该方案是一个基于任务调度框架 Gearman、采用 Python 开发的分布式数据统计系统。
项目的目录结构很简单:
# apple at localhost in ~/Develop/getui [11:24:26]
$ tree
.
├── Browser.py
├── PickleGearman.py
├── SpiderWorker.py
└── countPushNum.py
0 directories, 4 files
在我们的 MacBook Pro 上安装并启动 Gearman:
# apple at liujingyu.local in ~/Develop/getui [10:47:36]
$ brew install gearman
$ gearmand -d -L 127.0.0.1 -p 4307
Python 需要安装 gearman、mechanize 等库(pip 用于安装常用的包,具体安装方法见 https://pip.pypa.io/en/latest/installing.html#install-pip):
# apple at liujingyu.local in ~/Develop/getui [10:47:36]
$ pip install gearman mechanize
PickleGearman.py 让 worker 与 client 之间可以直接发送、接收 Python 对象:
#!/usr/bin/env python
# coding:utf-8
"""PickleGearman.py -- Gearman worker/client classes that exchange Python objects.

Job payloads and results are serialized with ``pickle`` instead of being
passed as plain strings.
"""

import pickle

import gearman


class PickleDataEncoder(gearman.DataEncoder):
    """Data encoder that (de)serializes Gearman payloads with ``pickle``."""

    @classmethod
    def encode(cls, encodable_object):
        """Return ``encodable_object`` serialized with ``pickle.dumps``."""
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        """Return the Python object stored in ``decodable_string``."""
        # SECURITY: pickle.loads can execute arbitrary code while decoding.
        # Only use this encoder when every peer that can reach the job server
        # is trusted (here gearmand is bound to 127.0.0.1).
        return pickle.loads(decodable_string)


class PickleWorker(gearman.GearmanWorker):
    """Gearman worker that uses PickleDataEncoder for its payloads."""

    data_encoder = PickleDataEncoder


class PickleClient(gearman.GearmanClient):
    """Gearman client that uses PickleDataEncoder for its payloads."""

    data_encoder = PickleDataEncoder
运行图:
8个Spider运行过程图:
Spider代码:
#!/usr/bin/env python
"""SpiderWorker.py -- Gearman worker that runs the Browser scraper for each job."""

import logging

from PickleGearman import PickleWorker
from Browser import Browser

logger = logging.getLogger(__name__)


class GearmanWorker(PickleWorker):
    """Pickle-aware worker; ``on_job_execute`` is kept as an explicit hook."""

    def on_job_execute(self, current_job):
        """Delegate to the parent implementation unchanged."""
        return super(GearmanWorker, self).on_job_execute(current_job)


def SpiderWorker(gearman_worker, gearman_job):
    """Run ``Browser`` on the task ids carried by ``gearman_job``.

    Args:
        gearman_worker: the worker executing the job (unused).
        gearman_job: the job; ``gearman_job.data`` is passed to ``Browser``.

    Returns:
        Whatever ``Browser`` returns, or an empty list when ``Browser``
        raises. The empty list keeps the job from failing and is skipped by
        a caller that only accepts results of length 3.
    """
    taskIds = gearman_job.data

    try:
        return Browser(taskIds)
    except Exception:
        # The original handler referenced an undefined ``config`` module and
        # then returned an unbound local; log with the stdlib logger instead.
        logger.exception('Browser failed for task ids %r', taskIds)
        return []


worker = GearmanWorker(['127.0.0.1:4307'])
worker.register_task("SpiderWorker", SpiderWorker)
worker.work()
countPushNum.py代码:
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""countPushNum.py -- print per-day counters and aggregated push statistics.

Reads keys from two Redis instances, fans the task ids out to the
``SpiderWorker`` Gearman task and prints the summed result per Redis key.
"""

import cookielib
import json
import socket

import redis
import mechanize
import numpy as np

from PickleGearman import PickleClient

# Default timeout (seconds) for every socket created after this point.
socket.setdefaulttimeout(10)

# Used both as the number of task ids per job and the number of jobs per batch.
currency = 30


def printEveryGroupMsg(groupSum):
    """Print the header row followed by the aggregated counters."""
    print('有效可发送数 实际下发数 收到数')
    print(groupSum)


def _chunks(items, size):
    """Split ``items`` into consecutive slices of at most ``size`` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def _is_triple(content):
    """Return True when ``content`` is a sized object of length 3."""
    try:
        return len(content) == 3
    except TypeError:
        # None (failed job) or a scalar result has no length.
        return False


def _print_counts(redis_client, pattern):
    """Print every key matching ``pattern`` together with its value."""
    # NOTE(review): KEYS walks the whole keyspace; consider scan_iter on
    # large production instances.
    for key in redis_client.keys(pattern):
        print('%s %s' % (key, redis_client.get(key)))


def _collect_group(gearman_client, taskIds):
    """Submit ``taskIds`` to SpiderWorker in batches and gather valid results."""
    jobs = [dict(task='SpiderWorker', data=batch)
            for batch in _chunks(taskIds, currency)]
    everyGroup = []
    for per_jobs in _chunks(jobs, currency):
        completed_requests = gearman_client.submit_multiple_jobs(per_jobs)
        for current_request in completed_requests:
            content = current_request.result
            if _is_triple(content):
                everyGroup.append(content)
    return everyGroup


def main(day_key=None):
    """Print the counters and the push statistics for one day.

    Args:
        day_key: date string used inside the Redis key patterns. When
            omitted, the module-level ``yesterday`` set by the ``__main__``
            block is used, which is how the script has always been invoked.
    """
    if day_key is None:
        day_key = yesterday

    gearman_clients = PickleClient(['127.0.0.1:4307'])
    r1 = redis.Redis(host='xxx.xx.xx.x', port=6379, db=0, password='pasword')
    r2 = redis.Redis(host='xx.xx.xx.xx', port=6379, db=0, password='pasword')
    redis_clients = (r1, r2)

    # Total counters.
    count_pattern = '*' + day_key + ':count'
    for redis_client in redis_clients:
        _print_counts(redis_client, count_pattern)

    # Push counters: one aggregated line per Redis key.
    task_pattern = '*' + day_key + ':taskIds'
    for redis_client in redis_clients:
        for key in redis_client.keys(task_pattern):
            print(key)
            taskIds = list(redis_client.smembers(key))
            everyGroup = _collect_group(gearman_clients, taskIds)
            printEveryGroupMsg(np.sum(everyGroup, 0))


if __name__ == '__main__':

    from datetime import date, timedelta

    # Python 2 ``input`` evaluates what the user types and raises on empty
    # input, so read the raw line and convert it explicitly.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    answer = read_line('请输入时间<昨天请输入1> >').strip()
    day = int(answer) if answer else 0

    yesterday = (date.today() - timedelta(day)).strftime('%y%m%d')
    today = (date.today() - timedelta(0)).strftime('%y%m%d')

    main()
抓取模块代码:
#!/usr/bin/env python
# coding:utf-8
"""Browser.py -- log in to the push console and sum the counters of task ids."""

import mechanize
import numpy as np
import cookielib
import json

LOGIN_URL = 'http://dev.igetui.com/login.htm'
PUSH_LIST_URL = ('https://dev.getui.com/dos/pushRecords/queryApiPushList'
                 '?curPage=1&appId=16500&taskId=%s')
USER_AGENT = ('Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.1) '
              'Gecko/2008071615 Fedora/3.0.1-1.fc9 Firefox/3.0.1')


def _login():
    """Return a mechanize browser that has submitted the login form."""
    br = mechanize.Browser()

    # Cookie jar that keeps the session for the later requests.
    br.set_cookiejar(cookielib.LWPCookieJar())

    # Browser options.
    br.set_handle_equiv(True)
    br.set_handle_gzip(True)
    br.set_handle_redirect(True)
    br.set_handle_referer(True)
    br.set_handle_robots(False)

    # Follow refresh 0 but do not hang on refresh > 0.
    br.set_handle_refresh(mechanize._http.HTTPRefreshProcessor(), max_time=1)

    # Debugging output stays off.
    br.set_debug_http(False)
    br.set_debug_redirects(False)
    br.set_debug_responses(False)

    br.addheaders = [('User-agent', USER_AGENT)]

    br.open(LOGIN_URL)
    br.select_form(name='loginForm')
    # Login username and password.
    br['username'] = 'getui'
    br['password'] = 'password'
    br.submit()
    return br


def _fetch_counts(br, taskId):
    """Return ``[sendNum, realSendNum, receiveNum]`` for ``taskId``.

    Returns an empty list when the response has no ``resultList`` key.
    """
    response = br.open(PUSH_LIST_URL % taskId)
    result = json.loads(response.read().strip())
    if 'resultList' not in result:
        return []
    first = result['resultList'][0]
    return [
        int(first['sendNum']),
        int(first['realSendNum']),
        int(first['receiveNum']),
    ]


def Browser(taskIds):
    """Sum the push counters of every task id in ``taskIds``.

    Args:
        taskIds: iterable of task ids to query.

    Returns:
        A length-3 array: summed sendNum, realSendNum and receiveNum. When
        no task id yields counters the result is a zero vector, so callers
        that check ``len(result) == 3`` keep working (``np.sum([], 0)``
        would give a scalar instead).
    """
    br = _login()

    everyGroup = []
    for taskId in taskIds:
        try:
            tsum = _fetch_counts(br, taskId)
        except Exception as e:
            # Best effort: one failing task id must not abort the batch.
            print(e)
            continue
        print(tsum)
        if len(tsum) == 3:
            everyGroup.append(tsum)

    if not everyGroup:
        return np.zeros(3, dtype=int)
    return np.sum(everyGroup, 0)