zoukankan      html  css  js  c++  java
  • Python异步IO之select

    1. select模块的基本使用(以socket为例)

     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 import select
     5 import socket
     6 import queue
     7 
     8 HOST_PORT = ("0.0.0.0", 9000)
     9 print("33[31;1mServer Addr:{0}33[0m".format(HOST_PORT))
    10 
    11 server = socket.socket()
    12 server.bind(HOST_PORT)
    13 server.listen(10)
    14 
    15 inputs = [server, ]     # 监测服务端socket的活跃状态和新socket连接活跃状态列表
    16 outputs = []            # 发送数据给客户端的socket连接列表
    17 msg_queue = {}          # 发送给客户端的信息数据字典
    18 
    19 # 断开socket连接函数
    20 def disconnect():
    21     print("33[41;1mClient [{0}] disconnected...33[0m".format(r.getpeername()))
    22     if r in outputs:
    23         outputs.remove(r)   
    24     inputs.remove(r)  # 移除select监测连接
    25     del msg_queue[r]    # 删除消息队列
    26 
    27 
    28 
    29 if __name__ == '__main__':
    30     while True:
    31         print("Waiting for socket to active...")
    32 
    33         # 如果在select监测的对象里没有活跃状态的socket,则卡在这里;
    34         # 当有活跃的socket对象时,把socket对象赋给readable,writeable,excetional;
    35         readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    36         print("active connect:", readable, writeable, exceptional)
    37 
    38         for r in readable:  # 循环readable里活跃的socket对象
    39             if r is server: # 如果活跃的是server的socket对象,则代表有新客户端socket连接进来了
    40                 conn, client_addr = server.accept()     # 接收client对象
    41                 print("33[32;1mConnected client addr:{0}33[0m".format(client_addr))   # 打印连接进来的对象身份
    42                 inputs.append(conn)                 # 加入到inputs列表让select监测处理
    43                 msg_queue[conn] = queue.Queue()     # 接收到的客户端连接,不立即返回,暂存在队列里,以后发送
    44             else:   # 如果活跃的不是server的socket对象,则代表客户端socket有数据发过来了
    45                 try:
    46                     data = r.recv(1024).decode()  # 接收客户端数据
    47                     if data:
    48                         # r.getpeername()获取客户端连接的身份信息
    49                         print("33[33;1mReceived [{0}] from the {1}33[0m".format(data, r.getpeername()))
    50                         if r not in outputs:
    51                             outputs.append(r)   # 把客户端连接加到outputs列表里
    52                         msg_queue[r].put(data.upper().encode()) # 把想要返回给客户端的数据放到相应的客户端连接队列里
    53                     else:
    54                         disconnect()    # 连接断开
    55                 except ConnectionResetError as e:       # 抓取客户端断开错误
    56                     disconnect()        # 连接断开
    57 
    58         for w in writeable:
    59             # print("33[34;1mReading to send to client {0}33[0m".format(w.getpeername()))
    60             try:
    61                 data = msg_queue[w].get_nowait()   # 获取队列中的数据
    62                 w.send(data)        # 发送给客户端
    63                 print("33[34;1mSend [{0}] to client {1}33[0m".format(data, w.getpeername()))
    64             except queue.Empty as e:
    65                 print("No data to send to the client [{0}]".format(w.getpeername()))
    66 
    67             outputs.remove(w)   # 移除列表中需要发送数据的连接
    68 
    69         for e in exceptional:
    70             print("33[41;1mClient [{0}] handles exception33[0m".format(e.getpeername())) #select处理异常连接
    71             if e in outputs:
    72                 outputs.remove(e)
    73 
    74             inputs.remove(e)
    75             del msg_queue[e]
    select_socket_server
     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 import socket
     5 
     6 HOST_PORT = ("localhost", 9000)
     7 client = socket.socket()
     8 client.connect(HOST_PORT)
     9 
    10 while True:
    11     cmd = input("%s#" % HOST)
    12     if not cmd:
    13         continue
    14     client.send(cmd.encode())
    15     data = client.recv(1024)
    16     print(data.decode())
    select_socket_client
     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 import socket
     5 import time
     6 
     7 server_addr = ("localhost", 9000)
     8 client_socket = [socket.socket() for i in range(100)]
     9 
    10 msg_data = "如果我是dj,你会爱我吗?"
    11 count = 0
    12 
    13 
    14 for client in client_socket:
    15     try:
    16         client.connect(server_addr)
    17         print("Success to connected...")
    18         client.send(msg_data.encode())
    19         data = client.recv(1024)
    20         print(data.decode())
    21         count += 1
    22     except ConnectionResetError:
    23         print(count)
    24 time.sleep(10)
    select_socket_client高并发测试

    2. selector模块的基本使用(以socket为例)

     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 import selectors
     5 import socket, queue
     6 
     7 sel = selectors.DefaultSelector()   # 生成一个sel对象
     8 send_msg = {}                       # 返回信息列表
     9 
    10 # 接收客户端连接函数
    11 def accept(sock, mask):
    12     conn, client_addr = sock.accept()
    13     print("33[33;1mClient {0} is connected...33[0m".format(client_addr))
    14     # sock.setbloking(False)
    15     # 将客户端socket对象和read方法注册到sel里面
    16     sel.register(conn, selectors.EVENT_READ, read)
    17     send_msg[conn] = queue.Queue()
    18 
    19 # 接收客户端发来数据函数
    20 def read(r, mask):
    21     try:
    22         data = r.recv(1024).decode()
    23         if data:
    24             print("Received [{0}] from the client {1}".format(data, r.getpeername()))
    25             # conn.send(data.upper())
    26             send_msg[r].put(data.upper().encode())
    27             sel.modify(r, selectors.EVENT_WRITE, write)
    28         else:
    29             disconnect(r)
    30     except ConnectionResetError:
    31         disconnect(r)
    32 
    33 # 发送数据给客户端函数
    34 def write(w, mask):
    35     # print("33[34;1mReading to send to client {0}33[0m".format(w.getpeername()))
    36     try:
    37         data = send_msg[w].get_nowait()  # 获取队列中的数据
    38         w.send(data)  # 发送给客户端
    39         print("33[34;1mSend [{0}] to client {1}33[0m".format(data.decode(), w.getpeername()))
    40     except queue.Empty as e:
    41         print("No data to send to the client [{0}]".format(w.getpeername()))
    42 
    43     sel.modify(w, selectors.EVENT_READ, read)
    44 
    45 # 断开socket连接函数
    46 def disconnect(conn):
    47     print("33[41;1mClient {0} disconnected...33[0m".format(conn.getpeername()))
    48     sel.unregister(conn)
    49     conn.close()
    50 
    51 # 初始化服务端连接
    52 HOST_PORT = ("localhost", 20000)
    53 server = socket.socket()
    54 server.bind(HOST_PORT)
    55 server.listen(10)
    56 print("33[31;1mServer Address: {0}33[0m".format(HOST_PORT))
    57 # 将服务端socket对象和accept方法注册到sel里面
    58 sel.register(server, selectors.EVENT_READ, accept)
    59 
    60 while True:
    61     # 开启事件监测(阻塞),类似select的select.select(inputs, outputs, inputs),当有活跃连接时,
    62     # 返回一个元组,元组内有SelectorKey对象和events事件编号
    63     # SelectorKey对象内包含fileobj嵌套字对象,fd柄, events事件编号,data回调函数内存空间指针
    64     print("33[32;1mWaiting to connections...33[0m")
    65     events = sel.select()
    66     print("Active events: %s" % events)
    67 
    68     for key, mask in events:
    69         # 获取socket回调函数内存指针,即accept函数
    70         callback = key.data
    71         # 调用函数并把socket和events传进去
    72         callback(key.fileobj, mask)
    selector_socket_server
     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 import socket
     5 HOST = "localhost"
     6 Port = 20000
     7 client = socket.socket()
     8 client.connect((HOST, Port))
     9 
    10 while True:
    11     cmd = input("%s#" % HOST)
    12     if not cmd:
    13         continue
    14     client.send(cmd.encode())
    15     data = client.recv(1024)
    16     print(data.decode())
    selector_socket_client
  • 相关阅读:
    如何设计web系统的监控
    RedisCluster的rename机制失败报错,解决又是数据倾斜问题
    学习大数据基础资源收集与分享
    用过滤器实现日志记录
    HttpClient 教程
    【公告】
    【2020赛季训练实录】
    【BZOJ5415&UOJ393】归程(Kruskal重构树,最短路)
    【BZOJ3545&BZOJ3551】Peaks(kruskal重构树,主席树,dfs序)
    【CF1263E】Editor(线段树,栈)
  • 原文地址:https://www.cnblogs.com/Caiyundo/p/9548975.html
Copyright © 2011-2022 走看看