zoukankan      html  css  js  c++  java
  • Python-异步编程

    1、异步 同步

      函数或方法被调用时,调用者是否得到最终的结果

      直接得到最终结果的,就是同步调用

      不直接得到最终结果的,就是异步调用

    2、阻塞 非阻塞

      函数或方法调用的时候,是否立刻返回

      立即返回就是非阻塞

      不立即返回就是阻塞调用

    3、区别

      同步,异步,与 阻塞,非阻塞 没有关系

      同步,异步强调的是,是否得到最终的结果

      阻塞,非阻塞强调的是时间,是否等待

      同步与异步区别在于:调用者是否得到了想要的结果

      同步就是一直要执行到返回最终的结果

      异步就是直接返回额,但是返回的不是最终的结果,调用者不能通过这种嗲用得到结果,需要童工被调用者的其他方式通知调用者,来取回最终结果。

      阻塞与非阻塞 的区别在于,调用者是否还能干其他的事

      阻塞,调用者就只能干等

      非阻塞,调用者可以先去忙别的,不用一直等。

    4、联系

      同步阻塞:什么都不做,直到拿到最终的结果

      同步非阻塞:等最终结果的期间,可以做别的

      异步阻塞:给一个信息,让等通知,但是期间什么都不干

      异步非阻塞:等通知,但是期间可以做别的

    5、同步IO,异步IO,IO 多路复用

      5.1、IO 两个阶段:

        1、数据准备阶段

        2、内核空间复制数据到用户进程缓冲区阶段

      5.2、发生IO的时候:

        1、内核从输入设备读,写数据

        2、进程从内核复制数据

      系统调用 ---- read 函数

    6、IO 模型

      同步IO:

        同步IO 模型包括:阻塞IO,非阻塞IO, IO多路复用

      ***阻塞IO:

      进程等待(阻塞),知道读写完成(全程等待)

      read/ write 函数

      ****非阻塞IO

      

      

      进程调用read操作,如果IO设备没有准备好,立即返回error ,进程不阻塞,用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。

      第一阶段数据没有准备好,就先忙别的,等会再来看看,检查数据是否准备好了的过程是非阻塞的。

      第二阶段是阻塞的,即内核空间和用户空间之间复制数据 是阻塞的。

      ****IO 多路复用 

        所谓IO 多路复用,就是同事监控多个IO,有一个准备好了,就不需要等了,开始处理,提高了同时处理IO 的能力

        select 几乎所有的操作系统平台都支持,poll是对select的升级

        epoll,Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加了回调机制,BSD,Mac平台有kqueque,windows有iocp

      

        

      select 为例,将关注的IO 操作 告诉select函数 并调用,进程阻塞,内核监视select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select 返回,在使用read将数据复制到用户进程。

      一般情况下,select最多能监听1024个fd(可以修改,不建议修改),但是由于select 采用轮询的方式,当管理的IO 多了,每次都要遍历全部额fd,效率低下(每次某个IO 设备准备好了,都需要遍历一遍select)

      基于事件驱动的epoll没有管理的fd的上限, 且是回调机制,不需要遍历,效率很高。

          事件驱动IO:
                    通知机制:
                        1、水平触发通知,一直通知,直到处理
                        2、边缘触发, 只通知一次
                    event 是事件驱动IO

      异步IO:

       

      进程发起异步IO 请求,立即返回,内核完成 IO 的两个阶段,内核给进程发一个信号

      

    7、Python中 IO 多路 复用

      IO 多路复用:

      • 大多数操作系统都支持select 和poll
      • Linux 2.5+ 支持epoll
      • BSD,Mac 支持kqueque
      • windoes的iocp

      Python的select库

        实现了select , poll 系统调用,这个基本上操作系统都支持,部分实现了epoll

        底层的IO 多路复用模块。

      开发中的选择:

        1、完全夸平台,使用select ,poll,但是性能较差

        2、针对不同操作系统自行选择支持的技术,

        select 维护一个文件描述符数据结构,单个进程使用有上限,通常1024,线性扫描这个数据结构,效率低,

        poll和select的区别是内部数据结构使用链表没有这个最大限制,但是依然是线性遍历才能知道那个设备就绪了

        epoll使用事件通知机制,使用回调机制提高效率

        select poll 还要从内核空间复制消息到用户空间,而epoll通过内核空间 和用户空间共享的一块内存来减少复制(mmap))

     8、selectors 库

      3.4版本提 selectors 库, 高级IO 复用库

    类层次结构:
       
    BaseSelector
    +-- SelectSelector   实现select
    +-- PollSelector       实现poll
    +-- EpollSelector     实现epoll
    +-- DevpollSelector  实现devpoll
    +-- KquequeSelector实现kqueque

      selector.DefaultSelector 返回当前平台最有效,性能最高的实现、

      源代码: 

    if 'KqueueSelector' in globals():
        DefaultSelector = KqueueSelector
    elif 'EpollSelector' in globals():
        DefaultSelector = EpollSelector
    elif 'DevpollSelector' in globals():
        DefaultSelector = DevpollSelector
    elif 'PollSelector' in globals():
        DefaultSelector = PollSelector
    else:
        DefaultSelector = SelectSelector

      但是,由于没有实现Windows 下的IOCP ,所以,只能退化为select

      abstractmethod register( fileobj,  events, data=Nona)

      为selector 注册一个文件对象,监视它的IO  事件,返回SelectKey对象

      fileobj 被监视文件对象,例如:socket对象

      events 事件,该文件对象必须等待的事件

      data 可选的,与此文件对象相关联的不透明的数据,例如:关联用来存储每个客户端的会话ID,关联方法,通过这个参数在关注的时间产生后让二selector干什么事

      

      selector.SelectorKey 有4 个属性

      fileobj 注册的文件对象

      fd 文件描述符

      events 等待上面的文件描述符的文件对象的事件

      data 注册时关联的数据

      举例: 完成一个TCP Server, 能够接受客户端 请求并回应 客户端消息

     1 import selectors
     2 import threading
     3 import socket
     4 import logging
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'
     7 logging.basicConfig(level=logging.INFO, format=FORMAT)
     8 
     9 # 构建缺省性能最优的selector
    10 selector = selectors.DefaultSelector()
    11 
    12 # 创建 tcp server
    13 sock = socket.socket()
    14 laddr = '127.0.0.1', 9999
    15 sock.bind(laddr)
    16 sock.listen()
    17 logging.info(sock)
    18 
    19 sock.setblocking(False) #  非阻塞
    20 
    21 # 回调函数,自定义形参
    22 def accept(sock:socket.socket, mask):
    23     """ mask :事件 掩码的或值 """
    24     conn, raddr = sock.accept() # 这里虽然可以阻塞,但是事实上,阻塞是在  events = selector.select() 就处理了
    25     conn.setblocking(False) # 不阻塞
    26     logging.info('new socket {} in accept'.format(conn))
    27 
    28 # 注册文件对象sock 关注读事件,返回SelectorKey
    29 # 将sock,关注事件, data都绑定到key 实例属性上
    30 key = selector.register(sock, selectors.EVENT_READ, accept)
    31 logging.info(key)
    32 # 2018-11-06 14:03:15,407 MainThread 2964 SelectorKey(fileobj=<socket.socket fd=272,
    33 # family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0,
    34 # laddr=('127.0.0.1', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002930268>)
    35 while True:
    36     # 开始监听,等到有文件对象监控事件产生,返回(key, mask) 元组
    37     events = selector.select() # 没有 准备好的io,会阻塞在这里。
    38     logging.info(events)# 是一个二元组 组成的列表
    39     '''
    40     2018-11-06 14:06:20,215 MainThread 5236 [(SelectorKey(fileobj=<socket.socket fd=272,
    41      family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0,
    42       laddr=('127.0.0.1', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002271268>), 1)]
    43     '''
    44     for key, mask in events:
    45         logging.info(key)
    46         logging.info(mask)
    47         callback = key.data #accept
    48         callback(key.fileobj, mask) # accept(key.fileobj, mask)
    测试

      结果:

    1 D:python3.7python.exe E:/code_pycharm/tt7.py
    2 2018-11-06 14:11:01,816 MainThread 8520 <socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>
    3 2018-11-06 14:11:01,817 MainThread 8520 SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002950268>)
    4 2018-11-06 14:11:03,851 MainThread 8520 [(SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002950268>), 1)]
    5 2018-11-06 14:11:03,851 MainThread 8520 SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002950268>)
    6 2018-11-06 14:11:03,851 MainThread 8520 1
    7 2018-11-06 14:11:03,851 MainThread 8520 new socket <socket.socket fd=268, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 58313)> in accept
    客户端一连接,就退出,且服务端,不会退出

     

      测试:处于一直监听状态 

     1 import selectors
     2 import threading
     3 import socket
     4 import logging
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'
     7 logging.basicConfig(level=logging.INFO, format=FORMAT)
     8 
     9 # 构造缺省性能最优elector
    10 selector = selectors.DefaultSelector()
    11 
    12 # 创建TCP server
    13 sock = socket.socket()
    14 laddr = '127.0.0.1', 9999
    15 sock.bind(laddr)
    16 sock.listen()
    17 logging.info(sock)
    18 
    19 sock.setblocking(False) # 非阻塞
    20 
    21 # 回调函数,自己定义参数
    22 def accept(sock:socket.socket, mask):
    23     conn, raddr = sock.accept()
    24     conn.setblocking(False)
    25 
    26     logging.info('new socket {} in accept'.format(conn))
    27     key = selector.register(conn, selectors.EVENT_READ, read)
    28     logging.info('--------------------')
    29     logging.info(key)
    30 
    31 # 回调函数
    32 def read(conn:socket.socket, mask):
    33     logging.info('===================')
    34     data = conn.recv(1024)
    35     msg = 'your msg = {} ==='.format(data.decode())
    36     logging.info(msg)
    37     conn.send(msg.encode())
    38 
    39 # 注册 文件对象sock 关注读事件, 返回SelectorKey
    40 # 将sock, 关注事件,data都 绑定到key实例属性上
    41 key = selector.register(sock, selectors.EVENT_READ, accept)
    42 logging.info(key)
    43 
    44 event = threading.Event()
    45 
    46 def select():
    47     while not event.is_set():# 一直处于监听状态
    48         # 开始监听 ,等到有文件对象监控事件产生, 返回(key, mask) 元组
    49         events = selector.select()
    50         for key, mask in events:
    51             callback = key.data
    52             logging.info('********************')
    53             callback(key.fileobj, mask)
    54 
    55 threading.Thread(target=select, name='select').start()
    56 
    57 def main():
    58     while not event.is_set():
    59         cmd = input('>>>')
    60         if cmd.strip() =='quit':
    61             event.set()
    62             fobjs = []
    63             logging.info('{}'.format(list(selector.get_map().items())))
    64 
    65             # 字典遍历,不能直接操作数据
    66             for fd, key in selector.get_map().items(): # 返回注册的项
    67                 print(fd) # 从打印可看出,开始用于accept的sock 也被记录当中
    68                 print(key.fileobj)
    69                 fobjs.append(key.fileobj)
    70 
    71             for fobj in fobjs:
    72                 selector.unregister(fobj)
    73                 fobj.close()
    74             selector.close()
    75 
    76 if __name__ == '__main__':
    77     main()
    View Code

      结果:

     1 D:python3.7python.exe E:/code_pycharm/tt8.py
     2 2018-11-06 14:53:39,911 MainThread 9256 <socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>
     3 2018-11-06 14:53:39,911 MainThread 9256 SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>, fd=272, events=1, data=<function accept at 0x00000000004CC1E0>)
     4 >>>2018-11-06 14:53:55,211 select 8388 ********************
     5 2018-11-06 14:53:55,221 select 8388 new socket <socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59300)> in accept
     6 2018-11-06 14:53:55,221 select 8388 --------------------
     7 2018-11-06 14:53:55,221 select 8388 SelectorKey(fileobj=<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59300)>, fd=280, events=1, data=<function read at 0x0000000002A29510>)
     8 2018-11-06 14:53:56,851 select 8388 ********************
     9 2018-11-06 14:53:56,851 select 8388 new socket <socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59301)> in accept
    10 2018-11-06 14:53:56,851 select 8388 --------------------
    11 2018-11-06 14:53:56,851 select 8388 SelectorKey(fileobj=<socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59301)>, fd=296, events=1, data=<function read at 0x0000000002A29510>)
    12 
    13 
    14 >>>>>>quit
    15 2018-11-06 14:54:03,426 MainThread 9256 [(272, SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>, fd=272, events=1, data=<function accept at 0x00000000004CC1E0>)), (280, SelectorKey(fileobj=<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59300)>, fd=280, events=1, data=<function read at 0x0000000002A29510>)), (296, SelectorKey(fileobj=<socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59301)>, fd=296, events=1, data=<function read at 0x0000000002A29510>))]
    16 272
    17 <socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>
    18 280
    19 <socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59300)>
    20 296
    21 <socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59301)>
    22 
    23 Process finished with exit code 0
    连接了两个客户端

    实现ChatServer 群聊:

      测试1:只监听了读,收到客户端信息,直接发送信息,给客户端。并没有监听客户端。

     1 import threading
     2 import selectors
     3 import logging
     4 import socket
     5 
     6 FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'
     7 logging.basicConfig(level=logging.INFO, format=FORMAT)
     8 
     9 
    10 class ChatServer:
    11     def __init__(self, ip='127.0.0.1', port=9999):
    12         self.laddr = ip, port
    13         self.sock = socket.socket()
    14         self.event = threading.Event()
    15         self.selector = selectors.DefaultSelector()
    16 
    17     def start(self):
    18         self.sock.bind(self.laddr)
    19         self.sock.listen()
    20 
    21         self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
    22         threading.Thread(target=self.select, name='select', daemon=True).start()
    23 
    24     def select(self):
    25         while not self.event.is_set():
    26             events = self.selector.select()
    27             for key, mask in events:
    28                 callbakck = key.data
    29                 callbakck(key.fileobj, mask)
    30 
    31     def accept(self,sock, mask):
    32         conn, raddr = sock.accept()
    33         key = self.selector.register(conn, selectors.EVENT_READ, self.send)
    34 
    35     def send(self,conn, mask):
    36         data = conn.recv(1024)
    37 
    38         if data.strip() == b'quit' or data == b'':
    39             self.selector.unregister(conn)
    40             conn.close()
    41             return
    42 
    43         for key in self.selector.get_map().values():
    44             if key.data !=self.accept:
    45                 msg = 'your msg is {}'.format(data.decode()).encode()
    46                 key.fileobj.send(msg)
    47 
    48     def stop(self):
    49         self.event.set()
    50         fobjs = []
    51         for fd, key in self.selector.get_map().items():
    52             fobjs.append(key.fileobj)
    53         for fobj in fobjs:
    54             self.selector.unregister(fobj)
    55             fobj.close()
    56         self.selector.close()
    57 
    58 def main():
    59     cs = ChatServer()
    60     cs.start()
    61 
    62     while True:
    63         print('=======')
    64         cmd = input(">>>>")
    65         if cmd == 'quit':
    66             cs.stop()
    67             break
    68         print(threading.enumerate())
    69 
    70 if __name__ == '__main__':
    71     main()
    View Code

      将服务端封装为了一个类,实例化 一个对象,进行操作

      使用了IO 复用 的模型,不需要创建容器去收纳客户端,selects,提供了一个字典,收纳 (fd, selectkey)

      不需要多线程,只需要将阻塞主线程的监听放在一个新线程中,不会阻塞主线程,就行。

      只需要循环监听函数就行,不需要 循环 send 和 recv

      

      测试2:实现读写分离,利用中间容器,存放读到的东西,写的时候,从里边拿。这里使用了Queue.

         self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)


        注册语句,要监听  selectors.EVENT_READ | selectors.EVENT_WRITE 读 与 写 事件
         回调的时候,需要mask 来判断究竟是读触发了,还是写触发了,所以,需要修改方法声明,增加mask

      注意:读 和 写 是分离的,那么,handle 函数,应该这样写:   

    1 def handle(self, sock, mask):
    2     if mask & selectors.EVENT_READ: 读是1,写是2(十进制),二进制01,10,所以位与
    3         pass
    4     
    5     # 注意: 这里是某一个socket的写操作
    6     if mask & selectors.EVENT_WRITE:# 写缓冲区准备好了,可以写数据了
    7         pass

      handle方法 里面处理读, 写,mask有可能是0b01 0b10, 0b11

      问题是,假设读取到了客户端发来的数据后,如何写进去》?

      为每一个与客户端连接的socket对象增加对应的队列

      与每一个客户端连接的socket对象,自己维护一个队列,某一个客户端收到消息后,会遍历发给所有客户端的队列,这里完成一对多,即一份数据放到了所有的队列中。

      与每一个客户端练级的socket对象,发现自己对联有数据,就发送给客户端。

      

     1 import threading
     2 import selectors
     3 import logging
     4 import socket
     5 from queue import  Queue
     6 
     7 FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'
     8 logging.basicConfig(level=logging.INFO, format=FORMAT)
     9 
    10 
    11 class ChatServer:
    12     def __init__(self, ip='127.0.0.1', port=9999):
    13         self.laddr = ip, port
    14         self.sock = socket.socket()
    15         self.event = threading.Event()
    16         self.selector = selectors.DefaultSelector()
    17 
    18     def start(self):
    19         self.sock.bind(self.laddr)
    20         self.sock.listen()
    21 
    22         self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
    23         threading.Thread(target=self.select, name='select', daemon=True).start()
    24 
    25     def select(self):
    26         while not self.event.is_set():
    27             events = self.selector.select()
    28             for key, mask in events:
    29                 if key.data == self.accept:
    30                     callbakck = key.data
    31                     callbakck(key.fileobj)
    32                 else:
    33                     logging.info('{}'.format(threading.enumerate()))
    34                     callbakck = key.data[0]
    35                     callbakck(key, mask)
    36 
    37     def accept(self,sock):
    38         conn, raddr = sock.accept()
    39         self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, (self.handler, Queue()))
    40 
    41     def handler(self, key, mask):
    42         if mask & selectors.EVENT_READ:#
    43             conn = key.fileobj
    44             data = conn.recv(1024)
    45             logging.info(data)
    46             if data.strip() == b'quit' or data == b'':
    47                 self.selector.unregister(conn)
    48                 conn.close()
    49                 return
    50 
    51             msg = 'your msg is {}'.format(data.decode()).encode()
    52             for key in self.selector.get_map().values():
    53                 if isinstance(key.data, tuple):
    54                     key.data[1].put(msg)
    55             logging.info('----------------------')
    56 
    57         if selectors.EVENT_WRITE: #
    58             if not key.data[1].empty():
    59                 key.fileobj.send(key.data[1].get())
    60 
    61 
    62     def stop(self):
    63         self.event.set()
    64         fobjs = []
    65         for fd, key in self.selector.get_map().items():
    66             fobjs.append(key.fileobj)
    67         for fobj in fobjs:
    68             self.selector.unregister(fobj)
    69             fobj.close()
    70         self.selector.close()
    71 
    72 def main():
    73     cs = ChatServer()
    74     cs.start()
    75 
    76     while True:
    77         print('=======')
    78         cmd = input(">>>>")
    79         if cmd == 'quit':
    80             cs.stop()
    81             break
    82         print(threading.enumerate())
    83 
    84 if __name__ == '__main__':
    85     main()
    View Code

      这个程序最大的问题,在select(),一直判断可写,几乎一直循环不停,所以对于写不频繁的情况下,就不要监听 写。

      对于 Server 来说,一般更多的是等待对方发来数据后,响应时才发出数据,而不是积极的等着发送数据,所以,监听 读, 收到数据之后,再发送就可以了。

    为什么要坚持,想一想当初!
  • 相关阅读:
    使用HtmlAgilityPack将HtmlTable填入DataTable
    用EXCEL的VBA将PHPCMS的备份文件转换成HTML的一次尝试
    从微观角度看到宏观世界
    洛克菲特:如何管好你的钱包
    论永生_基因编辑
    如何隐藏自己的应用程序在服务器上不被发现?
    检视阅读
    改变了我对英语理解的语法课
    Rick And Morty使命必达与毁灭--------英语笔记
    文件太大,网速太慢,如何高效的传递到服务器上运行
  • 原文地址:https://www.cnblogs.com/JerryZao/p/9914399.html
Copyright © 2011-2022 走看看