  Python multithreading with Queue in practice: filtering valid urls

    0. Contents

        1. Background
        2. Full code
        3. Results

    1. Background

    A phone-number card application page switches the number's home region by province + city, returning 10 numbers per request.

    Capturing the traffic with Fiddler confirms the pattern of the key url parameters:

    provinceCode: a two-digit number

    cityCode: a three-digit number

    groupKey: in one-to-one correspondence with provinceCode

    So the task is: manually iterate over the provinces to collect the list of (provinceCode, groupKey) pairs, then for each pair run a for loop over every cityCode and confirm which urls are valid.
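    As a rough sketch of that enumeration (the real endpoint is redacted throughout this post; the format string below is a placeholder, not the actual url):

    import itertools

    # hypothetical pairs gathered by hand in the browser; the full list is in section 2
    province_groupkey_list = [('51', '21236872'), ('31', '34236498')]

    # every three-digit cityCode candidate: '000' .. '999'
    city_codes = [''.join(i) for i in itertools.product('0123456789', repeat=3)]

    fmt = 'https://m.1xxxx.com/xxxxx&provinceCode={0}&cityCode={1}&groupKey={2}'  # placeholder
    candidates = [fmt.format(p, c, g) for (p, g) in province_groupkey_list for c in city_codes]
    print len(candidates)  # 2 * 1000 candidate urls to probe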

    An invalid url still gets a normal response; meanwhile the squid multi-proxy setup frequently suffers proxy failures, so requests-related exceptions have to be caught and retried to avoid misjudging urls as far as possible.

    # In [88]: r.text
    # Out[88]: u'jsonp_queryMoreNums({"numRetailList":[],"code":"M1","uuid":"a95ca4c6-957e-462a-80cd-0412b
    # d5672df","numArray":[]});' 
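    Since an invalid url still answers normally, just with an empty numArray inside the jsonp wrapper, one way to tell valid from invalid is to extract the JSON payload and check that field (a minimal sketch; the full code in section 2 additionally filters numArray and handles proxy errors):

    import re
    import json

    def is_valid_response(text):
        # pull the {...} payload out of jsonp_queryMoreNums({...});
        m = re.search(r'({.*?})', text)
        if m is None:
            return False
        try:
            rst = json.loads(m.group())
        except ValueError:
            return False
        # a wrong url returns normally, but with "numArray": []
        return len(rst.get('numArray', [])) > 0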

    Getting the home location of a number:

    url = 'http://www.ip138.com:8080/search.asp?action=mobile&mobile=%s' %num
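    The lookup page renders province and city in the second td.tdc2 cell; a standalone sketch of the parse used by get_num_info() in section 2 (assuming the page layout is unchanged and lxml is installed):

    import requests
    from bs4 import BeautifulSoup as BS

    def lookup_num(num):
        url = 'http://www.ip138.com:8080/search.asp?action=mobile&mobile=%s' % num
        soup = BS(requests.get(url).content, 'lxml')
        rst = soup.select('tr td.tdc2')[1].text.split()  # e.g. [u'四川', u'成都']
        if len(rst) == 2:
            return rst           # [province, city]
        return [rst[0], rst[0]]  # municipalities list a single name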

    Converting Chinese to pinyin:

    from pypinyin import lazy_pinyin

    province_pinyin = ''.join(lazy_pinyin(province_zh))
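    For example, a quick check in the interpreter:

    # -*- coding: UTF-8 -*-
    from pypinyin import lazy_pinyin

    print lazy_pinyin(u'四川')           # [u'si', u'chuan']
    print ''.join(lazy_pinyin(u'北京'))  # beijing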

    Confirming the task queue has been fully processed:

    https://docs.python.org/2/library/queue.html#module-Queue

    Queue.task_done()
    Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
    
    If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
    
    Raises a ValueError if called more times than there were items placed in the queue.
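    A minimal producer/consumer sketch of that contract (Python 2, as in the post): join() blocks until every item that was put() has been matched by a task_done():

    import Queue
    import threading

    q = Queue.Queue()

    def worker():
        while True:
            item = q.get()
            # ... process item ...
            q.task_done()  # exactly one task_done() per get(), or join() never returns

    for i in range(4):
        t = threading.Thread(target=worker)
        t.setDaemon(True)  # daemon workers die with the main thread after join() returns
        t.start()

    for item in range(20):
        q.put(item)

    q.join()
    print 'all tasks done'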

    2. Full code

    Referer and url details have been redacted.

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    import time
    import re
    import json
    import traceback

    import threading
    lock = threading.Lock()
    import Queue
    task_queue = Queue.Queue()
    write_queue = Queue.Queue()
     
    import requests
    from requests.exceptions import (ConnectionError, ConnectTimeout, ReadTimeout, SSLError,
                                    ProxyError, RetryError, InvalidSchema)
    s = requests.Session()
    s.headers.update({'user-agent':'Mozilla/5.0 (iPhone; CPU iPhone OS 9_3_5 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Mobile/13G36 MicroMessenger/6.5.12 NetType/4G'})
    # Referer detail hidden; in practice it turned out to be unnecessary
    # s.headers.update({'Referer':'https://servicewechat.com/xxxxxxxx'})
    s.verify = False
    s.mount('https://', requests.adapters.HTTPAdapter(pool_connections=1000, pool_maxsize=1000))

    import copy
    sp = copy.deepcopy(s)
    proxies = {'http': 'http://127.0.0.1:3128', 'https': 'https://127.0.0.1:3128'}
    sp.proxies = proxies

    from urllib3.exceptions import InsecureRequestWarning
    from warnings import filterwarnings
    filterwarnings('ignore', category=InsecureRequestWarning)

    from bs4 import BeautifulSoup as BS
    from pypinyin import lazy_pinyin
    import pickle

    import logging
    def get_logger():
        logger = logging.getLogger("threading_example")
        logger.setLevel(logging.DEBUG)
        # fh = logging.FileHandler("d:/threading.log")
        fh = logging.StreamHandler()
        fmt = '%(asctime)s - %(threadName)-10s - %(levelname)s - %(message)s'
        formatter = logging.Formatter(fmt)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        return logger
    logger = get_logger()

    # An invalid url still returns normally:
    # In [88]: r.text
    # Out[88]: u'jsonp_queryMoreNums({"numRetailList":[],"code":"M1","uuid":"a95ca4c6-957e-462a-80cd-0412b
    # d5672df","numArray":[]});'

    results = []

    def get_nums():
        global results
        pattern = re.compile(r'({.*?})')  # , re.S | re.I | re.X)
        while True:
            try:
                # keep the try block as small as possible
                _url = task_queue.get()
                url = _url + str(int(time.time()*1000))
                resp = sp.get(url, timeout=10)
            except (ConnectionError, ConnectTimeout, ReadTimeout, SSLError,
                    ProxyError, RetryError, InvalidSchema) as err:
                task_queue.task_done()  ### must task_done() before re-putting, or task_queue.join() never releases
                task_queue.put(_url)
            except Exception as err:
                # resp may not exist if sp.get() itself raised, so do not touch it here
                logger.debug(' url:{} err: {} traceback: {}'.format(url, err, traceback.format_exc()))
                task_queue.task_done()  ### must task_done() before re-putting, or task_queue.join() never releases
                task_queue.put(_url)
            else:
                try:
                    # rst = resp.content
                    # match = rst[rst.index('{'):rst.index('}')+1]
                    # m = re.search(r'({.*?})', resp.content)
                    m = pattern.search(resp.content)
                    match = m.group()
                    rst = json.loads(match)
                    nums = [num for num in rst['numArray'] if num > 10000]
                    nums_len = len(nums)
                    # assert nums_len == 10
                    num = nums[-1]
                    province_zh, city_zh, province_pinyin, city_pinyin = get_num_info(num)
                    result = (str(num), province_zh, city_zh, province_pinyin, city_pinyin, _url)
                    results.append(result)
                    write_queue.put(result)
                    logger.debug(u'results:{} threads: {} task_queue: {} {} {} {} {}'.format(
                        len(results), threading.activeCount(), task_queue.qsize(), num, province_zh, city_zh, _url))
                except (ValueError, AttributeError, IndexError) as err:
                    pass
                except Exception as err:
                    # print err, traceback.format_exc()
                    logger.debug(' status_code:{} url:{} content:{} err: {} traceback: {}'.format(
                        resp.status_code, url, resp.content, err, traceback.format_exc()))
                finally:
                    task_queue.task_done()  ###

    def get_num_info(num):
        try:
            url = 'http://www.ip138.com:8080/search.asp?action=mobile&mobile=%s' % num
            resp = s.get(url)
            soup = BS(resp.content, 'lxml')
            # pro, cit = re.findall(r'<TD class="tdc2" align="center">(.*?)<', resp.content)[0].decode('gbk').split('&nbsp;')
            rst = soup.select('tr td.tdc2')[1].text.split()
            if len(rst) == 2:
                province_zh, city_zh = rst
            else:
                province_zh = city_zh = rst[0]
            province_pinyin = ''.join(lazy_pinyin(province_zh))
            city_pinyin = ''.join(lazy_pinyin(city_zh))
        except Exception as err:
            print err, traceback.format_exc()
            province_zh = city_zh = province_pinyin = city_pinyin = 'xxx'
        return province_zh, city_zh, province_pinyin, city_pinyin

    def write_result():
        # 'w' truncates the file on open, so open it once, above the while True loop
        with open('10010temp.txt', 'w', 0) as f:
            while True:
                try:
                    rst = ' '.join(write_queue.get()) + '\n'
                    f.write(rst.encode('utf-8'))
                    write_queue.task_done()
                except Exception as err:
                    print err, traceback.format_exc()

    if __name__ == '__main__':
        province_groupkey_list = [
            ('18', '15237219'), ('51', '21236872'), ('31', '34236498'), ('87', '43236612'),
            ('10', '8400250331'), ('89', '90242110'), ('83', '99250240'), ('19', '59237227'),
            ('36', '60236866'), ('97', '49236584'), ('79', '13238152'), ('34', '33236916'),
            ('71', '40236873'), ('88', '9100283297'), ('50', '27237168'), ('59', '6800258755'),
            ('74', '71237034'), ('11', '85236889'), ('84', '13236970'), ('76', '85236973'),
            ('13', '36236594'), ('85', '53237275'), ('86', '79237759'), ('90', '19236614'),
            ('30', '2400265649'), ('38', '12236361'), ('17', '17236695'), ('70', '4900281779'),
            ('75', '67237076'), ('91', '19236028'), ('81', '20236750')]
        # province_groupkey_list = [('51', '21236872')]

        import itertools
        for (provinceCode, groupKey) in province_groupkey_list:
            # for cityCode in range(1000):
            for cityCode in [''.join(i) for i in itertools.product('0123456789', repeat=3)]:
                fmt = 'https://m.1xxxx.com/xxxxx&provinceCode={provinceCode}&cityCode={cityCode}&xxxxx&groupKey={groupKey}&xxxxx'  # url details hidden
                url = fmt.format(provinceCode=provinceCode, cityCode=cityCode, groupKey=groupKey)  # , now=int(float(time.time())*1000))
                task_queue.put(url)

        threads = []
        for i in range(300):
            t = threading.Thread(target=get_nums)  # args takes a tuple, at least (a,)
            threads.append(t)
        t_write_result = threading.Thread(target=write_result)
        threads.append(t_write_result)

        for t in threads:
            t.setDaemon(True)
            t.start()

        # for t in threads:
        #     t.join()
        task_queue.join()
        print 'task done'
        write_queue.join()
        print 'write done'

        with open('10010temp', 'w') as f:
            pickle.dump(results, f)
        print 'all done'
        # while True:
        #     pass
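    One design choice worth noting: on a proxy failure the worker calls task_done() for the fetched item before re-putting the same url. Queue.join() only returns when the unfinished-task counter reaches zero, and task_done() decrements that counter while put() increments it, so re-putting without the preceding task_done() would leave the counter permanently above zero and task_queue.join() would never release.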

    3. Results

    Run it a few more times to confirm the final results count stabilizes at 339.
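    To double-check that count, the pickled results can be reloaded (a sketch; '10010temp' is the file written at the end of section 2):

    import pickle

    with open('10010temp') as f:
        results = pickle.load(f)
    print len(results)  # expect 339 once repeated runs converge
    # each entry: (num, province_zh, city_zh, province_pinyin, city_pinyin, url)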
