zoukankan      html  css  js  c++  java
  • Python实现基于协程的异步爬虫

    Python实现基于协程的异步爬虫

    一、课程介绍

    1. 课程来源

    本课程核心部分来自《500 lines or less》项目,作者是来自 MongoDB 的工程师 A. Jesse Jiryu Davis 与 Python 之父 Guido van Rossum。项目代码使用 MIT 协议,项目文档使用 http://creativecommons.org/licenses/by/3.0/legalcode 协议。

    课程内容在原文档基础上做了稍许修改,增加了部分原理介绍,步骤的拆解分析及源代码注释。

    2. 内容简介

    传统计算机科学往往将大量精力放在如何追求更有效率的算法上。但如今大部分涉及网络的程序,它们的时间开销主要并不是在计算上,而是在维持多个Socket连接上。亦或是它们的事件循环处理的不够高效导致了更多的时间开销。对于这些程序来说,它们面临的挑战是如何更高效地等待大量的网络事件并进行调度。目前流行的解决方式就是使用异步I/O。

    本课程将探讨几种实现爬虫的方法,从传统的线程池到使用协程,每节课实现一个小爬虫。另外学习协程的时候,我们会从原理入手,以ayncio协程库为原型,实现一个简单的异步编程模型。

    本课程实现的爬虫为爬一个整站的爬虫,不会爬到站点外面去,且功能较简单,主要目的在于学习原理,提供实现并发与异步的思路,并不适合直接改写作为日常工具使用。

    3. 课程知识点

    本课程项目完成过程中,我们将学习:

    1. 线程池实现并发爬虫
    2. 回调方法实现异步爬虫
    3. 协程技术的介绍
    4. 一个基于协程的异步编程模型
    5. 协程实现异步爬虫

    二、实验环境

    本课程使用Python 3.4,所以本课程内运行py脚本都是使用python3命令。

    打开终端,进入 Code 目录,创建 crawler 文件夹, 并将其作为我们的工作目录。

    $ cd Code
    $ mkdir crawler && cd crawler
    
    
    

    环保起见,测试爬虫的网站在本地搭建。

    我们使用 Python 2.7 版本官方文档作为测试爬虫用的网站

    wget http://labfile.oss.aliyuncs.com/courses/574/python-doc.zip
    unzip python-doc.zip
    
    
    

    安装serve,一个用起来很方便的静态文件服务器:

    sudo npm install -g serve 
    
    
    

    启动服务器:

    serve python-doc
    
    
    

    如果访问不了npm的资源,也可以用以下方式开启服务器:

    ruby -run -ehttpd python-doc -p 3000
    
    
    

    访问localhost:3000查看网站:

    此处输入图片的描述

    三、实验原理

    什么是爬虫?

    网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本。

    爬虫的工作流程

    网络爬虫基本的工作流程是从一个根URL开始,抓取页面,解析页面中所有的URL,将还没有抓取过的URL放入工作队列中,之后继续抓取工作队列中的URL,重复抓取、解析,将解析到的url放入工作队列的步骤,直到工作队列为空为止。

    线程池、回调、协程

    我们希望通过并发执行来加快爬虫抓取页面的速度。一般的实现方式有三种:

    1. 线程池方式:开一个线程池,每当爬虫发现一个新链接,就将链接放入任务队列中,线程池中的线程从任务队列获取一个链接,之后建立socket,完成抓取页面、解析、将新连接放入工作队列的步骤。
    2. 回调方式:程序会有一个主循环叫做事件循环,在事件循环中会不断获得事件,通过在事件上注册解除回调函数来达到多任务并发执行的效果。缺点是一旦需要的回调操作变多,代码就会非常散,变得难以维护。
    3. 协程方式:同样通过事件循环执行程序,利用了Python 的生成器特性,生成器函数能够中途停止并在之后恢复,那么原本不得不分开写的回调函数就能够写在一个生成器函数中了,这也就实现了协程。

    四、实验一:线程池实现爬虫

    使用socket抓取页面需要先建立连接,之后发送GET类型的HTTP报文,等待读入,将读到的所有内容存入响应缓存。

    def fetch(url):
        sock = socket.socket()
        sock.connect(('localhost.com', 3000))
        request = 'GET {} HTTP/1.0
    Host: localhost
    
    '.format(url)
        sock.send(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)
    
        links = parse_links(response)
        q.add(links)
    
    
    

    默认的socket连接与读写是阻塞式的,在等待读入的这段时间的CPU占用是被完全浪费的。

    多线程

    默认这部分同学们都是学过的,所以就粗略记几个重点,没学过的同学可以直接参考廖雪峰的教程:廖雪峰的官方网站-Python多线程

    导入线程库:

    import threading
    
    
    

    开启一个线程的方法:

    t = 你新建的线程
    t.start()   #开始运行线程
    t.join()    #你的当前函数就阻塞在这一步直到线程运行完
    
    
    

    建立线程的两种方式:

    #第一种:通过函数创建线程
    def 函数a():
        pass
    t = threading.Thread(target=函数a,name=自己随便取的线程名字)
    
    #第二种:继承线程类
    class Fetcher(threading.Thread):
        def __init__(self):
            Thread.__init__(self):
            #加这一步后主程序中断退出后子线程也会跟着中断退出
            self.daemon = True
        def run(self):
            #线程运行的函数
            pass
    t = Fetcher()
    
    
    

    线程同时操作一个全局变量时会产生线程竞争所以需要锁:

    lock = threading.Lock()
    
    lock.acquire()      #获得锁
    #..操作全局变量..
    lock.release()      #释放锁
    
    
    
    

    多线程同步-队列

    默认这部分同学们都是学过的,所以就粗略记几个重点,没学过的同学可以直接参考PyMOTW3-queue — Thread-safe FIFO Implementation中文翻译版

    多线程同步就是多个线程竞争一个全局变量时按顺序读写,一般情况下要用锁,但是使用标准库里的Queue的时候它内部已经实现了锁,不用程序员自己写了。

    导入队列类:

    from queue import Queue
    
    
    

    创建一个队列:

    q = Queue(maxsize=0)
    
    
    

    maxsize为队列大小,为0默认队列大小可无穷大。

    队列是先进先出的数据结构:

    q.put(item) #往队列添加一个item,队列满了则阻塞
    q.get(item) #从队列得到一个item,队列为空则阻塞
    
    
    

    还有相应的不等待的版本,这里略过。

    队列不为空,或者为空但是取得item的线程没有告知任务完成时都是处于阻塞状态

    q.join()    #阻塞直到所有任务完成
    
    
    

    线程告知任务完成使用task_done

    q.task_done() #在线程内调用
    
    
    

    实现线程池

    创建thread.py文件作为爬虫程序的文件。

    我们使用seen_urls来记录已经解析到的url地址:

    seen_urls = set(['/'])
    
    
    
    

    创建Fetcher类:

    class Fetcher(Thread):
        def __init__(self, tasks):
            Thread.__init__(self)
            #tasks为任务队列
            self.tasks = tasks
            self.daemon = True    
            self.start()
    
        def run(self):
            while True:
                url = self.tasks.get()
                print(url)
                sock = socket.socket()
                sock.connect(('localhost', 3000))
                get = 'GET {} HTTP/1.0
    Host: localhost
    
    '.format(url)
                sock.send(get.encode('ascii'))
                response = b''
                chunk = sock.recv(4096)
                while chunk:
                    response += chunk
                    chunk = sock.recv(4096)
    
                #解析页面上的所有链接
                links = self.parse_links(url, response)
    
                lock.acquire()
                #得到新链接加入任务队列与seen_urls中
                for link in links.difference(seen_urls):
                    self.tasks.put(link)
                seen_urls.update(links)    
                lock.release()
                #通知任务队列这个线程的任务完成了
                self.tasks.task_done()
    
    
    
    

    使用正则库与url解析库来解析抓取的页面,这里图方便用了正则,同学也可以用Beautifulsoup等专门用来解析页面的Python库:

    import urllib.parse
    import re
    
    
    

    Fetcher中实现parse_links解析页面:

    def parse_links(self, fetched_url, response):
        if not response:
            print('error: {}'.format(fetched_url))
            return set()
        if not self._is_html(response):
            return set()
    
        #通过href属性找到所有链接
        urls = set(re.findall(r'''(?i)href=["']?([^s"'<>]+)''',
                              self.body(response)))
    
        links = set()
        for url in urls:
            #可能找到的url是相对路径,这时候就需要join一下,绝对路径的话就还是会返回url
            normalized = urllib.parse.urljoin(fetched_url, url)
            #url的信息会被分段存在parts里
            parts = urllib.parse.urlparse(normalized)
            if parts.scheme not in ('', 'http', 'https'):
                continue
            host, port = urllib.parse.splitport(parts.netloc)
            if host and host.lower() not in ('localhost'):
                continue
            #有的页面会通过地址里的#frag后缀在页面内跳转,这里去掉frag的部分
            defragmented, frag = urllib.parse.urldefrag(parts.path)
            links.add(defragmented)
    
        return links
    
    #得到报文的html正文
    def body(self, response):
        body = response.split(b'
    
    ', 1)[1]
        return body.decode('utf-8')
    
    def _is_html(self, response):
        head, body = response.split(b'
    
    ', 1)
        headers = dict(h.split(': ') for h in head.decode().split('
    ')[1:])
        return headers.get('Content-Type', '').startswith('text/html')
    
    
    
    

    实现线程池类与main的部分:

    class ThreadPool:
        def __init__(self, num_threads):
            self.tasks = Queue()
            for _ in range(num_threads):
                Fetcher(self.tasks)
    
        def add_task(self, url):
            self.tasks.put(url)
    
        def wait_completion(self):
            self.tasks.join()
    
    if __name__ == '__main__':
        start = time.time()
        #开4个线程
        pool = ThreadPool(4)
        #从根地址开始抓取页面
        pool.add_task("/")
        pool.wait_completion()
        print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
    
    
    
    

    运行效果

    这里先贴出完整代码:

    from queue import Queue 
    from threading import Thread, Lock
    import urllib.parse
    import socket
    import re
    import time
    
    seen_urls = set(['/'])
    lock = Lock()
    
    
    class Fetcher(Thread):
        def __init__(self, tasks):
            Thread.__init__(self)
            self.tasks = tasks
            self.daemon = True
    
            self.start()
    
        def run(self):
            while True:
                url = self.tasks.get()
                print(url)
                sock = socket.socket()
                sock.connect(('localhost', 3000))
                get = 'GET {} HTTP/1.0
    Host: localhost
    
    '.format(url)
                sock.send(get.encode('ascii'))
                response = b''
                chunk = sock.recv(4096)
                while chunk:
                    response += chunk
                    chunk = sock.recv(4096)
    
                links = self.parse_links(url, response)
    
                lock.acquire()
                for link in links.difference(seen_urls):
                    self.tasks.put(link)
                seen_urls.update(links)    
                lock.release()
    
                self.tasks.task_done()
    
        def parse_links(self, fetched_url, response):
            if not response:
                print('error: {}'.format(fetched_url))
                return set()
            if not self._is_html(response):
                return set()
            urls = set(re.findall(r'''(?i)href=["']?([^s"'<>]+)''',
                                  self.body(response)))
    
            links = set()
            for url in urls:
                normalized = urllib.parse.urljoin(fetched_url, url)
                parts = urllib.parse.urlparse(normalized)
                if parts.scheme not in ('', 'http', 'https'):
                    continue
                host, port = urllib.parse.splitport(parts.netloc)
                if host and host.lower() not in ('localhost'):
                    continue
                defragmented, frag = urllib.parse.urldefrag(parts.path)
                links.add(defragmented)
    
            return links
    
        def body(self, response):
            body = response.split(b'
    
    ', 1)[1]
            return body.decode('utf-8')
    
        def _is_html(self, response):
            head, body = response.split(b'
    
    ', 1)
            headers = dict(h.split(': ') for h in head.decode().split('
    ')[1:])
            return headers.get('Content-Type', '').startswith('text/html')
    
    
    class ThreadPool:
        def __init__(self, num_threads):
            self.tasks = Queue()
            for _ in range(num_threads):
                Fetcher(self.tasks)
    
        def add_task(self, url):
            self.tasks.put(url)
    
        def wait_completion(self):
            self.tasks.join()
    
    if __name__ == '__main__':
        start = time.time()
        pool = ThreadPool(4)
        pool.add_task("/")
        pool.wait_completion()
        print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
    
    
    
    

    运行python3 thread.py命令查看效果(记得先开网站服务器):

    此处输入图片的描述

    使用标准库中的线程池

    线程池直接使用multiprocessing.pool中的ThreadPool

    代码更改如下:

    from multiprocessing.pool import ThreadPool
    
    #...省略中间部分...
    #...去掉Fetcher初始化中的self.start()
    #...删除自己实现的ThreadPool...
    
    if __name__ == '__main__':
        start = time.time()
        pool = ThreadPool()
        tasks = Queue()
        tasks.put("/")
        Workers = [Fetcher(tasks) for i in range(4)]
        pool.map_async(lambda w:w.run(), Workers)
        tasks.join() 
        pool.close()
    
        print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
    
    
    

    使用ThreadPool时,它处理的对象可以不是线程对象,实际上Fetcher的线程部分ThreadPool根本用不到。因为它自己内部已开了几个线程在等待任务输入。这里偷个懒就只把self.start()去掉了。可以把Fetcher的线程部分全去掉,效果是一样的。

    ThreadPool活用了map函数,这里它将每一个Fetcher对象分配给线程池中的一个线程,线程调用了Fetcherrun函数。这里使用map_async是因为不希望它在那一步阻塞,我们希望在任务队列join的地方阻塞,那么到队列为空且任务全部处理完时程序就会继续执行了。

    运行python3 thread.py命令查看效果:

    此处输入图片的描述

    线程池实现的缺陷

    我们希望爬虫的性能能够进一步提升,但是我们没办法开太多的线程,因为线程的内存开销很大,每创建一个线程可能需要占用50k的内存。以及还有一点,网络程序的时间开销往往花在I/O上,socket I/O 阻塞时的那段时间是完全被浪费了的。那么要如何解决这个问题呢?

    下节课你就知道啦,下节课见~

  • 相关阅读:
    WPF基础篇之控件模板(ControlTemplate)
    WPF基础篇之移动特效
    WPF基础篇之空间布局
    00-API-Mongoose
    00-API-Vue
    博客园皮肤设置
    15-Node
    16-Vue-A
    15-MongoDB
    14-电商项目
  • 原文地址:https://www.cnblogs.com/mrchige/p/6425685.html
Copyright © 2011-2022 走看看