zoukankan      html  css  js  c++  java
  • 多线程糗事百科案例(python2)

    Queue(队列对象)

    Queue是python中的标准库,可以直接import Queue引用;队列是线程间最常用的交换数据的形式

    python下多线程的思考

    对于资源,加锁是个重要的环节。因为python原生的list,dict等,都是not thread safe的。而Queue,是线程安全的,因此在满足使用条件下,建议使用队列

    1. 初始化: class Queue.Queue(maxsize) FIFO 先进先出

    2. 包中的常用方法:

      • Queue.qsize() 返回队列的大小

      • Queue.empty() 如果队列为空,返回True,反之False

      • Queue.full() 如果队列满了,返回True,反之False

      • Queue.full 与 maxsize 大小对应

      • Queue.get([block[, timeout]])获取队列,timeout等待时间

    3. 创建一个“队列”对象

      • import Queue
      • myqueue = Queue.Queue(maxsize = 10)
    4. 将一个值放入队列中

      • myqueue.put(10)
    5. 将一个值从队列中取出

      • myqueue.get()
      1 # -*- coding:utf-8 -*-
      2 
      3 # 使用了线程库
      4 import threading
      5 # 队列
      6 from Queue import Queue
      7 # 解析库
      8 from lxml import etree
      9 # 请求处理
     10 import requests
     11 # json处理
     12 import json
     13 import time
     14 
     15 class ThreadCrawl(threading.Thread):
     16     def __init__(self, threadName, pageQueue, dataQueue):
     17         #threading.Thread.__init__(self)
     18         # 调用父类初始化方法
     19         super(ThreadCrawl, self).__init__()
     20         # 线程名
     21         self.threadName = threadName
     22         # 页码队列
     23         self.pageQueue = pageQueue
     24         # 数据队列
     25         self.dataQueue = dataQueue
     26         # 请求报头
     27         self.headers = {"User-Agent" : "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0;"}
     28 
     29     def run(self):
     30         print "启动 " + self.threadName
     31         while not CRAWL_EXIT:
     32             try:
     33                 # 取出一个数字,先进先出
     34                 # 可选参数block,默认值为True
     35                 #1. 如果对列为空,block为True的话,不会结束,会进入阻塞状态,直到队列有新的数据
     36                 #2. 如果队列为空,block为False的话,就弹出一个Queue.empty()异常,
     37                 page = self.pageQueue.get(False)
     38                 url = "http://www.qiushibaike.com/8hr/page/" + str(page) +"/"
     39                 #print url
     40                 content = requests.get(url, headers = self.headers).text
     41                 time.sleep(1)
     42                 self.dataQueue.put(content)
     43                 #print len(content)
     44             except:
     45                 pass
     46         print "结束 " + self.threadName
     47 
     48 class ThreadParse(threading.Thread):
     49     def __init__(self, threadName, dataQueue, filename, lock):
     50         super(ThreadParse, self).__init__()
     51         # 线程名
     52         self.threadName = threadName
     53         # 数据队列
     54         self.dataQueue = dataQueue
     55         # 保存解析后数据的文件名
     56         self.filename = filename
     57         #
     58         self.lock = lock
     59 
     60     def run(self):
     61         print "启动" + self.threadName
     62         while not PARSE_EXIT:
     63             try:
     64                 html = self.dataQueue.get(False)
     65                 self.parse(html)
     66             except:
     67                 pass
     68         print "退出" + self.threadName
     69 
     70     def parse(self, html):
     71         # 解析为HTML DOM
     72         html = etree.HTML(html)
     73 
     74         node_list = html.xpath('//div[contains(@id, "qiushi_tag")]')
     75 
     76         for node in node_list:
     77             # xpath返回的列表,这个列表就这一个参数,用索引方式取出来,用户名
     78             username = node.xpath('.//img/@alt')[0]
     79             # 图片连接
     80             image = node.xpath('.//div[@class="thumb"]//@src')#[0]
     81             # 取出标签下的内容,段子内容
     82             content = node.xpath('.//div[@class="content"]/span')[0].text
     83             # 取出标签里包含的内容,点赞
     84             zan = node.xpath('.//i')[0].text
     85             # 评论
     86             comments = node.xpath('.//i')[1].text
     87 
     88             items = {
     89                 "username" : username,
     90                 "image" : image,
     91                 "content" : content,
     92                 "zan" : zan,
     93                 "comments" : comments
     94             }
     95 
     96             # with 后面有两个必须执行的操作:__enter__ 和 _exit__
     97             # 不管里面的操作结果如何,都会执行打开、关闭
     98             # 打开锁、处理内容、释放锁
     99             with self.lock:
    100                 # 写入存储的解析后的数据
    101                 self.filename.write(json.dumps(items, ensure_ascii = False).encode("utf-8") + "
    ")
    102 
    103 CRAWL_EXIT = False
    104 PARSE_EXIT = False
    105 
    106 
    107 def main():
    108     # 页码的队列,表示20个页面
    109     pageQueue = Queue(20)
    110     # 放入1~10的数字,先进先出
    111     for i in range(1, 21):
    112         pageQueue.put(i)
    113 
    114     # 采集结果(每页的HTML源码)的数据队列,参数为空表示不限制
    115     dataQueue = Queue()
    116 
    117     filename = open("duanzi.json", "a")
    118     # 创建锁
    119     lock = threading.Lock()
    120 
    121     # 三个采集线程的名字
    122     crawlList = ["采集线程1号", "采集线程2号", "采集线程3号"]
    123     # 存储三个采集线程的列表集合
    124     threadcrawl = []
    125     for threadName in crawlList:
    126         thread = ThreadCrawl(threadName, pageQueue, dataQueue)
    127         thread.start()
    128         threadcrawl.append(thread)
    129 
    130 
    131     # 三个解析线程的名字
    132     parseList = ["解析线程1号","解析线程2号","解析线程3号"]
    133     # 存储三个解析线程
    134     threadparse = []
    135     for threadName in parseList:
    136         thread = ThreadParse(threadName, dataQueue, filename, lock)
    137         thread.start()
    138         threadparse.append(thread)
    139 
    140     # 等待pageQueue队列为空,也就是等待之前的操作执行完毕
    141     while not pageQueue.empty():
    142         pass
    143 
    144     # 如果pageQueue为空,采集线程退出循环
    145     global CRAWL_EXIT
    146     CRAWL_EXIT = True
    147 
    148     print "pageQueue为空"
    149 
    150     for thread in threadcrawl:
    151         thread.join()
    152         print "1"
    153 
    154     while not dataQueue.empty():
    155         pass
    156 
    157     global PARSE_EXIT
    158     PARSE_EXIT = True
    159 
    160     for thread in threadparse:
    161         thread.join()
    162         print "2"
    163 
    164     with lock:
    165         # 关闭文件
    166         filename.close()
    167     print "谢谢使用!"
    168 
    169 if __name__ == "__main__":
    170     main()
  • 相关阅读:
    【MongoDb入门】15分钟让你敢说自己会用MongoDB了
    【干货】基于Owin WebApi 使用OAuth2进行客户端授权服务
    【学习】在Windows10平台使用Docker ToolBox安装docker(一)
    快速搭建WebAPI(Odata+Code-First)附Odata条件查询表~
    使用QuertZ组件来搞项目工作流(一)
    AspNetCore 使用NLog日志,NLog是基于.NET平台开的类库!(又一神器)
    AspNetCore 基于流下载文件与示例代码
    再见了Server对象,拥抱IHostingEnvironment服务对象(.net core)
    AspNetCore 文件上传(模型绑定、Ajax) 两种方式 get到了吗?
    AspNetCore 目前不支持SMTP协议(基于开源组件开发邮件发送,它们分别是MailKit 和 FluentEmail )
  • 原文地址:https://www.cnblogs.com/wanglinjie/p/9196851.html
Copyright © 2011-2022 走看看