zoukankan      html  css  js  c++  java
  • 并发下载

    并发下载

    一.queue的实现

    from queue import Queue
    
    queue_object=Queue()
    
    for i in range(4):
        queue_object.put(i)
        
    while not queue_object.empty():
        print(queue_object.get())
    
    0
    1
    2
    3
    
    from queue import LifoQueue
    
    lifo_queue=LifoQueue()
    
    for i in range(4):
        lifo_queue.put(i)
        
    while not lifo_queue.empty():
        print(lifo_queue.get())
    
    3
    2
    1
    0
    
    from queue import PriorityQueue
    
    class Job(object):
        def __init__(self,level,description):
            self.level=level
            self.description=description
            return
        
        def __lt__(self,other):
            return self.level<other.level
        
    priority_queue=PriorityQueue()
    
    priority_queue.put(Job(5,"中级别工作"))
    priority_queue.put(Job(10,"低级别工作"))
    priority_queue.put(Job(1,"重要工作"))
    
    while not priority_queue.empty():
        next_job=priority_queue.get()
        print("开始工作:",next_job.description)
    
    开始工作: 重要工作
    开始工作: 中级别工作
    开始工作: 低级别工作
    

    二.三种技术采集和解析数据对比

    1.单线程实现

    步骤:构建网址—>访问网页并获取源代码—>解析源代码—>转成JSON格式—>存储在本地文件

    from lxml import etree
    import requests
    import json
    
    # 访问网页的请求头
    headers={
        "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36",
        "Accept-Language":"zh-CN,zh;q=0.8"
    }
    
    # 存储解析后数据的本地文件
    local_file=open("duanzi.json","a")
    
    # 解析 html字符串,获取需要信息
    def parse_html(html):
        text=etree.HTML(html)
        
        # 返回所有段子的结点位置
        # contains模糊查询,第一个参数是要匹配的标签,第二个参数是标签名的部分内容
        node_list=text.xpath('//li[contains(@id,"qiushi_tag")]')
        
        for node in node_list:
            # 获取用户名
            username=node.xpath('.//span[@class="recmd-name"]/text()')
        
            #图片链接
            image=node.xpath('.//img/@src')[0]
        
            # 段子内容
            content=node.xpath('.//a[@class="recmd-content"]')[0].text
        
            #点赞
            like=node.xpath('.//div[@class="recmd-num"]/span')[0].text
        
            # 评论
            try:
                comments=node.xpath('.//div[@class="recmd-num"]/span')[3].text
            except IndexError:
                comments=0
        
            items={
                "username":username,
                "image":image,
                "content":content,
                "like":like,
                "comments":comments
            }
        
            local_file.write(json.dumps(items,ensure_ascii=False)+"
    ")
            
    def main():
        # 获取1-10页的网页源代码解析
        for page in range(1,11):
            url="https://www.qiushibaike.com/8hr/page/"+str(page)+"/"
            # 爬取网页源代码
            html=requests.get(url,headers=headers).text
            
            parse_html(html)
    
    main()
    

    2.多线程实现

    从单线程爬虫的流程可以看出,全部过程只是用了一个线程,先爬取一个网页,对网页内容进行解析,然后存储,完成整套操作后再开始爬取下一个网页,每个网页依次进行,效率非常慢

    爬虫的流程简要步骤如下:

    1. 使用一个队列pageQueue保存要访问的网页页码
    2. 同时启动多个采集线程,每个线程都从网页页码队列pageQueue中取出一个要访问的页码,构建网址,访问网址并爬取数据.操作完一个网页后再从网页页码队列中取出下一个页码,依次进行,直到所有的页码都已访问完毕.所有的采集线程保存在列表threadCrawls中
    3. 使用一个队列dataQueue来保存所有的网页源代码,每个线程获取到的数据都放入该队列中
    4. 同时启动多个解析线程,每个线程都从网页源代码队列dataQueue中取出一个网页源代码,并进行解析,获取想要的数据,并转化为JSON格式.解析完成后再取出下一个网页源代码,依次进行,直到所有的源代码都已被取出.将所有的解析线程存储在列表threadParses中
    5. 将解析得到的JSON数据存储在本地文件duanzi.json中
    from IPython.display import Image
    Image(filename="./data/thread.png",width=500)
    

    output_13_0.png

    创建一个ThreadCrawl类,继承自threading.Thread类,用于采集网页信息

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author LQ6H
    
    import requests
    import threading
    from lxml import etree
    import json
    from queue import Queue
    
    
    # 采集网页页码队列是否为空的信号
    CRAWL_EXIT=False
    
    class ThreadCrawl(threading.Thread):
        def __init__(self,threadName,pageQueue,dataQueue):
            threading.Thread.__init__(self)
            # 线程名
            self.threadName=threadName
            # 页码队列
            self.pageQueue=pageQueue
            # 数据队列
            self.dataQueue=dataQueue
    
            self.headers="{'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'}"
    
        def run(self):
            print("启动:"+self.threadName)
    
            while not CRAWL_EXIT:
                try:
                    # 从dataQueue中取出1一个页码数字,先进先出
                    # 可选参数block,默认值是True
                    # 如果队列为空,block为True,会进入阻塞状态,直到队列有新的数据
                    # 如果队列为空,block为False,会弹出一个Queue.empty()异常
                    page=self.pageQueue.get(False)
                    # 构建网页的URL地址
                    url="https://www.qiushibaike.com/8hr/page/"+str(page)+"/"
                    content=requests.get(url,headers=self.headers).text
    
                    # 将爬取到的网页源代码放入dataQueue队列中
                    self.dataQueue.put(content)
                except:
                    pass
    
            print("结束:"+self.threadName)
    

    首先定义一个全局变量CRAWL_EXIT,用于标识pageQueue队列是否为空.当pageQueue不玩空时,线程继续爬取下一个页码;当pageQueue为空时,表明所有的网页都已被爬取完毕,线程就可以退出

    队列pageQueue是线程安全的,使用队列来调度线程,保证了每个线程采集的网页地址不重复

    创建一个ThreadParse类,继承自threading.Thread,用于解析网页信息

    PARSE_EXIT=False
    class ThreadParse(threading.Thread):
        def __init__(self,threadName,dataQueue,localFile,lock):
            super(ThreadParse,self).__init__()
            # 线程名
            self.threadName=threadName
            # 数据队列
            self.dataQueue=dataQueue
            # 保存解析后数据的文件名
            self.localFile=localFile
            # 互斥锁
            self.lock=lock
    
        def run(self):
            print("启动:"+self.threadName)
            while not PARSE_EXIT:
                try:
                    html=self.dataQueue.get(False)
                    self.parse(html)
    
                except:
                    pass
    
            print("结束:"+self.threadName)
    
    
        def parse(self,html):
            text = etree.HTML(html)
            node_list = text.xpath('//li[contains(@id,"qiushi_tag")]')
    
            for node in node_list:
                try:
    
                    # 获取用户名
                    username = node.xpath('.//span[@class="recmd-name"]/text()')
    
                    # 图片链接
                    image = node.xpath('.//img/@src')[0]
    
                    # 段子内容
                    content = node.xpath('.//a[@class="recmd-content"]')[0].text
    
                    # 点赞
                    like = node.xpath('.//div[@class="recmd-num"]/span')[0].text
    
                    # 评论
                    try:
                        comments = node.xpath('.//div[@class="recmd-num"]/span')[3].text
                    except IndexError:
                        comments = 0
    
                    items = {
                        "username": username,
                        "image": image,
                        "content": content,
                        "like": like,
                        "comments": comments
                    }
    
    
                    # with后面有两个必须执行的操作:__enter__和__exit__,打开和关闭
                    # 不管里面的操作如何,都会直接打开和关闭功能
                    # 打开锁,向文件添加内容,释放锁
    
                    with self.lock:
                        # 写入解析后的数据
                        self.localFile.write(json.dumps(items,ensure_ascii=False)+"
    ")
                except:
                    pass
    

    在多线程开发中,为了维护资源的完整性,在访问共享资源时使用共享锁lock.线程获得了锁之后,才可以访问文件localFile,并往里写入数据,写入完毕后,将锁释放,其他线程就可以访问这个文件.同一时刻,只允许一个协程访问该文件

    def main():

    页码队列,存储10个页码,先进先出

    pageQueue=Queue(10)

    for i in range(1,11):

    pageQueue.put(i)

    # 采集结果(网页的HTML源代码)的数据队列,参数为空表示不限制
    dataQueue=Queue()
    # 以追加的方式打开本地文件
    localFile=open("duanzi.json","wb+")
    # 互斥锁
    lock=threading.Lock()
    
    # 3个采集线程的名字
    crawlList=["采集线程1号","采集线程2号","采集线程3号"]
    # 创建,启动和存储3个采集线程
    threadCrawls=[]
    
    for threadName in crawlList:
        thread=ThreadCrawl(threadName,pageQueue,dataQueue)
        thread.start()
        threadCrawls.append(thread)
    
    # 3个解析线程的名字
    parseList=["解析线程1号","解析线程2号","解析线程3号"]
    # 创建,启动和存储3个解析线程
    threadParses=[]
    for threadName in parseList:
        thread=ThreadParse(threadName,dataQueue,localFile,lock)
        thread.start()
        threadParses.append(thread)
    
    while not pageQueue.empty():
        pass
    
    # 如果pageQueue为空,采集线程退出循环
    global CRAWL_EXIT
    CRAWL_EXIT=True
    
    print("pageQueue为空
    ")
    
    for thread in threadCrawls:
        # 阻塞子线程
        thread.join()
    
    while not dataQueue.empty():
        pass
    
    print("dataQueue为空")
    
    global PARSE_EXIT
    PARSE_EXIT=True
    
    for thread in threadParses:
        thread.join()
    
    with lock:
        # 关闭文件,在关闭之前,内容都在内存里
        localFile.close()
    

    if name==“main”:

    main()

    3.协程实现

    在上面实现的多线程爬虫中,分别开启了3个采集线程爬取网页和3个解析线程来解析网页,提供了程序执行的效率.但是,线程是交由CPU调度的,每个时间片段中只能有一个线程执行.而协程是在一个线程内部执行,一旦遇到了网络I/O阻塞,它就会立刻切换到另一个协程中执行,通过不断的轮询,降低了爬取网页的时间.对于爬虫而言,协程和多线程在效率上没有很大的不同

    使用协程来实现爬虫,具体步骤如下:

    1. 定义一个负责爬虫的类,所有的爬虫工作完全交由该类负责
    2. 使用一个队列data_queue保存所有的数据
    3. 创建多个协程任务,每个协程都会使用页码构建完整的网址,访问网址爬取和提取有用的数据,并保存到数据队列中,直到所有网页中的数据提取出来
    4. 将data_queue队列中的数据全部提取出来,保存到本地文件duanzi.txt中
    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author LQ6H
    
    
    import requests
    from queue import Queue
    import time
    from lxml import etree
    import gevent
    
    class Spider(object):
        def __init__(self):
            self.headers={
                "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36"
            }
    
            self.base_url="https://www.qiushibaike.com/8hr/page/"
            # 创建保存数据的队列
            self.data_queue=Queue()
            # 统计数量
            self.count=0
    
        def send_request(self,url):
            print("[INFO]:正在爬取"+url)
            html=requests.get(url,headers=self.headers).content
            # 每次请求间隔1s
            time.sleep(1)
            self.parse_page(html)
    
        def parse_page(self,html):
            html_ogj=etree.HTML(html)
            node_list=html_ogj.xpath('//li[contains(@id,"qiushi_tag")]')
    
            for node in node_list:
                try:
    
                    # 获取用户名
                    username = node.xpath('.//span[@class="recmd-name"]/text()')
    
                    # 图片链接
                    image = node.xpath('.//img/@src')[0]
    
                    # 段子内容
                    content = node.xpath('.//a[@class="recmd-content"]')[0].text
    
                    # 点赞
                    like = node.xpath('.//div[@class="recmd-num"]/span')[0].text
    
                    # 评论
                    try:
                        comments = node.xpath('.//div[@class="recmd-num"]/span')[3].text
                    except IndexError:
                        comments = 0
    
                    items = {
                        "username": username,
                        "image": image,
                        "content": content,
                        "like": like,
                        "comments": comments
                    }
    
                    self.count+=1
                    self.data_queue.put(items)
                except:
                    pass
    
        def start_work(self):
            job_list=[]
            for page in range(1,11):
                # 构建一个协程任务对象
                url=self.base_url+str(page)+"/"
                job=gevent.spawn(self.send_request,url)
    
                # 保存所有的协程任务
                job_list.append(job)
    
            # joinall()接收一个列表,将列表中的所有协程任务添加到任务队列里执行
            gevent.joinall(job_list)
    
            local_file=open("duanzi.json","wb+")
    
            while not self.data_queue.empty():
                content=self.data_queue.get()
                result=str(content).encode("utf-8")
                local_file.write(result+b"
    ")
    
            local_file.close()
            print(self.count)
    
    if __name__=="__main__":
        spider=Spider()
        spider.start_work()
    

    4.性能分析

    使用单线程,多线程和协程实现数据的爬取后,通过计算这3种方式下的耗时情况,比较三种爬虫的效率

    首先导入time模块,然后计算main()函数执行之后与之前的时间差,或者计算调用start_work()方法之前与调用之后的时间差

    计算main()函数执行前后时间差:

    if __name__=="__main__":
        startTime=time.time()
        main()
        print(time.time()-startTime)
    

    计算start_work()方法调用前后时间差:

    if __name__=="__main__":
        spider=Spider()
        start=time.time()
        spider.start_work()
        print("[INFO]:Using time%f secend"%(time.time()-start))
    
  • 相关阅读:
    MongoDB一次节点宕机引发的思考(源码剖析)【华为云分享】
    JavaScript基础修炼(14)——WebRTC在浏览器中如何获得指定格式的PCM数据【华为云分享】
    如何通过虚拟私有云保障服务安全【华为云分享】
    网络服务家族图谱:一张图带您了解华为云网络服务大家族!【华为云分享】
    化鲲为鹏,我有话说:鲲鹏服务器开通流程以及注意事项【华为云分享】
    hadoop小知识札记
    抽取网页中的主要内容
    Hadoop Bloom Filter 使用
    Bloom filter 2
    Bloom Filter
  • 原文地址:https://www.cnblogs.com/LQ6H/p/12940570.html
Copyright © 2011-2022 走看看