zoukankan      html  css  js  c++  java
  • 网络编程---异步同步

    异步编程

    重要概念

    同步、异步

    阻塞、非阻塞

    区别*

     

    联系

    同步IO、异步IO、IO多路复用

    IO两个阶段

     

    IO模型

    同步IO

    阻塞IO

     

    非阻塞IO

     

    IO多路复用

     

    异步IO

    Python中的IO多路复用

    selectors库

     

    # The following code sits at the very bottom of the selectors module source.
    # Choose the best implementation, roughly:
    #    epoll|kqueue|devpoll > poll > select.
    # select() also can't accept a FD > FD_SETSIZE (usually around 1024)
    # The first selector class found in this module's globals wins;
    # SelectSelector is the unconditional fallback.
    if 'KqueueSelector' in globals():
        DefaultSelector = KqueueSelector
    elif 'EpollSelector' in globals():
        DefaultSelector = EpollSelector
    elif 'DevpollSelector' in globals():
        DefaultSelector = DevpollSelector
    elif 'PollSelector' in globals():
        DefaultSelector = PollSelector
    else:
        DefaultSelector = SelectSelector

     # Usage example: skeleton of a selectors-driven TCP server.
     import selectors
     import threading
     import socket
     import logging

     FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
     logging.basicConfig(level=logging.INFO, format=FORMAT)

     # Upper bound (seconds) on a single selector.select() call, so the polling
     # loop wakes up regularly and can notice that the stop event was set.
     SELECT_TIMEOUT = 0.5


     # Callbacks choose their own parameters; the dispatch loop below calls
     # them as callback(fileobj, mask).
     def accept(sock: socket.socket, mask):
         """Accept one pending connection on the listening socket.

         sock: the listening socket that became readable.
         mask: bitwise OR of the selector events that fired.
         """
         conn, addr = sock.accept()
         conn.setblocking(False)  # non-blocking
         pass  # placeholder: a real server would register conn with the selector here


     # Callback placeholder for client sockets.
     def read(conn: socket.socket, mask):
         """Placeholder read callback; intentionally does nothing."""
         pass


     # Build the best selector available on this platform.
     selector = selectors.DefaultSelector()

     # Create the TCP server. The variable is called `server` (not `socket`)
     # so that the `socket` module stays usable after this point.
     server = socket.socket()
     server.bind(('0.0.0.0', 9999))
     server.listen()
     logging.info(server)
     server.setblocking(False)  # non-blocking

     # Register the listening socket for read events. register() returns a
     # SelectorKey that carries the file object, the event mask and the data
     # (here: the callback).
     key = selector.register(server, selectors.EVENT_READ, accept)
     logging.info(key)

     e = threading.Event()


     def select(e: threading.Event):
         """Poll the selector and dispatch callbacks until the event e is set."""
         while not e.is_set():
             # Wait until a monitored event fires or the timeout elapses;
             # returns a list of (key, mask) tuples (empty on timeout).
             events = selector.select(SELECT_TIMEOUT)
             if not events:
                 continue
             print('-'*30)
             for key, mask in events:
                 logging.info(key)
                 logging.info(mask)
                 callback = key.data  # the callback stored at registration time
                 callback(key.fileobj, mask)


     select_thread = threading.Thread(target=select, args=(e,), name='select')
     select_thread.start()


     def main():
         """Read console commands; on 'quit' stop the select loop and clean up."""
         while not e.is_set():
             cmd = input('>>>')
             if cmd.strip() == 'quit':
                 e.set()
                 # Let the polling thread leave select() before the selector
                 # and its sockets are torn down underneath it.
                 select_thread.join()
                 fobjs = []
                 logging.info("{}".format(list(selector.get_map().items())))

                 # Collect first: unregistering while iterating the map would
                 # mutate it mid-iteration.
                 for fd, key in selector.get_map().items():
                     print(fd, key)
                     print(key.fileobj)
                     fobjs.append(key.fileobj)

                 for fobj in fobjs:
                     selector.unregister(fobj)
                     fobj.close()
                 selector.close()


     if __name__ == '__main__':
         main()

    练习

     

     import socket
     import threading
     import logging
     import datetime
     import selectors

     FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
     logging.basicConfig(level=logging.INFO, format=FORMAT)


     # TCP Server
     class ChatServer():
         """Group-chat TCP server driven by a selector instead of per-client threads.

         Every message received from one client is time-stamped and sent to
         all connected clients.
         """

         # Upper bound (seconds) on one select() call so the loop can observe
         # the stop event.
         SELECT_TIMEOUT = 0.5

         def __init__(self, ip='0.0.0.0', port=9999):
             self.addr = (ip, port)
             self.socket = socket.socket()
             self.event = threading.Event()
             self.clients = {}
             self.selector = selectors.DefaultSelector()  # create the selector
             self._select_thread = None

         def start(self):
             """Bind, listen and start the background select loop."""
             self.socket.bind(self.addr)
             self.socket.listen()
             self.socket.setblocking(False)
             # The selector-driven accept callback replaces a dedicated accept thread.
             self.selector.register(self.socket, selectors.EVENT_READ, self.accept)
             self._select_thread = threading.Thread(
                 target=self.select, name='select', daemon=True)
             self._select_thread.start()

         def select(self):
             """Dispatch ready file objects to their callbacks until stopped."""
             while not self.event.is_set():
                 # Returns (key, mask) tuples for ready objects; empty on timeout.
                 events = self.selector.select(self.SELECT_TIMEOUT)
                 if not events:
                     continue
                 print("*"*30)
                 for key, mask in events:
                     logging.info(key)
                     logging.info(mask)
                     callback = key.data  # callback stored at registration time
                     callback(key.fileobj)

         # Callback
         def accept(self, sock: socket.socket):
             """Accept a client connection and watch it for incoming data."""
             conn, addr = sock.accept()
             conn.setblocking(False)
             # Register every client socket so its data triggers self.recv.
             self.selector.register(conn, selectors.EVENT_READ, self.recv)

         @staticmethod
         def _peer_label(addr):
             """Return 'host:port' for INET address tuples, str(addr) otherwise."""
             if isinstance(addr, tuple) and len(addr) >= 2:
                 return "{}:{}".format(addr[0], addr[1])
             return str(addr)

         # Callback
         def recv(self, sock: socket.socket):
             """Receive data from one client and broadcast it to all clients.

             An empty read or the literal b'quit' unregisters and closes the
             client socket instead.
             """
             data = sock.recv(1024)
             if not data or data == b'quit':
                 self.selector.unregister(sock)
                 sock.close()
                 return
             # recv() yields bytes: decode for formatting, then encode the
             # finished message because send() only accepts bytes.
             msg = "{:%Y/%m/%d %H:%M:%S} {}\n{}\n".format(
                 datetime.datetime.now(),
                 self._peer_label(sock.getpeername()),
                 data.decode(errors='replace'),
             ).encode()
             # Broadcast
             for key in self.selector.get_map().values():
                 if key.data == self.recv:  # skips the listening socket (self.accept)
                     key.fileobj.send(msg)

         # Stop the service
         def stop(self):
             """Stop the select loop, then unregister and close every socket."""
             self.event.set()
             worker = self._select_thread
             # Wait for the loop to leave select() before the selector is torn
             # down; never join the current thread.
             if worker is not None and worker is not threading.current_thread():
                 worker.join()
             # Snapshot first: unregister() mutates the map being iterated.
             fobjs = [key.fileobj for key in self.selector.get_map().values()]
             for fobj in fobjs:
                 self.selector.unregister(fobj)
                 fobj.close()
             self.selector.close()


     def main():
         """Run the chat server until 'quit' is typed on the console."""
         cs = ChatServer()
         cs.start()

         while True:
             cmd = input(">>>")
             if cmd == 'quit':
                 cs.stop()
                 break
             logging.info(threading.enumerate())


     if __name__ == '__main__':
         main()

     

     

     # -*- coding: utf-8 -*-
     import socket
     import threading
     import datetime
     import logging
     import selectors
     from queue import Queue

     FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
     logging.basicConfig(level=logging.INFO, format=FORMAT)


     class ChatServer:
         """Selector-based chat server with one outgoing Queue per client.

         The listening socket is registered with a plain callable
         (self.accept); client sockets are registered with a
         (handler, Queue) tuple.
         """

         # Upper bound (seconds) on one select() call so the loop can observe
         # the stop event.
         SELECT_TIMEOUT = 0.5

         def __init__(self, ip='127.0.0.1', port=9999):
             self.sock = socket.socket()
             self.addr = (ip, port)
             self.clients = {}
             self.event = threading.Event()
             self.selector = selectors.DefaultSelector()  # create the selector
             self._select_thread = None

         def start(self):
             """Bind, listen and start the background select loop."""
             self.sock.bind(self.addr)
             self.sock.listen()
             self.sock.setblocking(False)
             # Register the listening socket
             self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
             self._select_thread = threading.Thread(target=self.select, name='selector')
             self._select_thread.start()

         def select(self):
             """Dispatch ready file objects until the stop event is set."""
             while not self.event.is_set():
                 # Returns (key, mask) tuples for ready objects; empty on timeout.
                 events = self.selector.select(self.SELECT_TIMEOUT)
                 for key, mask in events:
                     if callable(key.data):
                         # Listening socket: data is the accept callback itself.
                         key.data(key.fileobj, mask)
                     else:
                         # Client socket: data is (handler, queue).
                         key.data[0](key, mask)

         def accept(self, sock: socket.socket, mask):
             """Accept a client and register it with its own outgoing queue."""
             conn, raddr = sock.accept()
             conn.setblocking(False)
             self.clients[raddr] = (self.handler, Queue())
             # Watch for reads only. Write interest is switched on in handler()
             # while the queue holds data: a socket is almost always writable,
             # so a permanent EVENT_WRITE registration would make select()
             # return immediately on every call.
             self.selector.register(conn, selectors.EVENT_READ, self.clients[raddr])

         def handler(self, key: selectors.SelectorKey, mask):
             """Handle read and/or write readiness of one client socket.

             key: the SelectorKey of the client; key.data is (handler, queue).
             mask: bitwise OR of the events that fired.
             """
             sock = key.fileobj
             outbox = key.data[1]
             both = selectors.EVENT_READ | selectors.EVENT_WRITE
             if mask & selectors.EVENT_READ:
                 raddr = sock.getpeername()
                 data = sock.recv(1024)
                 if not data or data == b'quit':
                     # Client disconnected or asked to quit: unregister and close.
                     self.selector.unregister(sock)
                     sock.close()
                     self.clients.pop(raddr, None)
                     return
                 # Snapshot the keys: modify() may re-register entries, which
                 # would otherwise mutate the map during iteration.
                 for k in list(self.selector.get_map().values()):
                     logging.info(k)
                     if isinstance(k.data, tuple):
                         k.data[1].put(data)
                         self.selector.modify(k.fileobj, both, k.data)
             if mask & selectors.EVENT_WRITE:
                 if not outbox.empty():
                     sock.send(outbox.get())
                 if outbox.empty():
                     # Nothing left to send: drop write interest again.
                     self.selector.modify(sock, selectors.EVENT_READ, key.data)

         def stop(self):
             """Stop the select loop, then unregister and close every socket."""
             self.event.set()
             worker = self._select_thread
             # Wait for the loop to leave select() before tearing the selector
             # down; never join the current thread.
             if worker is not None and worker is not threading.current_thread():
                 worker.join()
             fobjs = [key.fileobj for key in self.selector.get_map().values()]
             for fobj in fobjs:
                 self.selector.unregister(fobj)
                 fobj.close()
             self.selector.close()


     def main():
         """Run the chat server until 'quit' is typed on the console."""
         cs = ChatServer()
         cs.start()

         while True:
             cmd = input(">>>")
             if cmd == 'quit':
                 cs.stop()
                 break
             logging.info(threading.enumerate())


     if __name__ == '__main__':
         main()

    asyncio

     问题的引出

    def a():
        """Print 0, 1 and 2, each on its own line."""
        print(*range(3), sep="\n")


    def b():
        """Print the letters a, b and c, each on its own line."""
        print(*"abc", sep="\n")


    # Plain sequential calls: a() runs to completion before b() starts.
    a()
    b()
    
    #运行结果一定是
    0
    1
    2
    a
    b
    c

    import threading
    import time
    
    def a():
        """Print 0, 1, 2, pausing 1 ms before each line."""
        for digit in (0, 1, 2):
            time.sleep(0.001)
            print(digit)


    def b():
        """Print a, b, c, pausing 1 ms before each line."""
        for letter in ("a", "b", "c"):
            time.sleep(0.001)
            print(letter)


    # One thread per function; each thread is named after the function it runs.
    for worker in (a, b):
        threading.Thread(target=worker, name=worker.__name__).start()
    #运行结果
    a
    0
    b
    1
    c
    2
    #多进程版本
    import multiprocessing
    import time
    def a():
        """Print 0, 1, 2, pausing 1 ms before each line."""
        for digit in (0, 1, 2):
            time.sleep(0.001)
            print(digit)


    def b():
        """Print a, b, c, pausing 1 ms before each line."""
        for letter in ("a", "b", "c"):
            time.sleep(0.001)
            print(letter)


    if __name__ == '__main__':
        # One process per function; each process is named after its target.
        for target in (a, b):
            multiprocessing.Process(target=target, name=target.__name__).start()
    
    #运行结果
    0
    1
    a
    2
    b
    c
    #生成器版本
    def a():
        """Generator: print the next of 0, 1, 2, then pause at a bare yield."""
        for digit in (0, 1, 2):
            print(digit)
            yield


    def b():
        """Generator: print the next of a, b, c, then pause at a bare yield."""
        for letter in ("a", "b", "c"):
            print(letter)
            yield


    # Advance the two generators alternately, three steps each.
    x, y = a(), b()
    for i in range(3):
        for gen in (x, y):
            next(gen)
    
    #运行结果
    0
    a
    1
    b
    2
    c

    事件循环

     

     协程

    协程的使用

    import asyncio
    
    # Legacy generator-based coroutine syntax, shown for comparison with async/await.
    # NOTE(review): @asyncio.coroutine was deprecated in Python 3.8 and removed in
    # 3.11, so this snippet only runs on older interpreters.
    @asyncio.coroutine
    def sleep(x):  # coroutine function
        # Three rounds: print the round index, then suspend for x seconds.
        for i in range(3):
            print('sleep {}'.format(i))
            yield from asyncio.sleep(x)
    
    # Run the coroutine to completion on the event loop, then close the loop.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(sleep(3))
    loop.close()

    import asyncio
    async def sleep(x):
        """Coroutine: announce three rounds, suspending x seconds after each."""
        for round_no in (0, 1, 2):
            line = 'sleep {}'.format(round_no)
            print(line)
            await asyncio.sleep(x)
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
    # loop is current and raises RuntimeError on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(sleep(3))
    finally:
        # Close the loop even if the coroutine raised.
        loop.close()

    import asyncio
    import threading
    
    async def sleep(x):
        """Coroutine: run x rounds; each prints its index, then suspends x seconds."""
        for round_no in range(x):
            line = 'sleep {}'.format(round_no)
            print(line)
            await asyncio.sleep(x)
    
    
    async def showthread(x):
        """Coroutine: x times, print the list of live threads, then suspend 2 seconds."""
        for _ in range(x):
            alive = threading.enumerate()
            print(alive)
            await asyncio.sleep(2)
    
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
    # loop is current and raises RuntimeError on recent Python versions.
    loop = asyncio.new_event_loop()
    # asyncio.wait() rejects bare coroutine objects since Python 3.11, so wrap
    # each coroutine in a Task first. Tasks start running in list order.
    tasks = [loop.create_task(sleep(3)), loop.create_task(showthread(3))]
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        # Close the loop even if a task raised.
        loop.close()
    #运算结果
    [<_MainThread(MainThread, started 21012)>]
    sleep 0
    [<_MainThread(MainThread, started 21012)>]
    sleep 1
    [<_MainThread(MainThread, started 21012)>]
    sleep 2
    #协程版本
    import asyncio
    # Legacy generator-based coroutines: each bare `yield` hands control back to
    # the event loop.
    # NOTE(review): @asyncio.coroutine was removed in Python 3.11, and asyncio.wait()
    # no longer accepts bare coroutine objects there; this snippet targets older
    # interpreters.
    @asyncio.coroutine
    def a():
        # Print a.x 0..2, yielding to the loop after every line.
        for x in range(3):
            print('a.x',x)
            yield
    
    @asyncio.coroutine
    def b():
        # Print b.x a..c, yielding to the loop after every line.
        for x in 'abc':
            print('b.x',x)
            yield
    
    print(asyncio.iscoroutinefunction(a))
    print(asyncio.iscoroutinefunction(b))
    
    # The big loop: run both coroutines on one event loop until both finish.
    loop = asyncio.get_event_loop()
    tasks = [a(),b()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
    #运行结果
    True
    True
    a.x 0
    b.x a
    a.x 1
    b.x b
    a.x 2
    b.x c

    TCP Echo Server举例

    import asyncio
    #TCP Echo Server举例
    async def handle(reader, writer):
        """Echo handler for one client connection.

        reader: StreamReader for data sent by the client.
        writer: StreamWriter used to answer the client.

        Each chunk read is sent back prefixed with the peer address. The loop
        ends when the client closes its side, after which the writer is closed.
        """
        # API exploration output, printed once per connection.
        print(dir(reader))
        print(dir(writer))
        client = writer.get_extra_info('peername')
        while True:
            data = await reader.read(1024)
            if not data:
                # b'' signals EOF; without this check the loop would spin forever.
                break
            message = "{} Your msg {}".format(client, data.decode(errors='replace')).encode()
            writer.write(message)
            await writer.drain()
        writer.close()
        await writer.wait_closed()
    
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
    # loop is current and raises RuntimeError on recent Python versions.
    loop = asyncio.new_event_loop()
    ip = '127.0.0.1'
    port = 9999
    # start_server() lost its loop= parameter in Python 3.10; the server binds
    # to whichever loop runs the coroutine.
    crt = asyncio.start_server(handle, ip, port)
    server = loop.run_until_complete(crt)
    print(server)  # asyncio Server object wrapping the listening socket(s)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the demo server.
        pass
    finally:
        server.close()
        loop.close()

    aiohttp库

    HTTP Server

    from aiohttp import web
    
    async def indexhandle(request:web.Request):
        """Handle GET /: reply with status 201 and the request path as the body."""
        # A handler must return a Response; web.Request is the incoming side
        # and cannot be built from text/status.
        return web.Response(text=request.path,status=201)
    
    async def handle(request:web.Request):
        """Handle GET /{id}: reply with status 200 and the matched id as the body.

        Falls back to '0000' when the route carries no 'id' segment.
        """
        print(request.match_info)
        print(request.query_string)  # e.g. http://127.0.0.1:9977/1?name=12301
        # A handler must return a Response; web.Request is the incoming side
        # and cannot be built from text/status.
        return web.Response(text=request.match_info.get('id','0000'),status=200)
    
    # Build the application and map GET routes to their handlers.
    # The example URLs use port 9977, the port passed to run_app() below.
    app = web.Application()
    app.router.add_get("/",indexhandle)  # e.g. http://127.0.0.1:9977/
    app.router.add_get("/{id}",handle)  # e.g. http://127.0.0.1:9977/12301
    
    # Start serving on all interfaces, port 9977.
    web.run_app(app,host='0.0.0.0',port=9977)

    HTTP Client

    import asyncio
    from aiohttp import ClientSession
    
    async def get_html(url:str):
        """Fetch url with a fresh ClientSession; print the status, then the body text."""
        async with ClientSession() as http:
            async with http.get(url) as reply:
                status = reply.status
                print(status)
                body = await reply.text()
                print(body)
    
    
    url = 'http://127.0.0.1/ziroom-web'
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
    # loop is current and raises RuntimeError on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(get_html(url))
    finally:
        # Close the loop even if the request failed.
        loop.close()
    做一枚奔跑的老少年!
  • 相关阅读:
    数据库分区与分表
    Paxos算法简单介绍
    Zookeeper实现分布式锁服务(Chubby)
    java.lang.OutOfMemoryError: Java heap space错误及处理办
    关于分布式事务、两阶段提交协议、三阶提交协议
    Volatile
    寻找数组中只出现一次的数
    堆排序
    二叉树遍历 递归非递归
    redis 数据类型
  • 原文地址:https://www.cnblogs.com/xiaoshayu520ly/p/11007803.html
Copyright © 2011-2022 走看看