zoukankan      html  css  js  c++  java
  • 寒假学习笔记14

    多线程爬虫实例,以下是源码

    import requests
    from lxml import etree
    import json
    from queue import Queue
    import threading
    
    '''
    1. 创建 URL队列, 响应队列, 数据队列 在init方法中
    2. 在生成URL列表中方法中,把URL添加URL队列中
    3. 在请求页面的方法中,从URL队列中取出URL执行,把获取到的响应数据添加响应队列中
    4. 在处理数据的方法中,从响应队列中取出页面内容进行解析, 把解析结果存储数据队列中
    5. 在保存数据的方法中, 从数据队列中取出数据,进行保存
    6. 开启几个线程来执行上面的方法
    '''
    
    def run_forever(func):
        def wrapper(obj):
            while True:
                func(obj)
        return wrapper
    
    
    class QiubaiSpider(object):
    
        def __init__(self):
            self.headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36'
            }
            self.url_pattern = 'https://www.qiushibaike.com/8hr/page/{}/'
            # url 队列
            self.url_queue = Queue()
            # 响应队列
            self.page_queue = Queue()
            # 数据队列
            self.data_queue = Queue()
    
    
        def add_url_to_queue(self):
            # 把URL添加url队列中
            for i in range(1, 14):
                self.url_queue.put(self.url_pattern.format(i))
    
        @run_forever
        def add_page_to_queue(self):
            ''' 发送请求获取数据 '''
            url = self.url_queue.get()
            # print(url)
            response = requests.get(url, headers=self.headers)
            print(response.status_code)
            if response.status_code != 200:
                self.url_queue.put(url)
            else:
                self.page_queue.put(response.content)
            # 完成当前URL任务
            self.url_queue.task_done()
    
        @run_forever
        def add_dz_to_queue(self):
            '''根据页面内容使用lxml解析数据, 获取段子列表'''
            page = self.page_queue.get()
            # print(page)
            element = etree.HTML(page)
            # 先分组,获取分组列表
            div_s = element.xpath('//*[@id="content-left"]/div')
            # 遍历分组列表, 再使用xpath获取内容
            dz_list = []
            for div in div_s:
                item = {}
                # 每个段子包含发送人头像URL, 昵称, 性别, 段子内容, 好笑数,评论数
                # 头像URL
                item['head_url'] = self.get_first_element(div.xpath('./div[1]/a[1]/img/@src'))
                if item['head_url'] is not None:
                    item['head_url'] = 'http:' + item['head_url']
    
                    # 昵称
                item['author_name'] = self.get_first_element(div.xpath('./div[1]/a[2]/h2/text()'))
                # 性别
                gender_class = self.get_first_element(div.xpath('./div[1]/div/@class'))
                if gender_class is not None:
                    item['author_gender'] = 'man' if gender_class.find('man') != -1 else 'women'
                # 段子内容
                item['dz_content'] = self.get_first_element(div.xpath('./a/div/span[1]/text()'))
    
                # 好笑数
                item['dz_funny'] = self.get_first_element(div.xpath('./div[2]/span[1]/i/text()'))
                # 评论数
                item['dz_comments'] = self.get_first_element(div.xpath('./div[2]/span[2]/a/i/text()'))
                # print(item
                dz_list.append(item)
            # print(dz_list)
            self.data_queue.put(dz_list)
            self.page_queue.task_done()
    
        def get_first_element(self, list):
            '''获取列表中第一个元素,如果是空列表就返回None'''
            return list[0] if len(list) != 0 else None
    
        @run_forever
        def save_dz_list(self):
            '''把段子信息保存到文件中'''
            dz_list = self.data_queue.get()
            # print(dz_list)
            with open('qiushi_thread.txt', 'a', encoding='utf8') as f:
                for dz in dz_list:
                    json.dump(dz, f, ensure_ascii=False)
                    f.write('
    ')
            self.data_queue.task_done()
    
    
        def run_use_more_task(self, func, count=1):
            '''把func放到线程中执行, count:开启多少线程执行'''
            for i in range(0, count):
                t = threading.Thread(target=func)
                t.setDaemon(True)
                t.start()
    
        def run(self):
            # 开启线程执行上面的几个方法
            url_t = threading.Thread(target=self.add_url_to_queue)
            # url_t.setDaemon(True)
            url_t.start()
    
            self.run_use_more_task(self.add_page_to_queue, 3)
            self.run_use_more_task(self.add_dz_to_queue, 2)
            self.run_use_more_task(self.save_dz_list, 2)
    
            # 使用队列join方法,等待队列任务都完成了才结束
            self.url_queue.join()
            self.page_queue.join()
            self.data_queue.join()
    
    if __name__ == '__main__':
        qbs = QiubaiSpider()
        qbs.run()
    

      

  • 相关阅读:
    管理配置KVM,热添加、热迁移
    《google工作整理术》21条原则
    【教你玩转云计算】在阿里云一键安装快速部署Oracle11g 【转】
    Oracle数据库开启归档日志及rman备份情况查询
    【转】CentOS7一键部署OpenStack
    【转】基于openstack安装部署私有云详细图文教程
    Oracle Database 12c数据库中文配置安装图解教程(详细安装步骤)
    精通Linux(第2版) 第3章 设备管理
    精通Linux(第2版) 第2章 基础命令和目录结构
    四种IO 模型
  • 原文地址:https://www.cnblogs.com/wxy2000/p/12318396.html
Copyright © 2011-2022 走看看