zoukankan      html  css  js  c++  java
  • I/O模型实际应用与协程

    I/O模型实际应用与协程

    前言

      本章主要针对上一章节中的I/O模型做一些代码性的练习,尽管可能很少用到。但是你应当知道这些东西。还是具体结合上一篇的理论知识来看吧。最后将协程放在了扩展篇中,因为感觉目前能接触到的很多都是I/O多路复用+异步。协程属实不太常见,可能我层次低了。

     

    阻塞I/O的socket服务端

      阻塞点1:accept()等待三次握手建立链接

      阻塞点2:recv()从内核态的缓冲区复制数据到用户态内存

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    
    import socket
    server = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)
    
    server.bind(("127.0.0.1",6666)) 
    
    server.listen(5)
    
    while 1:
        conn,client_addr = server.accept()  # 阻塞点 1
        while 1:
            try: 
                data = conn.recv(1024)  # 阻塞点 2
                if not data:
                    break
                conn.send(data.upper())
            except ConnectionResetError as e:
                print(client_addr, "关闭了双向链接")
                break
        conn.close()

     

    非阻塞I/O的socket服务端

      阻塞点1:recv()从内核态的缓冲区复制数据到用户态内存

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    
    import socket
    server = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)
    
    server.setblocking(False)  # 设置为非阻塞
    
    server.bind(("127.0.0.1",6666))
    
    server.listen(5)
    
    while 1:
        try:
            conn,client_addr = server.accept()  # 不阻塞
        except BlockingIOError as e:  # 捕捉异常
            continue
    
        while 1:
            try:
                data = conn.recv(1024)  # 读取数据时依然会阻塞
                if not data:
                    break
                conn.send(data.upper())
            except ConnectionResetError as e:
                print(client_addr, "关闭了双向链接")
                break
            except BlockingIOError as e:  # 捕捉异常
                continue
    
        conn.close()

     

    select模块与I/O多路复用

      阻塞点1:即描述符监听r_list,如果有事件触发就放到r列表中

      阻塞点2:recv()从内核态的缓冲区复制数据到用户态内存

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    
    import socket
    import select
    
    server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    
    server.setblocking(False)  # 设置为非阻塞
    
    server.bind(("127.0.0.1", 6666))
    
    server.listen(5)
    
    r_list = [server, ]  # 被监听的socket对象,读取事件
    w_list = []  # 被监听的socket对象,写入事件
    
    while 1:
        r, w, e = select.select(r_list, w_list, [], 0.5)
    
        """
        r:   相当于一个空列表,当被监听的r_list中有任何对象能够读取数据了,则放到 r 列表中。
        w:   同上,放的是可写的
        e:   同上,放的是错误的
        0.5:  循环监听的时间,每隔多少秒监听一次
        """
    
        for i in r:  # 当有任何可读就执行这里,这个就是事件循环。阻塞点 1
            print("-----", r)
            if i == server:
                conn, addr = i.accept()
                r_list.append(conn)  # 将链接通道 conn 放入监听队列
            else:  # 如果不是server端,那就是conn,链接通道
                try:
                    data = i.recv(1024)  # 阻塞点2
                    if not data: break
                    i.send(data.upper())
                except ConnectionResetError:
                    print("断开链接通道", i)
                    r_list.remove(i)  # 将conn从可读列表中移除
    
        """
        1.开启服务端,监听可读事件列表 r_list, r_list = [socket对象,]  现在的r = []
        2.开启客户端,发送请求链接,r 列表新增了一个可读  r = [socket对象] ,执行for循环,条件成立
        3.建立双向链接通道,将链接通道放入可读事件列表 r_list = [socket对象,conn1]
    
        # --- 客户端给服务端发消息 ---
    
        1.此时的被监听列表 r_list = [socket对象,conn1]
        2.客户端发了一个消息, r_list = [conn1,]
        3. 执行for循环,条件不成立
    
        # --- 客户端断开链接 ---
    
        1.从监听列表 r_list 中删除了 conn1
    
        """

     

    selectors模块与异步

      selectors 包可以自动根据操作系统选择poll还是epoll,windows不支持epoll

      selectors包作为select包的升级版,更推荐去使用

      select包目前只支持select方法。

     

      现在很多框架的内部,如著名的异步框架tornadoTwisted ,等等都是通过epoll实现的异步,其实epoll到底属于不属于异步在网络上有很大的争议,下面会有一节解释一下我认为比较好的一个答案。

     

      官方文档

     

      如果是epoll,只有一个阻塞点,那就是recv()从内核态的缓冲区复制数据到用户态内存

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    
    import socket
    import selectors
    
    sel = selectors.DefaultSelector()  # 自动选择,select,poll,epoll
    
    
    def accept(sock, mask):
        conn, client_addr = sock.accept()  # 拿到双向链接
        print("新的双向链接通道{0}来自{1}".format(conn, client_addr))
        sel.register(conn, selectors.EVENT_READ, read)  # 为conn注册可读事件,绑定回调函数read
    
    
    def read(conn, mask):
        try:  # <--针对 windows 平台客户端强制退出服务端会抛出异常
            data = conn.recv(1024)
            if not data:  # <-- 针对 UNIX 平台客户端强制退出服务端会无限收空
                close_conn(conn, mask)
            conn.send(data.upper())
        except ConnectionResetError as e:
            close_conn(conn, mask)
    
    
    def close_conn(conn, mask):
        sel.unregister(conn)  # 注销对conn双向链接通道的监听
        print("断开链接通道", conn)
        conn.close()
    
    
    server = socket.socket()
    server.bind(("127.0.0.1", 6666))
    server.listen(5)
    server.setblocking(False)  # 设置非阻塞
    
    sel.register(server, selectors.EVENT_READ, accept)  # 为server注册可读事件,回调函数为accept
    
    def run():
        while 1:  # 只要有注册的事件触发了,就会执行相应的回调函数,这里是死循环不断检测
            events = sel.select(5)  # 最大监听时间
            for key, mask in events:
                callback = key.data  # key.data 就是回调函数
                callback(key.fileobj, mask)  # 执行回调,key.fileobj是监听的对象,也就是注册时的对象。
    
    if __name__ == '__main__':
        run()

     

      其实我们这里已经实现了单线程来模拟多线程的监听,但是如果返回结果的时候是同时很多client端都来了,那么我们可以使用多线程来进行操作。

    def run():
        from concurrent.futures import ThreadPoolExecutor
        while 1:  # 只要有注册的事件触发了,就会执行相应的回调函数,这里是死循环不断检测
            events = sel.select(5)  # 最大监听时间
            for key, mask in events:
                with ThreadPoolExecutor(5) as pool:
                    callback = key.data  # key.data 就是回调函数
                    pool.submit(callback,key.fileobj,mask)  # 执行回调,key.fileobj是监听的对象,也就是注册时的对象。

     

      单线程中模拟多线程,是阻塞I/O以及单纯的非阻塞I/O都做不到的,他们都需要开多线程或者多进程,当然除了I/O多路复用,还有协程可以供我们使用。

     

    扩展:epoll到底是属于同步还是异步

     

      学习tornadoasyncio这些异步网络库时,遇到了同样的问题,网上查到的也都说不明白,原来几个概念没搞清楚。

        1,I/O操作有多种,处理socket是一种,磁盘读写也是一种,暂时分为网络I/O和文件I/O、

        2,I/O多路复用是操作系统级别的,属于linux操作系统的五种I/O模型中的一种,是操作系统级别同步非阻塞的。

        3,异步网络库 twisted、tornado、asyncio所谓的异步,是应用级别的异步,底层确实是基于epoll实现,基本上都是处理网络 I/O,而且都是基于事件驱动的,使用时划分事件也大多是根据网络请求。

        4,操作系统级别的异步I/O才是真正异步非阻塞的,然后并没有很多应用,貌似unix平台没有,windows NT平台有也很少,而且基本都是文件I/O。

        5,I/O多路复用的实现用的比较多,linux平台的epoll,windows平台的select等,基于epollselect的应用大多是实现网络I/O。

     

      所以,遇到异步框架,异步网络库都应该知道是应用级别的异步,而且基本上都是基于epoll/select实现的。

      已知tornado会根据系统平台,选择epoll还是select

      实现高并发有多种方式,python多进程可以利用多核优势,协程(geventasyncio)可以实现应用级别的异步,celery实现任务异步,消息队列实现服务解耦等等,项目中可以根据实际情况选择或组合不同的方式。

      tornado(web框架/异步网络库):进程+异步+epoll

      asyncio:协程+epoll,使用中需要相应的异步库,常用aiohttp

      geventgreenlet+猴子补丁,猴子补丁把socket相关库改为非阻塞

     

      摘自知乎:个人理解,应用层上的epoll确实是属于异步,而系统层上的epoll并非绝对意义上的异步。

     

    扩展:协程

      协程是不是一个真实存在的东西,但是线程和进程都是操作系统中真实存在的,协程也是做到在单线程中实现并发的效果。

     

      协程是程序员创造出的一个不是真实的东西

      协程可以认为是微线程,对一个线程进程进行分片,

      使得线程在代码块之间可以进行来回切换,而不是在原来的基础上逐行执行。

      一个线程中函数的切换,使得它的切换代价非常低。

     

     

      注意:单纯的协程是没有什么用的,就只是做做切换,你甚至可以用生成器函数yield来完成。

     

      1. 必须在只有一个单线程里实现并发

      2. 修改共享数据不需加锁

      3. 用户程序里自己保存多个控制流的上下文栈

      4. 附加:一个协程遇到I/O操作自动切换到其它协程(如何实现检测I/O,yieldgreenlet都无法实现,就用到了gevent模块(select机制))

     

      greenlet协程模块: 人为做切换

    import greenlet
    ​
    def f1():
        print(1)
        gr2.switch() # 执行协程2
        print(2)
        gr2.switch()
    ​
    def f2():
        print(3)
        gr1.switch()
        print(4)
    ​
    # 表示创建了2个协程。由1个线程创建出来的
    ​
    gr1 = greenlet.greenlet(f1)  
    gr2 = greenlet.greenlet(f2)
    ​
    gr1.switch()  # 执行协程1
    # ==== 执行结果 ====
    """
    1
    3
    2
    4
    """

     

      gevent模块greenlet + IO切换,单纯的协程虽然没什么用,但是如果能够智能的碰见I/O就自己做切换,那就非常牛逼了。

    import gevent
    from gevent import monkey
    monkey.patch_all()  # 对以下代码中有IO操作就切换
    import requests
    ​
    def f(url):
        res = requests.get(url)  # 碰见 I/O 自动切换自另一个协程
        print(url,res.text)
    ​
    gevent.joinall([
        gevent.spawn(f,"https://www.baidu.com/"),  # 协程 1
        gevent.spawn(f,"https://www.yahoo.com/"),  # 协程 2
    ]
    )

     

      1.协程能提高并发吗 ?

      答:协程本身是无法提高并发的,但是协程+I/O切换可以.

     

      2.单线程提高并发的方法是?

      协程+IO切换 gevent,基于事件循环的异步非阻塞框架 Twisted

     

      3.进程,线程,协程的区别是什么?

      进程是资源分配的最小单位,线程是CPU调度的最小单位.

      在一个程序中可以有多个进行,一个进程最少有一个线程.

      和其他语言相比较,其他语言几乎不用进程的,但是在Python中,它的进程和线程是有差异的,Python有个GIL锁,GIL锁保证一个进程在同一时刻只有一个线程被CPU调到.

      对于协程来说,它是有程序员创造出来的不是一个真实存在的东西,它本身是没有意义的,但是当 协程+I/O切换放到一起的时候就可以提高单线程并发的性能.

     

  • 相关阅读:
    Mongodb中Sharding集群
    Codis --豌豆荚开源的Redis分布式中间件
    Linux下查看文件和文件夹大小
    kafka中处理超大消息的一些考虑
    heroku
    MVCC图示
    oracle ORA_ROWSCN 行记录的更新时间
    6个理由告诉你为什么要用NAS
    RAID技术介绍和总结
    新一代分布式任务调度框架:当当elastic-job开源项目的10项特性
  • 原文地址:https://www.cnblogs.com/Yunya-Cnblogs/p/13248146.html
Copyright © 2011-2022 走看看