zoukankan      html  css  js  c++  java
  • 实验二:事件驱动-回调函数实现爬虫

    承接上一节课,我们在上节课中提到,socket I/O 阻塞时的那段时间完全被浪费了,那么要如何节省下那一段时间呢?

    非阻塞I/O

    如果使用非阻塞I/O,它就不会傻傻地等在那里(比如等连接、等读取),而是会返回一个错误信息,虽然说是说错误信息,它其实就是叫你过一会再来的意思,编程的时候都不把它当错误看。

    非阻塞I/O代码如下:

    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('xkcd.com', 80))
    except BlockingIOError:
        pass
    
    
    

    这里抛出的异常无视掉就可以了。

    单线程上的多I/O

    有了非阻塞I/O这个特性,我们就能够实现单线程上多个sockets的处理了,学过C语言网络编程的同学应该都认识select这个函数吧?不认识也不要紧,select函数如果你不设置它的超时时间它就是默认一直阻塞的,只有当有I/O事件发生时它才会被激活,然后告诉你哪个socket上发生了什么事件(读|写|异常),在Python中也有select,还有跟select功能相同但是更高效的poll,它们都是底层C函数的Python实现。

    不过这里我们不使用select,而是用更简单好用的DefaultSelector,是Python 3.4后才出现的一个模块里的类,你只需要在非阻塞socket和事件上绑定回调函数就可以了。

    代码如下:

    from selectors import DefaultSelector, EVENT_WRITE
    
    selector = DefaultSelector()
    
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('localhost', 3000))
    except BlockingIOError:
        pass
    
    def connected():
        selector.unregister(sock.fileno())
        print('connected!')
    
    selector.register(sock.fileno(), EVENT_WRITE, connected)
    
    
    
    

    这里看一下selector.register的原型

    register(fileobj, events, data=None)
    
    
    

    其中fileobj可以是文件描述符也可以是文件对象(通过fileno得到),events是位掩码,指明发生的是什么事件,data 则是与指定文件(也就是我们的socket)与指定事件绑定在一起的数据。

    如代码所示,selector.register 在该socket的写事件上绑定了回调函数connected(这里作为数据绑定)。在该socket上第一次发生的写事件意味着连接的建立,connected函数在连接建立成功后再解除了该socket上所有绑定的数据。

    事件驱动

    看了以上selector的使用方式,我想你会发现它很适合写成事件驱动的形式。

    我们可以创建一个事件循环,在循环中不断获得I/O事件:

    def loop():
        while True:
         events = selector.select()
            #遍历事件并调用相应的处理
            for event_key, event_mask in events:
             callback = event_key.data
                callback()
    
    
    

    其中events_key是一个namedtuple,它的结构大致如下(fileobj,fd,events,data),我们从data得到之前绑定的回调函数并调用。 event_mask则是事件的位掩码。

    关于selectors的更多内容,可参考官方文档: https://docs.python.org/3.4/library/selectors.html

    完成后续工作

    现在我们已经明白了基于回调函数实现事件驱动是怎么一回事了,接着来完成我们的爬虫吧。

    首先创建两个set,一个是待处理url的集合,一个是已抓取url的集合,同时初始化为根url '/'

    urls_todo = set(['/'])
    seen_urls = set(['/'])
    
    
    
    

    抓取一个页面会需要许多回调函数。比如connected,它会在连接建立成功后向服务器发送一个GET请求请求页面。当然它不会干等着服务器响应(那就阻塞了),而是再绑定另一个接收响应的回调函数read_response。如果read_response在事件触发时无法一次性读取完整的响应,那么就会等下次事件触发时继续读取,直到读取到了完整的响应才解除绑定。

    我们将这些回调函数封装在 Fetcher 类中。它有三个成员变量:抓取的urlsocket对象与得到的服务器响应response

    class Fetcher:
        def __init__(self, url):
            self.response = b'' 
            self.url = url
            self.sock = None
    
    
    

    实现fetch函数,绑定connected

        # 在Fetcher类中实现
        def fetch(self):
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('xkcd.com', 80))
            except BlockingIOError:
                pass
    
            selector.register(self.sock.fileno(),
                              EVENT_WRITE,
                              self.connected)
    
    
    

    注意到fetch函数在内部调用connect尝试建立socket连接并绑定回调函数,I/O的处理则都是交给事件循环控制的。Fetcher与事件循环的关系如下:

    # Begin fetching http://xkcd.com/353/
    fetcher = Fetcher('/353/')
    fetcher.fetch()
    
    # 事件循环
    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
    
    
    
    

    connected的实现:

    def connected(self, key, mask):
        print('connected!')
        #解除该socket上的所有绑定
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0
    Host: xkcd.com
    
    '.format(self.url)
        self.sock.send(request.encode('ascii'))
    
        # 连接建立后绑定读取响应的回调函数
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)
    
    
    
    

    read_response的实现:

    def read_response(self, key, mask):
        global stopped
    
        chunk = self.sock.recv(4096)  # 每次接收最多4K的信息
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # 完成接收则解除绑定
            links = self.parse_links()
    
            # Python set-logic:
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()  # 抓取新的url
    
            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True         # 当抓取队列为空时结束事件循环
    
    
    

    parse_links 如上一节课,它的作用是返回抓取到的页面中的所有发现的url的集合。 parse_links 之后,遍历了每一个没抓取过的url并为其创建一个新的Fetcher 对象并调用fetch 函数开始抓取。

    parse_links等其它函数的实现:

    def body(self):
        body = self.response.split(b'
    
    ', 1)[1]
        return body.decode('utf-8')
    
    def parse_links(self):
        if not self.response:
            print('error: {}'.format(self.url))
            return set()
        if not self._is_html():
            return set()
        urls = set(re.findall(r'''(?i)href=["']?([^s"'<>]+)''',
                              self.body()))
    
        links = set()
        for url in urls:
            normalized = urllib.parse.urljoin(self.url, url)
            parts = urllib.parse.urlparse(normalized)
            if parts.scheme not in ('', 'http', 'https'):
                continue
            host, port = urllib.parse.splitport(parts.netloc)
            if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'):
                continue
            defragmented, frag = urllib.parse.urldefrag(parts.path)
            links.add(defragmented)
    
        return links
    
    def _is_html(self):
        head, body = self.response.split(b'
    
    ', 1)
        headers = dict(h.split(': ') for h in head.decode().split('
    ')[1:])
        return headers.get('Content-Type', '').startswith('text/html')
    
    
    
    

    将事件循环改为stopped时停止:

    start = time.time()
    stopped = False
    
    def loop():
        while not stopped:
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
    
    print('{} URLs fetched in {:.1f} seconds'.format(
        len(seen_urls), time.time() - start))
    
    
    

    运行效果

    这里先奉上完整代码:

    from selectors import *
    import socket
    import re
    import urllib.parse
    import time
    
    
    urls_todo = set(['/'])
    seen_urls = set(['/'])
    #追加了一个可以看最高并发数的变量
    concurrency_achieved = 0
    selector = DefaultSelector()
    stopped = False
    
    
    class Fetcher:
        def __init__(self, url):
            self.response = b''
            self.url = url
            self.sock = None
    
        def fetch(self):
            global concurrency_achieved
            concurrency_achieved = max(concurrency_achieved, len(urls_todo))
    
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('localhost', 3000))
            except BlockingIOError:
                pass
            selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
    
        def connected(self, key, mask):
            selector.unregister(key.fd)
            get = 'GET {} HTTP/1.0
    Host: localhost
    
    '.format(self.url)
            self.sock.send(get.encode('ascii'))
            selector.register(key.fd, EVENT_READ, self.read_response)
    
        def read_response(self, key, mask):
            global stopped
    
            chunk = self.sock.recv(4096)  # 4k chunk size.
            if chunk:
                self.response += chunk
            else:
                selector.unregister(key.fd)  # Done reading.
                links = self.parse_links()
                for link in links.difference(seen_urls):
                    urls_todo.add(link)
                    Fetcher(link).fetch()
    
                seen_urls.update(links)
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                print(self.url)
    
        def body(self):
            body = self.response.split(b'
    
    ', 1)[1]
            return body.decode('utf-8')
    
        def parse_links(self):
            if not self.response:
                print('error: {}'.format(self.url))
                return set()
            if not self._is_html():
                return set()
            urls = set(re.findall(r'''(?i)href=["']?([^s"'<>]+)''',
                                  self.body()))
    
            links = set()
            for url in urls:
                normalized = urllib.parse.urljoin(self.url, url)
                parts = urllib.parse.urlparse(normalized)
                if parts.scheme not in ('', 'http', 'https'):
                    continue
                host, port = urllib.parse.splitport(parts.netloc)
                if host and host.lower() not in ('localhost'):
                    continue
                defragmented, frag = urllib.parse.urldefrag(parts.path)
                links.add(defragmented)
    
            return links
    
        def _is_html(self):
            head, body = self.response.split(b'
    
    ', 1)
            headers = dict(h.split(': ') for h in head.decode().split('
    ')[1:])
            return headers.get('Content-Type', '').startswith('text/html')
    
    
    start = time.time()
    fetcher = Fetcher('/')
    fetcher.fetch()
    
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
    
    print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format(
        len(seen_urls), time.time() - start, concurrency_achieved))
    
    
    
    
    

    输入python3 callback.py命令查看效果。不要忘了先开网站的服务器哦。

    此处输入图片的描述

    基于回调函数实现的缺陷

    想想之前从建立连接到读取响应到解析新的url到工作队列中,这一切都能够在一个函数中完成,就像下面这样:

    def fetch(url):
        sock = socket.socket()
        sock.connect(('localhost', 3000))
        request = 'GET {} HTTP/1.0
    Host: localhost
    
    '.format(url)
        sock.send(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)
    
        links = parse_links(response)
        q.add(links)
    
    
    

    而用回调函数实现,整个整体就支离破碎了,哪里阻塞就又不得不在那里把函数一切为二,代码会显得非常乱,维护也变的很麻烦。更麻烦的是如果在回调函数中抛出了异常,你根本得不到什么有用的信息:

    Traceback (most recent call last):
      File "loop-with-callbacks.py", line 111, in <module>
        loop()
      File "loop-with-callbacks.py", line 106, in loop
        callback(event_key, event_mask)
      File "loop-with-callbacks.py", line 51, in read_response
        links = self.parse_links()
      File "loop-with-callbacks.py", line 67, in parse_links
        raise Exception('parse error')
    Exception: parse error
    
    
    

    你看不到这个回调函数的上下文是什么,你只知道它在事件循环里。你想在这个函数外抓取它的异常都没地方下手。但是这又是回调实现无法避免的缺陷,那我们想实现并发异步应该怎么办咧?

  • 相关阅读:
    RFID Hacking②:PM3入门指南
    技术解析:锁屏绕过,三星Galaxy系列手机也能“被”呼出电话
    技术分享:逆向破解华为路由器第一部分
    GSM BTS Hacking: 利用BladeRF和开源BTS 5搭建基站
    js生成随即字符串
    ES6 对象解构
    vue隐藏APP启动时显示的{{}}
    国内常用的三种框架:ionic/mui/framework7对比
    vue for 绑定事件
    html5视频全频播放
  • 原文地址:https://www.cnblogs.com/mrchige/p/6498363.html
Copyright © 2011-2022 走看看