select实现socket server多并发服务器端
# -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import socket import select import queue server = socket.socket() server.bind(('127.0.0.1', 9999)) server.listen() server.setblocking(False) # 要设置为非阻塞 input_list = [server, ] # 本身也要检测 output_list = [] msg_dic = {} while True: stdinput, stdoutput, stderr = select.select(input_list, output_list, input_list) # 第一个input_list参数为要用的连接,ouput_list为可能返回的连接,第二个input_list为可能报错的连接 # stdinput为连接的地址, stdoutput为返回的连接, stderr为错误的连接 print(stdinput, stdoutput, stderr) try: for r in stdinput: if r is server: # 来了个新连接 conn, addr = server.accept() print('当前连接客户端:', addr) input_list.append(conn) msg_dic[conn] = queue.Queue() # 初始化一个队列,用来储存要返回给客户端的数据 else: data = r.recv(1024) print('收到数据', data) msg_dic[r].put(data) output_list.append(r) # 放入返回的连接队列里 except socket.error: print('客户端断开连接') break for w in stdoutput: # 要返回给客户端的连接列表 data_to_client = msg_dic[w].get() w.send(data_to_client) # 把数据发送给客户端 output_list.remove(w) # 确保下次循环的时候stdoutput不返回已经处理完的连接 for e in stderr: if e in output_list: output_list.remove(e) input_list.remove(e) server.close() del msg_dic[e]
客户端
# -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import socket client = socket.socket() client.connect(('127.0.0.1', 9999)) while True: msg = input('>>>:').strip() if len(msg) ==0:continue client.send(msg.encode('utf-8')) data = client.recv(1024) print(data)
selector模块
selector模块可以使用select和epoll,它会根据所处的平台来选出最适合的I/O多路复用机制,在windows下为select,在linux下为epoll
通过selector模块实现单线程上万并发的socket server
服务器端
# -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() print('当前连接客户端', addr) conn.setblocking(False) # 把连接设置为非阻塞 sel.register(conn, selectors.EVENT_READ, read) # 新连接注册read回调函数,如果新的连接发送数据就执行read函数 def read(conn, mask): data = conn.recv(1024) if data: print('收到数据:',data) conn.send(data) else: print('客户端关闭', conn) sel.unregister(conn) # 注销注册的事件 conn.close() server = socket.socket() server.bind(('localhost', 9999)) server.listen() server.setblocking(False) sel.register(server, selectors.EVENT_READ, accept) # 注册一个事件,如果来了连接,就调用accept函数 # EVENT_READ,表示可读,值mask为1,EVENT_WRITE表示可写,值mask为2 while True: events = sel.select() # 调用epoll或select,默认阻塞,有活动的连接就返回活动的连接列表 for key, mask in events: callback = key.data # 回调函数,即accept函数 callback(key.fileobj, mask) # key.fileobj为文件句柄,即还没建立连接的socket实例 sel.close() # 最后要关闭,确保所有的资源被释放
客户端
# -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import socket import sys msg = [ b'python', b'php', b'java', ] server_address = ('127.0.0.1', 9999) socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(300)] print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for m in msg: for s in socks: print('%s: sending "%s"' % (s.getsockname(), m)) s.send(m) for s in socks: data = s.recv(1024) print('%s: received "%s"' % (s.getsockname(), data)) if not data: print(sys.stderr, 'closing socket', s.getsockname())
服务器端运行结果
客户端运行结果
300个socket连接1秒左右就全结束了