今天上来分享一下昨天实现的一个多线程网页下载器。
这是一个有着真实需求的实现,我的用途是拿它来通过 HTTP 方式向服务器提交游戏数据。把它放上来也是想大家帮忙挑刺,找找 bug,让它工作得更好。
keywords:python,http,multi-threads,thread,threading,httplib,urllib,urllib2,Queue,http pool,httppool
废话少说,上源码:
# -*- coding:utf-8 -*-
"""A small thread-pool HTTP fetcher.

Tasks are plain dicts pushed onto a queue; worker threads pop them, perform
the HTTP request and push ``(task_id, body)`` tuples onto a result queue.
Failures are reported through a caller-supplied ``fail_op`` callback.

The module imports under both Python 2 (``httplib``/``Queue``/``thread``)
and Python 3 (``http.client``/``queue``/``_thread``).
"""
import random
import sys
import time

try:  # Python 2 module names
    import httplib
    import thread
    from Queue import Queue, Empty, Full
    from urllib import urlencode
except ImportError:  # Python 3 renamed these modules
    import _thread as thread
    import http.client as httplib
    from queue import Queue, Empty, Full
    from urllib.parse import urlencode

# ``test_google`` is a manual network demo, not part of the public API; it is
# left out so that star-imports (and test runners that collect ``test_*``
# names from a module namespace) do not pick it up.
__all__ = [
    'HEADERS', 'UNEXPECTED_ERROR', 'POST', 'GET',
    'base_log', 'base_fail_op', 'get_remote_data', 'HttpPool',
]

# Default request headers. Treated as read-only: every request works on a copy.
HEADERS = {
    "Content-type": "application/x-www-form-urlencoded",
    'Accept-Language': 'zh-cn',
    'User-Agent': 'Mozilla/4.0 (compatible; MSIE 6.0;Windows NT 5.0)',
    "Accept": "text/plain",
}
# Status passed to ``fail_op`` when the request raised instead of returning.
UNEXPECTED_ERROR = -1
POST = 'POST'
GET = 'GET'


def base_log(msg):
    """Default logger: print *msg* to standard output."""
    print(msg)


def base_fail_op(task, status, log):
    """Default failure callback: report the failed *task* through *log*.

    *status* is the HTTP status code, or ``UNEXPECTED_ERROR`` when the
    request raised an exception.
    """
    log('fail op. task = %s, status = %d' % (str(task), status))


def _join_query(url, query):
    """Append an already-encoded *query* string to *url*."""
    if not query:
        return url
    if '?' not in url:
        return url + '?' + query
    if url.endswith(('?', '&')):
        # Callers that already wrote '/search?' keep working unchanged.
        return url + query
    return url + '&' + query


def _build_request(task):
    """Return ``(method, url, params, headers)`` for *task*.

    ``params`` is the urlencoded parameter string. ``headers`` is a fresh
    dict (defaults overlaid with the task's own headers), so the shared
    ``HEADERS`` constant is never modified.
    """
    method = task.get('method', GET)
    url = task.get('url', '/')
    params = urlencode(task.get('params') or {})
    headers = dict(HEADERS)
    headers.update(task.get('headers') or {})
    if method == POST:
        headers['Content-Length'] = str(len(params))
    return method, url, params, headers


def _fetch(conn_args, method, url, params, headers):
    """Perform one request and return ``(status, body)``.

    ``body`` is ``None`` unless the status is 200. The connection is always
    closed, whether the request succeeds or raises.
    """
    conn = httplib.HTTPConnection(**conn_args)
    try:
        if method == POST:
            conn.request(method, url, params, headers)
        else:
            # Parameters travel in the query string; headers are still sent.
            conn.request(method, _join_query(url, params), None, headers)
        response = conn.getresponse()
        body = response.read() if response.status == httplib.OK else None
        return response.status, body
    finally:
        conn.close()


