zoukankan      html  css  js  c++  java
  • Python selectors实现socket并发

    selectors模块
    •   此模块允许基于选择模块原语构建高级别和高效的I / O多路复用。
    •   鼓励用户使用此模块,除非他们想要精确控制使用的os级别的原语。

    注:selectors也是包装了select高级的包装内置函数,它包装了select与epoll,优先使用epoll windos内只支持select。

    实现接收上万并发
    服务端:
    import selectors
    import socket
    
    # 生成select实例对象
    sel = selectors.DefaultSelector()
    
    def accept(sock, mask):
    
        # 接收链接
        conn, addr = sock.accept()  # Should be ready
        print('accepted', conn, 'from', addr)
    
        # 链接设置非阻塞模式
        conn.setblocking(False)
    
        # 注册conn,回调 read函数
        sel.register(conn, selectors.EVENT_READ, read)
    
    def read(conn, mask):
        data = conn.recv(1024)  # Should be ready
        if data:
            print('echoing', repr(data), 'to', conn)
            conn.send(data)  # Hope it won't block
        else:
            print('closing', conn)
    
            # 取消注册
            sel.unregister(conn)
    
            # 关闭链接
            conn.close()
    
    sock = socket.socket()
    sock.bind(('localhost', 9999))
    sock.listen(10000)
    sock.setblocking(False)
    
    # 注册server事件:
    # 参数一:sock 进行监听
    # 参数二:selectors.EVENT_READ 执行动作
    # 参数三:accept,只要来一个链接就回调这个函数
    sel.register(sock, selectors.EVENT_READ, accept)
    
    # 第一次调用server,register accept
    # 第二次调用client,register read
    while True:
    
        # 调用select:优先使用epoll
        events = sel.select()
    
        # 只要不阻塞就有调用的数据,返回一个列表
        # 默认阻塞,有活动链接就返回活动的链接列表
        for key, mask in events:
    
            # callback相当于调accept函数
            callback = key.data
    
            # 获取函数内存地址,加入参数
            # key.fileobj = 文件句柄
            callback(key.fileobj, mask)

    客户端:

    import socket
    import sys
    
    messages = [ b'This is the message. ',
                 b'It will be sent ',
                 b'in parts.',
                 ]
    
    # 传入链接参数
    server_address = ('localhost', 9999)
    
    # Create a TCP/IP socket
    # 使用列表生成式,生成多个请求。
    # Winuds使用select支持并发并不多 这里测试40个并发
    # Linux默认使用epoll可支持上万并发可修改10000
    socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM)
              for i in range(40)]
    
    print('connecting to %s port %s' % server_address)
    
    # 循环socks连续链接4次
    for s in socks:
    
        s.connect(server_address)
    
    # 循环发送数据
    for message in messages:
    
        # Send messages on both sockets
        # 每个客户端依次发送数据
        for s in socks:
            print('%s: sending "%s"' % (s.getsockname(), message) )
            s.send(message)
    
        # Read responses on both sockets
        # 服务端收数据
        for s in socks:
            data = s.recv(1024)
    
            # getsockname()服务端返回
            print( '%s: received "%s"' % (s.getsockname(), data) )
    
            # 没有数据打印客户端要关闭了
            if not data:
                print(sys.stderr, 'closing socket', s.getsockname() )
    注:建立一个socket就需要建立一个文件句柄,最大数65535
    注:Linux修改文件描述符数值 当前执行
    ulimit -SHn 65535    #修改链接端口
    ulimit -n                #查看链接端口   
    基于Linux下 需要修改配置

    epoll实现多并发socket

    import socket, logging
    import select, errno
    
    logger = logging.getLogger("network-server")
    
    def InitLog():
        logger.setLevel(logging.DEBUG)
    
        fh = logging.FileHandler("network-server.log")
        fh.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.ERROR)
    
        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
    
        logger.addHandler(fh)
        logger.addHandler(ch)
    
    
    if __name__ == "__main__":
        InitLog()
    
        try:
            # 创建 TCP socket 作为监听 socket
            listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        except socket.error as  msg:
            logger.error("create socket failed")
    
        try:
            # 设置 SO_REUSEADDR 选项
            listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        except socket.error as  msg:
            logger.error("setsocketopt SO_REUSEADDR failed")
    
        try:
            # 进行 bind -- 此处未指定 ip 地址,即 bind 了全部网卡 ip 上
            listen_fd.bind(('', 2003))
        except socket.error as  msg:
            logger.error("bind failed")
    
        try:
            # 设置 listen 的 backlog 数
            listen_fd.listen(10)
        except socket.error as  msg:
            logger.error(msg)
    
        try:
            # 创建 epoll 句柄
            epoll_fd = select.epoll()
            # 向 epoll 句柄中注册 监听 socket 的 可读 事件
            epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
        except select.error as  msg:
            logger.error(msg)
    
        connections = {}
        addresses = {}
        datalist = {}
        while True:
            # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待
            epoll_list = epoll_fd.poll()
    
            for fd, events in epoll_list:
                # 若为监听 fd 被激活
                if fd == listen_fd.fileno():
                    # 进行 accept -- 获得连接上来 client 的 ip 和 port,以及 socket 句柄
                    conn, addr = listen_fd.accept()
                    logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno()))
                    # 将连接 socket 设置为 非阻塞
                    conn.setblocking(0)
                    # 向 epoll 句柄中注册 连接 socket 的 可读 事件
                    epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
                    # 将 conn 和 addr 信息分别保存起来
                    connections[conn.fileno()] = conn
                    addresses[conn.fileno()] = addr
                elif select.EPOLLIN & events:
                    # 有 可读 事件激活
                    datas = ''
                    while True:
                        try:
                            # 从激活 fd 上 recv 10 字节数据
                            data = connections[fd].recv(10)
                            # 若当前没有接收到数据,并且之前的累计数据也没有
                            if not data and not datas:
                                # 从 epoll 句柄中移除该 连接 fd
                                epoll_fd.unregister(fd)
                                # server 侧主动关闭该 连接 fd
                                connections[fd].close()
                                logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                                break
                            else:
                                # 将接收到的数据拼接保存在 datas 中
                                datas += data
                        except socket.error as  msg:
                            # 在 非阻塞 socket 上进行 recv 需要处理 读穿 的情况
                            # 这里实际上是利用 读穿 出 异常 的方式跳到这里进行后续处理
                            if msg.errno == errno.EAGAIN:
                                logger.debug("%s receive %s" % (fd, datas))
                                # 将已接收数据保存起来
                                datalist[fd] = datas
                                # 更新 epoll 句柄中连接d 注册事件为 可写
                                epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                                break
                            else:
                                # 出错处理
                                epoll_fd.unregister(fd)
                                connections[fd].close()
                                logger.error(msg)
                                break
                elif select.EPOLLHUP & events:
                    # 有 HUP 事件激活
                    epoll_fd.unregister(fd)
                    connections[fd].close()
                    logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                elif select.EPOLLOUT & events:
                    # 有 可写 事件激活
                    sendLen = 0
                    # 通过 while 循环确保将 buf 中的数据全部发送出去
                    while True:
                        # 将之前收到的数据发回 client -- 通过 sendLen 来控制发送位置
                        sendLen += connections[fd].send(datalist[fd][sendLen:])
                        # 在全部发送完毕后退出 while 循环
                        if sendLen == len(datalist[fd]):
                            break
                    # 更新 epoll 句柄中连接 fd 注册事件为 可读
                    epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
                else:
                    # 其他 epoll 事件不进行处理
                    continue
  • 相关阅读:
    公司的首页
    ubuntu 无法在Eclipse中识别 设备
    Eclipse 和 Android Studio 并存
    Eclipse 和 Android Studio 并存
    mac 节约硬盘空间
    一公升的眼泪
    Mac Ogre
    代码大全 是极好的
    Ogre Ubuntu 环境搭建
    cocos2d-x 环境搭建 c++ 版本
  • 原文地址:https://www.cnblogs.com/xiangsikai/p/8215142.html
Copyright © 2011-2022 走看看