zoukankan      html  css  js  c++  java
  • 多线程爬虫案例

    多线程糗事百科案例

    案例要求参考上一个糗事百科单进程案例

    Queue(队列对象)

    Queue是python中的标准库,可以直接import Queue引用;队列是线程间最常用的交换数据的形式

    python下多线程的思考

    对于资源,加锁是个重要的环节。因为python原生的list,dict等,都是not thread safe的。而Queue,是线程安全的,因此在满足使用条件下,建议使用队列

    1. 初始化: class Queue.Queue(maxsize) FIFO 先进先出

    2. 包中的常用方法:

      • Queue.qsize() 返回队列的大小

      • Queue.empty() 如果队列为空,返回True,反之False

      • Queue.full() 如果队列满了,返回True,反之False

      • Queue.full 与 maxsize 大小对应

      • Queue.get([block[, timeout]])获取队列,timeout等待时间

    3. 创建一个“队列”对象

      • import Queue
      • myqueue = Queue.Queue(maxsize = 10)
    4. 将一个值放入队列中

      • myqueue.put(10)
    5. 将一个值从队列中取出

      • myqueue.get()

    多线程示意图

    import threading
    from queue import Queue
    from lxml import etree
    import requests
    import json
    import time
    
    
    class ThreadCrawl(threading.Thread):
        def __init__(self, threadName, pageQueue, dataQueue):
            # 调用父类的初始化方法
            # threading.Thread.__init__(self)
            super(ThreadCrawl, self).__init__()
            # 线程的名字
            self.threadName = threadName
            # 页码队列
            self.pageQueue = pageQueue
            # 数据队列
            self.dataQueue = dataQueue
            # 请求报头
            self.headers = {
                "Uaer-Agent": "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
            }
    
        def run(self):
            print('启动:{}'.format(self.threadName))
            while not CRAWL_EXIT:
                try:
                    # 取出队列中的一个数字, 先进先出
                    # 可选参数block, 默认值为True, 两种用法
                    # 1. 如果队列为空, block为True的话, 不会结束, 就会进入阻塞状态, 知道队列有新的数据
                    # 2. 如果队列为空, block为False的话, 就会弹出一个Queue.empty()异常
                    page = self.pageQueue.get(False)
                    url = 'http://www.qiushibaike.com/8hr/page/{}/'.format(page)
                    content = requests.get(url, headers=self.headers).text
                    # content = content.decode('utf-8')
                    # print(content)
                    time.sleep(1)
                    self.dataQueue.put(content)
                except:
                    pass
            print('结束:{}'.format(self.threadName))
    
    
    class ThreadParse(threading.Thread):
        def __init__(self, threadName, dataQueue, filename, lock):
            super(ThreadParse, self).__init__()
            # 线程的名字
            self.threadName = threadName
            # 数据队列
            self.dataQueue = dataQueue
            # 保存解析后数据的文件名
            self.filename = filename
            # 锁
            self.lock = lock
    
        def run(self):
            print('启动:{}'.format(self.threadName))
            while not PARSE_EXIT:
                try:
                    html = self.dataQueue.get(False)
                    self.parse(html)
                except:
                    pass
            print('结束:{}'.format(self.threadName))
    
        def parse(self, html):
            # 解析为HTML DOM
            html = etree.HTML(html)
            node_list = html.xpath('//div[contains(@class, "article block untagged mb15")]')
            for node in node_list:
                # print(node)
                # xpath返回的列表,这个列表就这一个参数,用索引方式取出来,用户名
                # .//h2  用户名
                username = node.xpath('.//h2')[0].text.strip()
                # print('username==={}'.format(username))
                # .//div[@class="thumb"]//@src  图片连接
                image = node.xpath('.//div[@class="thumb"]//@src')
                # print('image==={}'.format(image))
                # .//div[@class="content"]/span    取出标签下的内容,段子内容
                content = node.xpath('.//div[@class="content"]/span')[0].text.strip()
                # print('content==={}'.format(content))
                # .//i[@class="number"][0] 点赞  取出标签里包含的内容,点赞
                zan = node.xpath('.//i[@class="number"]')[0].text
                # print('zan==={}'.format(zan))
                # .//i[@class="number"][i] 评论
                comments = node.xpath('.//i[@class="number"]')[1].text
                # print('comments==={}'.format(comments))
                items = {
                    "username" : username,
                    "image" : image,
                    "content" : content,
                    "zan" : zan,
                    "comments" : comments
                }
    
                # with 后面有两个必须执行的操作:__enter__ 和 _exit__
                # 不管里面的操作结果如何,都会执行打开、关闭
                # 打开锁、处理内容、释放锁
                # print("正在写入内容!!!")
                with self.lock:
                    # print("正在写入内容!!!")
                    # # 写入存储的解析后的数据
                    # json_data = json.dumps(items, ensure_ascii=False)
                    # print('jsondata==={}'.format(json_data))
                    # self.filename.write(json_data.encode("utf-8") + "
    ")
                    self.filename.write(str(json.dumps(items, ensure_ascii = False))+'
    ')
                    print("写入完成!!!")
    
    
    # 采集爬虫退出信号
    CRAWL_EXIT = False
    # 解析爬虫退出信号
    PARSE_EXIT = False
    
    
    def main():
        # 页码的队列, 表示10个页面
        pageQueue = Queue(10)
        # 放入1~10个数字, 先进先出
        for i in range(1, 11):
            pageQueue.put(i)
    
        # 采集结果(每页的html源码)的数据队列, 参数为空表示不限制
        dataQueue = Queue()
    
        filename = open('duanzi.json', 'a', encoding='utf-8')
        print("打开文件!!!")
    
        # 创建锁
        lock = threading.Lock()
    
        # 三个采集线程的名字
        crawlList = ["采集线程1号", "采集线程2号", "采集线程3号"]
    
        # 存储三个采集线程的列表集合
        threadcrawl = []
    
        for threadName in crawlList:
            thread = ThreadCrawl(threadName, pageQueue, dataQueue)
            thread.start()
            threadcrawl.append(thread)
            time.sleep(1)
    
        # 三个解析线程的名字
        parseList = ["解析线程1号", "解析线程2号", "解析线程3号"]
    
        # 存储三个解析线程
        threadparse = []
    
        for threadName in parseList:
            thread = ThreadParse(threadName, dataQueue, filename, lock)
            thread.start()
            threadparse.append(thread)
    
        # 等待pageQueue队列为空, 等待之前的操作执行完毕
        while not pageQueue.empty():
            pass
    
        # 如果pageQueue为空, 采集线程退出循环
        global CRAWL_EXIT
        CRAWL_EXIT = True
    
        print('pageQueue队列为空')
    
        for thread in threadcrawl:
            thread.join()
            print("1")
    
        while not dataQueue.empty():
            pass
    
        global PARSE_EXIT
        PARSE_EXIT = True
    
        for thread in threadparse:
            thread.join()
            print("2")
    
        with lock:
            print("关闭文件!!!")
            # 关闭文件
            filename.close()
        print("谢谢使用")
    
    
    if __name__ == '__main__':
        main()
    

      

  • 相关阅读:
    171. Excel Sheet Column Number (Easy)
    349. Intersection of Two Arrays (Easy)
    453. Minimum Moves to Equal Array Elements (Easy)
    657. Judge Route Circle (Easy)
    CSS笔记
    保存页面状态
    UI开发总结
    ubuntu 下配置munin
    反向代理配置
    JavaScript 高级程序设计第二版
  • 原文地址:https://www.cnblogs.com/amou/p/9292245.html
Copyright © 2011-2022 走看看