def get_remote_data(tasks, results, fail_op=base_fail_op, log=base_log):
    """Worker loop: fetch every task from *tasks* and store bodies in *results*.

    Each task is a dict with the required keys ``'id'`` and ``'conn_args'``
    (keyword arguments for ``HTTPConnection``) and the optional keys
    ``'params'``, ``'method'`` (default ``'GET'``), ``'url'`` (default
    ``'/'``) and ``'headers'``.

    On a 200 response ``(id, body)`` is put on *results*. On any other
    status ``fail_op(task, status, log)`` is called; when the request raises,
    ``fail_op(task, UNEXPECTED_ERROR, log)`` is called. A task missing a
    required key is logged and skipped. A task of ``None`` makes the worker
    return; otherwise the loop blocks forever waiting for work.
    """
    while True:
        task = tasks.get()
        if task is None:
            # Shutdown sentinel.
            return
        try:
            tid = task['id']
            conn_args = task['conn_args']  # host[:port] and optional timeout
        except KeyError as exc:
            log(str(exc))
            continue
        # %s rather than %d so a non-integer id cannot kill the worker.
        log('thread_%s doing task %s' % (thread.get_ident(), tid))

        method, url, params, headers = _build_request(task)
        try:
            status, data = _fetch(conn_args, method, url, params, headers)
        except Exception as exc:
            # Worker boundary: any failure is reported, never fatal.
            log('request failed. \nmethod = %s, url = %s, params = %s headers = %s' % (
                method, url, params, headers))
            log(str(exc))
            fail_op(task, UNEXPECTED_ERROR, log)
            continue

        if status != httplib.OK:
            fail_op(task, status, log)
            continue

        results.put((tid, data), True)


class HttpPool(object):
    """A fixed set of worker threads fed from a shared task queue."""

    def __init__(self, threads_count, fail_op, log):
        """Start *threads_count* workers running ``get_remote_data``.

        *fail_op* and *log* are handed to every worker unchanged.
        """
        self._tasks = Queue()
        self._results = Queue()
        self._threads_count = threads_count

        for _ in range(threads_count):
            thread.start_new_thread(
                get_remote_data,
                (self._tasks, self._results, fail_op, log))

    def add_task(self, tid, host, url, params, headers=None, method='GET',
                 timeout=None):
        """Queue one request; return ``True`` if queued, ``False`` if full.

        *host* may include a port (``'example.com:8080'``). *timeout* is only
        forwarded to the connection when it is not ``None``.
        """
        conn_args = {'host': host}
        if timeout is not None:
            conn_args['timeout'] = timeout
        task = {
            'id': tid,
            'conn_args': conn_args,
            # Copy so later changes by the caller cannot affect a queued task.
            'headers': dict(headers or {}),
            'url': url,
            'params': params,
            'method': method,
        }
        try:
            self._tasks.put_nowait(task)
        except Full:
            return False
        return True

    def get_results(self):
        """Return all ``(task_id, body)`` results available now, without blocking."""
        results = []
        while True:
            try:
                results.append(self._results.get_nowait())
            except Empty:
                return results

    def close(self):
        """Ask every worker to exit once the tasks queued so far are handled."""
        for _ in range(self._threads_count):
            self._tasks.put(None)


def test_google(task_count, threads_count):
    """Manual demo: run *task_count* searches on *threads_count* workers.

    Needs network access. Prints each result's task id and body length and
    returns once every queued task has either produced a result or failed.
    """
    failed = []

    def count_failure(task, status, log):
        base_fail_op(task, status, log)
        failed.append(task['id'])

    hp = HttpPool(threads_count, count_failure, base_log)
    queued = 0
    for i in range(task_count):
        if hp.add_task(i, 'www.google.cn', '/search?', {'q': 'lai'}):
            print('add task successed.')
            queued += 1

    succeeded = 0
    while succeeded + len(failed) < queued:
        results = hp.get_results()
        if not results:
            time.sleep(1.0 * random.random())
        for tid, data in results:
            print('%s %s' % (tid, len(data)))
        succeeded += len(results)
    hp.close()


if __name__ == '__main__':
    task_count, threads_count = int(sys.argv[1]), int(sys.argv[2])
    test_google(task_count, threads_count)
from: http://blog.csdn.net/gzlaiyonghao/article/details/4083852