zoukankan      html  css  js  c++  java
  • python爬虫之如何实现并发

    在学习爬虫过程中,我发现使用并发的方案是必不可少的!实现并发的方式有很多,如多线程、多进程还有异步IO等等。在实际运用中,我也只是会各种调包实现并发,但是内部怎么实现的却是一点都不知道。奈何我有一颗好奇的心,这篇文章就是来写一下我学习实现并发的几个方案和python中实现并发的常用模块以及分析它们底层是怎么实现的!

    爬虫性能相关

    爬虫是基于浏览器客户端实现的一种技术!

    在使用爬虫的时候,不难发现我们使用requests模块帮助我们来发请求,如果服务端没有返回响应,那么我们的程序就会像二愣子一样傻不愣登的死等下去(单线程方案)。

    上面的方法显然效率是特别低的。聪明的做法应该是“同时”去发多个请求,某一请求完成后“告诉一下我”,我再去拿数据搞事情。

    我们所熟识的能够“同时”帮我们做操作,实现并发效果的方法有多线程和多进程还有异步IO。因为这些方法都是能大大提高我们爬虫性能滴!所以我们也需要学习掌握这些方法的使用和原理

    下面我来分享下关于学习利用多线程、多进程和异步IO这三种常见的实现并发的方案是怎么去实现”并发“和”回调“一些学习心得!

    参考文献


    多线程实现并发请求

    使用线程池开多线程

    from concurrent.futures import ThreadPoolExecutor
    import requests
    
    def task(url):
        """
        下载页面
        为了方便区分我们这里把该函数叫做主体函数,主体函数执行完后会触发回调函数
        """
        response = requests.get(url)
        print(url,response)
        reuten response
    
    def done(future,*args,**kwargs):
        """
        回调函数
        """
        print(future,args,kwargs)  # 回调函数不直接把response传入,而是封装成一个future对象
        response = future.result()  # 通过future对象的result方法可以取到我们的返回值
    
    pool = ThreadPoolExecutor(5)
    url_list = [
        'http://www.baidu,com',
        'http://www.pianshen.com/article/292812067/',
        'http://deehuang.github.io',
        'http://www.zhihu.com',
        'http://www.sina.com'
    ]
    
    for url in url_list:
        v = pool.submit(task,url)  # 使用线程池中的线程去执行task函数,把url作为实参传入
        v.add_done_callback(done)  # 添加回调函数,这里表示task执行完后执行done方法
    pool.shutdown(wait=True)  # 关闭线程池,Wait是等待所有调度的线程执行完后再关闭
    

    如果我们希望线程执行完task函数后再执行一个回调函数,我们可以使用一个变量去接收pool.sumit(task,url)的返回值,然后给这个返回值可以添加一个回调函数add_done_callback(done)
    上面的例子中,我们执行完task后传入回调函数中的参数是一个future对象(这是add_done_callback()帮我封装的,它除了封装了返回值还有别的东西),而不直接是task返回的response对象。通过future.result()就可以拿到我们的requests请求返回的response对象!
    PS:实际运用中,我们也可以不使用回调函数,可以直接把所有的逻辑代码写到主体函数中,使用和不使用回调都是多线程方式,这两种多线程方式只不过编写方式不一样而已(使用回调函数代码耦合会低一些)。

    这就是我们使用线程池实现多线程实现并发的发起请求操作的方法(有些小伙伴可能会"有开多线程难道非要用线程池?"的疑问。使用线程池是为了方便对线程的管理, 线程的创建和销毁的开销是巨大的,而通过线程池的重用大大减少了这些不必要的开销,当然既然少了这么多消费内存的开销,其线程执行速度也是突飞猛进的提升。 )

    多进程实现并发请求

    多进程实现并发方案跟多线程的实现的方案几乎是一模一样的。
    只不过我们引入的是线程池了

    from concurrent.futures import ProcessPoolExecutor
    pool = processPoolExecutor(8)
    v = pool.submit(task,url) # 使用进程池中的线程去执行task函数,把url作为实参传入
    v.add_done_callback(done) # 添加回调函数,这里表示task执行完后执行done方法


    总结一下多线程和多进程实现并发的方案:

    1.优点:可以实现并发

    2.缺点:如果请求发出去后和等待返回响应的这段时间里(IO阻塞),这些被调度的线程(进程)就处于没事做的空闲状态。我觉得挺浪费的(因为开启线程就占用了资源),既然占用资源我就想让线程(进程)不断地帮我做事情,如果在阻塞的事件中可以让这些线程(进程)去做别的事情,那真是美滋滋(这大概是所有老板的心声吧~)

    3.编写方式:

    • 直接返回处理
    • 通过回调函数处理

    4.两种方案的对比(从对比线程和进程的区别去探讨这个问题):

    • 线程是cpu工作的最小单元,线程存在进程里边,”它是真实地工作者“
    • 每开一个进程,该进程中的所有线程共享这个资源集里的资源 头脑风暴:进程就像是一个房子。这个房子里面人就是线程,该房子(线程)里的所有人共享这个房子的所有资源。通过这个例子不难想象,开进程是十分耗费资源的,相当于重搭一个房子。而使用多线程的方式就相当于在屋子里多放几个人而不用专门为每个人重搭一个房子
    • 什么时候用多进程?什么时候用多进程?
      因为python的GIL锁的原因(同一时刻只允许一个进程中的一个线程被cpu调度),所以对于计算密集型的程序我们使用多进程的方案会好一些;对于io密集型的程序用多线程会更好(遇到io操作的时候不用跟cpu交互,因为遇到Io操作cpu就会切到别的线程去执行,所以就好像不受GIL的限制)

    异步IO实现(伪)并发请求

    协程

    学习异步IO模型,我们首先要理解协程的概念:

    协程简单的理解为规定了让一个线程先执行一下一个函数1,执行到一半,再切换到另外一个函数2执行,然后又回去执行函数1(是从上次执行的状态继续往下执行)……。这样的过程就像是线程分片了(然线程先执行这个又执行下那个),所以它也被叫做微线程

    实现上面过程的方式就是用我们的生成器函数,在函数中使用yield返回函数值,就是一个生成器函数。当函数遇到yield就会跳出函数并返回yield所带的值,再次执行该函数的时候会从上一次yield的地方继续往下执行(实现了保存上下文状态)。

    协程的特点在于是一个线程执行。
    协程的最大优势是极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程的数量越多,协程的性能优势越明显。

    第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多

    更多的关于协程的学习可以参考这里

    总结:协程本质是一种上下文切换技术,通过生成器yield记录状态的特性来实现

    异步IO

    协程仅仅提供的功能只是切换,它只能单纯的执行下这个函数又去执行下别的函数……这样的过程。利用协程可以并发的发送多个请求(实际上不是并发只不过速度非常快看似并发)。即是用一个线程就能把所有的请求(放到不同的生成器函数中)并发的发送过去。但是单纯的它并不能帮我们完成任何的操作!

    异步IO的理解:如果一个IO被阻塞,系统会切换到其他的进程/线程/协程以便充分利用CPU。并且,在IO的数据返回后,能实时监测并实现自动回调这就是异步IO(异步强调的是回调功能)。如果不切换,一直等待这个IO完成,就是同步IO

    协程和异步IO的关系:异步IO的实现可以用多进程也可以用多线程,但是它们都存在一个限制,就是线程或进程的数量在一个操作系统中是有限的,同时它们切换也要消耗一定的资源。要知道很多时候我们要并发的代码块并不大,比如说只是get一个网页,因此,人们会把这个代码块进一步的从线程中脱离出来,就是利用我们的协程,它是微线程,能帮助我们实现像线程切换的效果(协程本质是一种上下文切换技术,通过生成器yield记录状态的特性来实现)。但是异步IO并不是说实现切换功能就行了,它最主要的功能是回调!怎么实现自动回调呢?我们后面再说(可以提前引入一下,利用”io多路复用“)

    利用协程+异步IO就可以完成并发的发送请求并且一旦某一个请求的结果返回时,回调继续执行该请求函数下面的任务。因为是一个线程处理多个http请求,所以当同一时刻有多个http请求返回的时候也只能处理一个请求,所以我们称异步IO方案实现的并发请求

    可能有的人还是有点不太理解上面是什么意思?不是协程自己就已经实现了并发的去发送请求吗?
    我们所说的并发HTTP请求实际上是包括了两个部分的:发送请求+处理响应内容。不是说你只是发个请求后面的事情就不管了。我们请求的目的不是就是为了拿数据搞事情吗?多线程和多进程每个进程遇到io操作是同步操作,所以不用考虑回调的问题。(就是说线程/进程一直等到数据返回了才会往下执行,这期间该线程/进程会被一直挂起)。

    协程完成的只是切换功能,但是什么时候切换呀!它不知道HTTP请求啥时候会返回!所以我们需要利用异步IO的功能(监听请求并回调)。

    总结:

    异步IO不仅需要能在遇到IO阻塞的操作时候能实现切换,还需要监听请求状态并自动回调! 自动回调! 自动回调!(重要的事情说三遍)。可以简单的傻瓜式理解为,协程就是切换,异步就是回调。
    所以协程+异步IO ==》 一个线程发送N个Http请求

    这里需要注意一下的是:
    异步IO的并非必须跟协程搭配。切换功能的实现可以使用多线程/进程(或者是后面会说到的无阻塞socket)。使用协程带给我们的好处是开销小并且能在碰到IO操作的进行切换(实现非阻塞)。还有!在使用异步io的监测到请求有响应时候能提供切回去,继续往下执行的效果。即给我们的回调实现了便捷!

    因为下面会讲到无阻塞socket+io多路复用(提前引入,异步IO的监测功能就它提供)。我一开始就总是疑惑协程不是跟异步IO有关系吗。在这里面用在了哪里?其实这个方案里并没有引用协程。。如果还想深究,看完全部文章后,可以去看一下这篇文章

    这里介绍协程最主要是因为紧接着要介绍的异步IO模块有些是通过协程去实现的!

    异步IO模块的使用

    Python作为一个"面向调包"的语言(调侃一下),这种单线程+异步IO来实现处理N多个请求的模块当然是必须存在的,我们慢慢的引入这些模块。

    asyncio 模块

    python3的内置模块asyncio可以帮助我们完成上面的操作

    """
    asyncio模块的固定用法
    """
    import asyncio
    
    @asyncio.coroutine
    def func1():
    	print('执行func1之前')
    	yield from asyncio.sleep(5)   #必须写的是yield form--->固定写法
    	print('执行func1结束')
    	
    tasks = [func1(),func1()]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() 
    

    是不是把上述的代码改成发送http请求,获取结果不就实现了并发的发送处理http请求了呢?

    很遗憾的是asynico模块不支持HTTP请求,支持TCP请求。我们知道HTTP是基于TCP做的,所以我们可以根据这个协议来对这个模块做一个改造

    回顾一下HTTP和TCP请求:

    TCP:

    client = socket()
    client.connect()
    client.send(b'asdasdasdas')
    

    HTTP:
    HTTP实际上内部也是用的TCP请求的方法:

    client = socket()
    client.connect()
    client.send(data)
    

    只不过它在send()data内容是不一样的,就是说HTTP只是规定了一个数据格式
    它的格式是这样的:
    data = """GET %S HTTP/1.0\r\nHost:%s\r\n\r\n"""%(url,host)
    其实就是我们所说的Http请求三部分:请求首行、请求头、请求体

    PS:请求首行和请求头之间一个换行,请求头和请求头直接一个换行,请求头和请求体之间两个换行

    这样我们就可以通过自己构造http数据格式来通过asyncio模块去发HTTP请求了!

    """
    asyncio模块实现发送Http请求  --->固定用法
    """
    import asyncio
    
    @asyncio.coroutine
    def task(host,url='/'):
    	reader,writer = yield from asyncio.open_connection(host,80)# 创建连接,本质上就是通过socket创建一个连接
        
        request_headet_content = """GET %S HTTP/1.0\r\nHost:%s\r\n\r\n"""%(url,host)#我们构造的http请求的格式
        request_headet_content = bytes(request_header_content,encoding='utf-8')
        
        writer.write(request_header_content)  # TCP的sent
        yield from writer.drain()
        text = yield from reader.read()  # Tcp的recv
        print(host,utl,text)
        writer.close()  # 关闭socket
    	
    tasks = [task('deehuang.github.io','/link/'), # (host,url)
             task('deehuang.github.io','/tags/')]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() 
    

    asyncio+aiohttp 模块

    asyncio模块可以实现异步IO的功能,但是对HTTP的不友好显然给我们带来了许多麻烦。我们每次都要自己去封装HTTP数据包,但是python作为”面向调包“的语言,它当然也有帮我们封装HTTP数据包的模块,就是我们的aiohttp模块(这不是一个内置模块,需要先安装pip install aiohttp再使用)

    asynico+aiohttp两者结合就方便地实现了使用异步IO发送HTTP请求了:

    """
    asyncio+aiohttp模块的搭配固定用法
    """
    import asyncio
    import aiohttp
    
    @asyncio.coroutine
    def task(url):
    	print(url)
        response = yield from aiohttp.reques('GET',url) #这一句代码就把上面我们自己封装Http数据包和创建socket连接,返回值就相当于是我们的socket链接对象
        data = yield form response.read()
        print(url,response)
        response.close()
        
    	
    tasks = [task('http://deehuang.github.io/'), # (url)
             task('http://www.baidu.com/')]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() 
    

    小总结:
    asynico里面帮助我们完成的是异步的功能
    aiohttp帮助我们完成的是封装Http数据包的功能和创建socket链接的功能
    这两个组件是配合使用的,aiohttp也是基于asynico去开发的

    asyncio+requests 模块

    asynico除了可以搭配aiohttp模块使用,还能和requests模块搭配使用!

    """
    asyncio+requests模块的搭配固定用法
    """
    import asyncio
    import requests
    
    @asyncio.coroutine
    def task(func,*args):
        print(func,args)
    	loop = asyncio.get_event_loop()
        future = loop.run_in_executor(None,func,*args) # 把args当作参数传入func中
        response = yield from future
        print(response.url,response.content)
        
    	
    tasks = [task(requests.get,'http://deehuang.github.io/'), # (func,url)
             task(requests.get,'http://www.baidu.com/')]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() 
    

    requests模块实际上在这里也是帮助我们封装Http数据包并且建立socket连接,然后通过socket发过去

    gevent 模块

    如今主流的一个帮助我们做网络请求提高并发的模块,它内部原理跟asyncio原理本质上是一样的!

    gevent内部依赖于greenlet模块,greenlet是一个协程的模块(复习一下协程,是一个微线程,是我们自己指定一个线程先执行什么再执行什么再执行什么……这样的一个过程)。greenlet只是做协程,处理网络请求我们没办法指定先执行什么然后执行什么,它是根据请求的响应来决定什么时候执行的。所以说gevent依赖于greenlet实现切换功能,而gevent内部也实现了异步IO的功能。两者结合起来去完成并发。

    gevent,greenlet都不是内置模块,所以需要去安装它们
    pip install greenlet
    pip install gevent

    geventasyncio一样,内部只是支持socket级别的操作,即只支持TCP,不支持HTTP。所以我们还是需要搭配其他的http封包模块去帮助我们发Http请求

    gevent+requests:

    """
    gevent+requests实现并发  ---->标准用法
    """
    import gevent
    import requests
    from gevent import monkey
    
    monkey.patch_all()  #这句不要忘记了,它内部会找到我们原来的所有的socket(原来socket中的io操作如sent,recv,connect等等都是同步操作),都变成gevent帮我们封装成异步IO的socket 
    
    def task(method,url,req_kwargs):
        print(method,url,req_kwargs)
        response = requests.request(method=method,url=url,**req_kwargs)
        print(response.url,response.content)
    
    #### 发送请求 ####
    gevent.joinall([
      gevent.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
      gevent.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
      gevent.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
    ])
    
    

    如果我们需要同时并发的任务有特别多,例如有一百个请求需要发出去。我们一个线程处理的完吗?肯定效率是非常低的。所以gevent非常友好的给我们提供了协程池,来控制我们最多向远程发起请求的数量

    #### 发送请求(协程池控制最大携程数量) ####
    from gevent.pool import Pool
    pool = Pool(5) # 最多调用五个协程切换任务
    gevent.joinall([
      gevent.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
      gevent.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
      gevent.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
    ])
    
    

    grequests 模块

    再次感慨“面向调包”语言的强大,有人已经把上面的gevent和requests封装成一个模块供我们快速调用了。(其实我觉得是有点low的,因为上面也没几句代码......)

    import grequests
    
    request_list = {
    	grequests.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
    	grequests.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
    	grequests.spawm(task,method='get',url='http://deehuang.github.io/',req_kwargs={}),
    }
    
    #### 执行并获取响应列表 ####
    response_list = grequests.map(request_list,size=5) #size表示的协程池内协程数量
    print(response_list)
    

    Twisted 模块

    不是python的内置模块,所以需要安装 pip install twisted

    from twisted.web.client import getPage,defer
    from twisted.internet import reactor
    
    def one_done(arg):
        """每下载完一个页面就执行这个函数"""
        print(arg)
       
    def all_done(arg):
        """所有页面下载完后执行这个函数"""
        print('done')
        reactor.stop() # 全部请求处理完后,终止循环
    
        
    @defer.inlineCallbacks    
    def task(url):
        res = getPage(bytes(url,encoding='utf-8'))  # 发送http请求
        res.addCallback(one_done)  # 添加回调函数,每下载完成一个页面后执行onde_done回调函数
        yield res
    
    url_list = ['http://deehuang.github.io/',
                'http://www.baidu.com/']
    
    defer_list = []  # 放的是已经向url发送请求的对象
    for url in url_list:
        v = task(url) # 执行task函数,因为有defer_inlineCallbacks装饰器,当我们执行到getPage()后会立即返回!即把请求发过去不等请求回来,实现并发发请求。返回已经向url发送请求的对象
        defer_list.append(v)   
    d = defer.DeferredList(defer_list)  # 把已经向url发送请求的对象列表放到该特殊对象中
    d.addBoth(all_done)
    
    reactor.run() # 这是一个死循环,它会“一直”去DeferredList中监听所有的请求状态,如某一请求返回后接着从原来返回的函数往下执行(该实例中每个请求返回后去执行one_done)。所有请求处理完后触发d.addBoth中添加的回调函数,即all_done,实现异步操作。
    
    # 请求全部处理完后,reacotor.run还是会傻傻的一直循环,要终止循环我们需要在all_done中去终止循环,
    

    上面例子中的reactor.run()死循环,我们有一个专业的名词叫做“事件循环”
    这个事件循环的作用就是帮助我们监测咱们发出去的请求,看它是否又没返回。
    所以我们在使用所有的异步IO框架的时候要有一个概念:通过事件循环去监听每个请求的返回状态,当每个请求返回后通过回调执行相应的方法。所有的请求都返回处理完后,需要把整个事件循环终止掉!

    PS: scrapy异步IO请求的实现是基于Twisted来完成的

    Tornado 模块

    Tornado框架的异步功能用来做异步IO请求模块也是相当不错的,它的用法和Twisted十分相似。

    from tornado.httpclient import AsyncHTTPClient
    from tornado.httpclient import HTTPRequest
    from tornado import ioloop
    
    COUNT = 0 # 计数器,用于中止IO循环
    def handle_response(response):
        """
        处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop()
        """
        global COUNT
        COUNT -= 1
        if response.error:
            print("Error:", response.error)
        else:
            print(response.body)
    	if COUNT = 0: # 终止事件循环
            ioloop.IOLoop.current().stop()
        
    
    def func():
        url_list = [
            'http://www.baidu.com',
            'http://cn.bing.com',
        ]
        global COUNT
        COUNT = len(url_list)
        for url in url_list:
            print(url)
            http_client = AsyncHTTPClient() 
            http_client.fetch(HTTPRequest(url), handle_response) # handle_response是回调函数
    
    
    ioloop.IOLoop.current().add_callback(func)
    ioloop.IOLoop.current().start()  # 死循环  ---> 事件循环
    

    Tornado不像Twisted一样内部自动的维护一个计算器。每次计算我们要发送请求的数量(url_list中的数量),请求返回执行回调函数后该计数器自动减1。当计数器为0时应该终止事件循环函数ioloop.IOLoop.current().stop()。这个计数器在Tornado中需要我们手动的去设计
    Twisted已经内部帮我们把计数器封装好了,当计数器值为0时在内部触发add.Both()添加的回调函数。


    对于上面的异步IO请求模块使用的优先级,我个人认为是这样的:
    gevent > Twisted > Tornado > asyncio (这个比较标准纯粹是怎么方便怎么用.....)


    自定义异步IO模块

    这些内置或第三方的异步IO请求模块,在日常生活中我们作为一个”使用者“掌握怎么去使用就足够了。对于内部很多细节的实现我们基本不用去深究。但是,作为一个充满好奇心的小白当然是不满足的!下面将摸索一下异步IO请求的底层原理及实现,以一个(小白)“开发者”的角度自己去写一个异步IO模块!

    我们是否有这样的疑问:我们知道协程提供切换功能实现非阻塞,异步IO使请求得到响应后实现自动回调功能。两者加起来就可以用一个线程发送处理多个http请求。那么异步IO是怎么做到监测到请求的响应的呢???(提前引入一下是通过“io多路复用”来实现的)
    所以要自定义异步IO模块,我们首先要理解一些前戏:

    非阻塞socket

    我们先来由浅入深的来看看一个Http请求的本质。我们知道HTTP是建立在TCP之上的协议,所以我们先来从讨论下socket。
    socket有客户端和服务端。所有的web网站都是服务端,而我们的浏览器属于客户端。

    如图我们写了一个URL,回键一敲!


    这就往这个URL地址发送了一个请求,它本质上就是把一些规定好格式的字符串通过socket.sent("我是http数据包")发到服务端去了。

    上面的过程中,浏览器帮我做了两件事:1.创建socket对象与服务器连接。2.往url地址发送请求。
    其实这里边有两个io操作,首先是建立“连接”,然后是“发送”数据。这两个过程都是比较耗时的。默认情况下“连接”和“发送”或者“接收数据”的时候是会阻塞住的,如果我们想让它不阻塞,我们可以在创建socket的时候设置setblocking(Fasle),要注意的一点是,不阻塞的情况下,程序无数据(连接无响应,数据未返回)就会报错(可以用异常处理来解决)

    import socket
    
    server = socket.socket()
    # 设置非阻塞
    server.setblocking(False)
    # 此时所有的socket操作都不会阻塞
    # 设置了非阻塞后,客户端发送连接请求不会等待服务端响应就直接往下走
    

    IO多路复用

    上述在使用无阻塞socket时,我们说可以使用异常处理去处理无数据(连接无响应,数据未返回)的地方。异常处理的作用只能忽略、也可以说是跳过了它们。可是我们是希望在遇到IO阻塞的地方能不阻塞去执行别的任务,等阻塞的地方返回响应时,我们能马上发现并再去做相应的处理

    那么怎么使得阻塞的地方返回响应时我们能马上做相应处理呢?我们可以整一个死循环,去不断地监测多个socket对象。当某一socket对象有变化时(建立连接成功或接收到数据),我们立即把这个socket对象拿出来做相应的处理。
    上面这一想法有一个专业的术语,叫做Io多路复用,它的意思是“单个线程,通过记录跟踪每个I/O流(sock)的状态,来同时管理多个I/O流” 。
    不明白?从字面上理解就是多路网络复用一个io线程
    不明白?我所理解的大白话就是用一个线程去监听多个socket对象的状态改变!
    还不明白?看下图:

    图中可以明白:在同一个线程里面, 通过拨开关的方式,来同时传输多个I/O流 。现在市面上使用的io多路复用模块有select,pollepoll。它们三个都是IO多路复用的具体的实现。
    头脑风暴:你有N个不知道什么时候来水的水龙头需要接水,你根据某种信号一会儿拧这个龙头,一会儿拧那个龙头把水都接了就是多路复用(一个线程)。使用残像拳在每个水龙头前派一个你的分身蹲守就是Threaded IO(多线程)。其实后者也没啥不好,因为未来的内核会消除上下文切换的软硬件性能损耗
    更多内容理解内容可以参考这里

    这里我们只从客户端的角度去实现无阻塞socket+io多路复用的使用(这里使用的是python中io多路复用模块select模块的)。再次重复一下:使用IO多路复用模块的作用是帮助我们循环去监听哪个sock发送了变化。

    """
    伪代码,主要是帮助理解
    """
    import socket
    server = socket.socket()
    # 设置非阻塞
    server.setblocking(False)
    try:
        socket对象1.connet()
        socket对象2.connet()
        socket对象3.connet()  # 无阻塞,程序直接往下走,无数据报错
    except Exception:
    	pass   # 捕捉无数据异常
        
    while True:
        r,w,e = select.select([socket对象1,socket对象2,socket对象3],#监测是否有数据传来
                              [socket对象1,socket对象2,socket对象3],#监测是否完成连接
                              [],#监测是否发生异常的socket
                              0.05) #最多等待的超时时间
        
    # r 表示有人给我发数据,即当有数据发送过来后,就会把该socket对象放到r中:此时可以通过socket对象.recv去接收数据了
    
    # w 表示我已经和别人创建连接成功,即当连接创建完成后(三次握手),就会把该socket对象放在w中:此时可以通过socket对象.sent('我是Http数据包')发送http请求了
    
    #e 表示socket内部发生异常就会把该socket对象放在e中
    

    select模块以列表形式接受四个参数,分别是需要监听的可读文件对象,可写文件对象,产生异常的文件对象和超时设置,当某个文件描述符状态改变后,会返回三个列表。 (这里我们的文件描述符只针对socket

    select会一直轮询,挨个问一遍socket对象,你有没有变化啊?有变化就把该socket添加到相应的存储空间(如上面的r、w和e)。
    当问完后,此时如果我有设定第四个参数超时时间 timeout = n时,那么如果监听的socket均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行;
    当超时时间为空,则select会一直阻塞,直到监听的socket有发生变化

    也就是说我们select无论是否设置了参数timeout,只要socket有变化,就会返回变化的socket列表,然后程序继续往下执行(而不是说一直阻塞住直到监听到所有的监听对象都有变化才返回)。
    我们对这些返回的socket执行完相应任务,select结束了(即一有socket状态改变,它就不阻塞了,会往下执行程序)! 如果我们想让它一直监听轮询直到处理完全部的socket的话就需要把它放到死循环中去!这个循环就叫做事件循环(用于检测请求的socket是否已经就绪,从而执行相关操作)。
    更多select模块的讲解可以看这里

    拓展: r,w,e = select.select([],[],[],0.05)里面的[]参数中,不仅可以放socket对象,只要一个对象提供fileno方法,并返回一个文件描述符,就可以被select监听!

    所以select内部监听的其实是对象下的fileno()方法返回的文件描述符。
    即我们自己写一个类,类中封装一个fileno()方法,因为我们没办法自己写文件描述符,所以我们直接调用socket对象的文件描述socket().fileno()在我们方法的最后返回。那么我们自定义的对象就可以放到select中去被监听!

    select监听自定义对象

    要记住上面的三个前戏内容(无阻塞socket,io多路复用模块select,select监听的对象)!!它是我们自定义异步IO模块的关键!

    自定义异步IO模块开发

    有了上面的前戏,我们不难理解 异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】!
    异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】 !
    异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】 ! (重要的事情说三遍!!!)

    要明确的一点是:我们一直在写的都是socket的客户端,因为爬虫基于是客户端实现的一种技术!

    本质上浏览器发送一个请求就是做下面这样的事情(认识HTTP的本质):

    import socket
    import select
    ###############HTTP请求本质,阻塞###############
    sk = socket.socket()
    # 1.连接
    sk.connect(('www.baidu.com',80))  # IO阻塞
    print('连接成功啦!!')
    
    # 2.连接成功发送消息
    sk.send(b"GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n")
    #sk.send(b"POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2")
    
    # 3.等待服务端响应
    data = sk.recv(8096) # IO阻塞
    print(data)
    
    # 关闭连接
    sk.close()
    
    ###############HTTP请求本质,无阻塞###############
    sk = socket.socket()
    sk.setblocking(False) # 设置完后,该socket就是非阻塞的了
    # 1.连接
    try:
        sk.connect(('www.baidu.com',80))
        # 无阻塞会直接往下走,会报BlockingIOError:无法立即完成一个非阻止性套接字操作
        # 使用异常处理处理日常使程序正常执行,不至于崩溃
        print('连接成功啦!!')
    except BlockingIOError as e:
    	print(e)
    
    while True:
    	# 弄一个事件循环不断去监听socket有没有返回响应
    
    # 2.连接成功发送消息
    sk.send(b"GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n")
    #sk.send(b"POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2")
    
    # 3.等待服务端响应
    data = sk.recv(8096) # IO无阻塞
    print(data)
    
    # 关闭连接
    sk.close()
    

    上面案例中我写的非阻塞的Http请求实现方式,使用了一个事件循环不断地去监听socket有没有返回响应。此时程序会卡在这里的。这是非常Low的。
    既然使用非阻塞socket,我们希望的就是能在遇到IO阻塞的时候去执行别的任务(如再去发送别的请求),等到有数据返回的时候执行回调,这就是我们异步IO的功能(使用select去实现)。而不是卡在一个地方!循环监测”一个socket”的状态。
    所以我们的正戏开始!下面我们就来实现自定义异步IO模块:

    import socket
    import select
    
    class HttpRequest:
        """封装请求和相应的基本数据比如说域名Host和回调函数"""
        def __init__(self,sk,host,callback):
            self.socket = sk
            self.host = host
            self.callback = callback
        def fileno(self):
            """让select可以直接监听我们这个对象"""
            return(self.socket.fileno)
    
    class HttpResponse:
        """格式化和处理响应数据(分割请求头请求体)"""
        def __init__(self,recv_data):
            self.recv_data = recv_data
            self.header_dict = {}
            self.body = None
        	initialize()
        
        def initialize(self):
            # 分割响应头和响应体,以两个换行符分割,1表示顺序分割1次 注意接收到的数据是bytes类型
        	headers,body = self.recv_data.split(b'\r\n\r\n',1)
            self.body = body
            # 分割出每一个响应头,以一个换行符分割
    		header_list = headers.split(b'\r\n')
            for i in header_list:
                #分出每个响应头中的键值对以字典形式存放
                h_str = str(h,encoding='utf-8')
                v = h_str.split(":",1)  # 响应头中键值对是以':'分割的(响应首行不是键值对形式,需处理)
                if len(v) == 2:
                    # 以键值对形式存在的响应头,直接放入字典中
                    self.header_dict[v[0]] = v[1]
                 elif len(v) == 1:
                    # 请求首行放的是一些staus等信息,不以键值对形式存在,这里做特殊处理。
                    # 请求首行的信息HTTP/1.1 200 OK
                    self.header_dict['staus'] = v[0]
                    
    
    class AsyncRequest:
        def __init__(self):
            self.conn = []   # 用于检测sock是否有数据可读
            self.connection = [] # 用于检测是否已经连接成功
            
        def add_request(self,host,callback):
            """创建socket并连接服务器"""
            try:
                sk = socket.socket()
                sk.setblocking(False)  # 设置为非阻塞socket
                sk.connect((host,80))  # 非阻塞,发送链接请求后马上往下执行,会报错
            except BlockingIoError as e:
                pass
            request = HttpRequest(sk,host,callback)
             # 把request封装的socket对象全放到了我们的conn列表中
            self.conn.append(request) 
            # 把request封装的socket对象全放到了我们的connection列表中
            self.connection.append(request) 
        def run(self):
            """事件循环,用于检测请求的socket是否已经就绪,从而执行相关操作"""
            while True:  
                r,w,e = select.select(self.conn,self.connection,self.conn,0.05)
                # 设置了timeout,轮询完后阻塞timeout时间,返回列表
                # 监测socket连接上了,所以循环遍历w(如果是空列表是不会执行for循环内容的)
    			for request in w:
                    print(request.host,'连接成功...')
                    # 只要能循环到,表示socket和服务器端已经连接成功
                    tpl = "GET %S HTTP/1.0\r\nHost:%s\r\n\r\n"%(w.host)
                    request.socket.send(bytes(tpl,encoding='utf-8'))
                # 因为成功连接后没必要继续监听,so要把该socket从监测是否连接成功的列表中移除掉
                    self.connection.remove(request)
                 
                # 连接成功后我们就要看下连接成功的socket是否有数据传过来了,所以要遍历r
                # 如果是空列表(即没有socket有数据可读时r就是[]空的)是不会执行for循环内容的
                	for request in r:
                        # request是HttpRequset对象,如果还没有数据返回,
                        recv_data = bytes() # 表示这个一个空的字节
                    	while True:
                           	# 因为服务端可能返回的数据大于我们指定接收的大小,所以要循环去收完
                            try:
                                # 非阻塞socket没有接收到数据也会往下执行,报错
                                # 所以必须使用异常处理
                            	chunck = request.socket.recv(8096)
                                recv_data += chunck
                            except Exception as e:
                                # 出异常表示数据接收完了或者数据为空
                                break
    						response = HttpResonse(recv_data)  # 把接收的http数据响应头响应体分割 
                            request.callback(response) # 让每个响应去执行相应的回调函数
                            request.socket.close()
                            self.conn,remove(r) #接收完数据后,该sock就不需要继续被监听了
                	
                    if len(self.conn) == 0:
                        #处理完所有的请求后我们要退出循环,否则select会一直监听
                        break
                            
                       
    def f1(response):
        print('保存到文件',response.header_dict)
    
    def f2(response):
        print('保存到数据库',response.header_dict)     
        
    url_list=[
        {'host':'www.baidu.com','callback':f1},
        {'host':'cn.bing.com','callback':f1},
        {'host':'deehuang.github.io',,'callback':f1},
    ]
    req = AsyncRequest()
    for item in url_list:
        req.add_request(item['host'],item['callback'])  # 给每个请求创建socket并连接url所指的服务器
    req.run()
    
            
    

    案例中的我们使用一个死循环去执行run()。这个死循环其实就是我们的在上面介绍常用的python内置和第三方异步IO模块的时候所说到的事件循环。它的作用就是检测请求的socket是否已经就绪,从而执行相关操作

    补充知识点:所有和http相关的模块,包括我们的web框架,本质上对接收的到的http请求数据使用的分割请求头和请求体方法都是这样的header,body = recv_data.split(b'\r\n\r\n',1)即从左到右对第一个双换行符进行分割(因为http的请求头和请求体就是用双换行符去分开标识的)。而对于每个请求头我们使用“/r/n”单换行符就可以进行分割header_list = headers.split(b'\r\n')。对于请求头中的键和值和值是以:去分割的!我们案例中可以以同样的方法会分割和处理我们拿到的响应数据,案例专门使用HttpReponse()对象去处理响应数据

    如果我们要想实现每个请求执行相关操作,需要在url_list中带上要执行的回调函数


    上面短短一百多行代码,就是实现了我们的异步IO模块了!
    要注意的一点是,这个设计的异步框架对于框架本身不是异步的,我们只不过是让socket不阻塞,然后加上个循环一直去检测socket的状态。当这个模块在别人调用的时候才实现了异步!既对使用者来说才是异步的。

    再思考一下:
    select能做异步吗?selcet仅仅是io多路复用,它唯一的功能就是:同时监听多个socket对象!同时监听多个socket对象!同时监听多个socket对象!。它监听的这个过程本来就是同步的(不断去监听sock的状态),它自己完成不了异步的操作,而我们利用它的特性可以去实现我们的异步IO模块
    什么是异步IO?字面上看它首先是异步的,然后它是一个IO请求。所以异步IO的意思就是当我们遇到IO请求的时候,不再一边傻等,而是能去做别的事情(非阻塞socket)。等IO请求完成后内部自动完成回调(”利用“io多路复用去实现,ps:是利用不是使用,Io多路复用只能干什么??看上面!!)去执行我指定的函数这样的一个过程
    所以异步是啥东西?我简单的理解是:异步就是自动回调!自动回调!自动回调!

    So,通过非阻塞的socket+利用io多路复用 就可以实现我们的异步IO模块

    总结异步IO模块的实现:

    1. 设置非阻塞socket
    2. select监听多个socket的状态(可以用我们自己定制的对象fileno方法+return socket描述符 sk.fileno()
    3. 监测scoket状态变化调用对应的回调函数
    4. 事件循环,即然后重复上面2,3过程。实现不停请求socket状态并调用对应的回调函数
  • 相关阅读:
    7款纯CSS3实现的炫酷动画应用
    9款基于HTML5/SVG/Canvas的折线图表应用
    8款耀眼的jQuery/HTML5焦点图滑块插件
    10款很酷的HTML5动画和实用应用 有源码
    13款精彩实用的最新jQuery插件
    9款超绚丽的HTML5/CSS3应用和动画特效
    8款最受欢迎的HTML5/CSS3应用及源码
    Zookeeper可以干什么
    MySQL数据库优化
    SQL语句的执行过程
  • 原文地址:https://www.cnblogs.com/deehuang/p/14394771.html
Copyright © 2011-2022 走看看