zoukankan      html  css  js  c++  java
  • epoll实现多路复用

    程序阻塞的过程

    假设系统目前运行了三个进程 A B C

    进程A正在运行一下socket程序

    server = socket.socket()
    server.bind(("127.0.0.1",1688))
    server.listen()
    server.accept()

    1.系统会创建文件描述符指向一个socket对象 ,其包含了读写缓冲区,已经进行等待队列

    2.当执行到accept / recv 时系统会讲进程A 从工作队列中移除

    3.将进程A的引用添加到 socket对象的等待队列中

    进程的唤醒

    1.当网卡收到数据后会现将数据写入到缓冲区

    2.发送中断信号给CPU

    3.CPU执行中断程序,将数据从内核copy到socket的缓冲区

    4.唤醒进程,即将进程A切换到就绪态,同时从socket的等待队列中移除这个进程引用

     用select来监控多个socket

    select的实现思路比较直接

    1.先将所有socket放到一个列表中,

     2.遍历这个列表将进程A 添加到每个socket的等待队列中 然后阻塞进程

    3.当数据到达时,cpu执行中断程序将数据copy给socket 同时唤醒处于等待队列中的进程A为了防止重复添加等待队列 还需要移除已经存在的进程A

     4.进程A唤醒后 由于不清楚那个socket有数据,所以需要遍历一遍所有socket列表

     缺点:

    1.select,需要遍历socket列表,频繁的对等待队列进行添加移除操作,

    2.数据到达后还需要遍历所有socket才能获知哪些socket有数据两个操作消耗的时间随着要监控的socket的数量增加而大大增加,处于效率考虑才规定了最大只能监视1024个socket

     epol可以解决的问题

    1.避免频繁的排队等待

    2.避免遍历所有socket

     第一个问题

     socket的处理方式

    while True:
        r_list,w_list,x_list = select.select(rlist,wlist,xlist)

    每次处理完一次读写后,都需要将所用过冲重复一遍,包括移除进程,添加进程,默认就会将进程添加到等待队列,并阻塞住进程,然而等待队列的更新操作并不频繁。

    所以对于第一个问题epoll,采取的方案是,将对等待队列的维护和,阻塞进程这两个操作进行拆分,

    import socket,select
    server = socket.socket()
    server.bind(("127.0.0.1",1688))
    server.listen(5)
    
    #创建epoll事件对象,后续要监控的事件添加到其中
    epoll = select.epoll()
    #注册服务器监听fd到等待读事件集合
    epoll.register(server.fileno(), select.EPOLLIN)
    
    # 等待事件发生
    while True:
        for sock,event in epoll.poll():
        pass

    在epoll中register 与 unregister函数用于维护等待队列

    epoll.poll则用于阻塞进程

     第二个问题

    epol为了解决这个问题,在内核中维护了一个就绪列表,

    1.创建epoll对象,epoll也会对应一个文件,由文件系统管理

    2.执行register时,将epoll对象 添加到socket的等待队列中

    3.数据到达后,CPU执行中断程序,将数据copy给socket

    4.在epoll中,中断程序接下来会执行epoll对象中的回调函数,传入就绪的socket对象

    5.将socket,添加到就绪列表中

    6.唤醒epoll等待队列中的进程,

    进程唤醒后,由于存在就绪列表,所以不需要再遍历socket了,直接处理就绪列表即可

    解决了这两个问题后,并发量得到大幅度提升,最大可同时维护上万级别的socket

     完整案例:

    linux中提供了epoll 这种多路复用的IO模型,注意其他平台没有相应的实现

    所以epoll仅在linux中可用

    客户端

    #coding:utf-8
    #客户端
    #创建客户端socket对象
    import socket
    clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #服务端IP地址和端口号元组
    server_address = ('127.0.0.1',1688)
    #客户端连接指定的IP地址和端口号
    clientsocket.connect(server_address)
    
    while True:
        #输入数据
        data = raw_input('please input:')
        if data == "q":
            break
        if not data:
          continue
        #客户端发送数据
        clientsocket.send(data.encode("utf-8"))
        #客户端接收数据
        server_data = clientsocket.recv(1024)
        print ('客户端收到的数据:',server_data)
    #关闭客户端socket
    clientsocket.close()

    服务器

    # coding:utf-8
    import socket, select
    
    server = socket.socket()
    server.bind(("127.0.0.1", 1688))
    server.listen(5)
    
    msgs = []
    
    
    fd_socket = {server.fileno(): server}
    epoll = select.epoll()
    # 注册服务器的 写就绪
    epoll.register(server.fileno(), select.EPOLLIN)
    
    while True:
        for fd, event in epoll.poll():
            sock = fd_socket[fd]
            print(fd, event)
            # 返回的是文件描述符 需要获取对应socket
            if sock == server:  # 如果是服务器 就接受请求
                client, addr = server.accept()
                # 注册客户端写就绪
                epoll.register(client.fileno(), select.EPOLLIN)
                # 添加对应关系
                fd_socket[client.fileno()] = client
    
            # 读就绪
            elif event == select.EPOLLIN:
                data = sock.recv(2018)
                if not data:
                    # 注销事件
                    epoll.unregister(fd)
                    # 关闭socket
                    sock.close()
                    # 删除socket对应关系
                    del fd_socket[fd]
                    print(" somebody fuck out...")
                    continue
    
                print(data.decode("utf-8"))
                # 读完数据 需要把数据发回去所以接下来更改为写就绪=事件
                epoll.modify(fd, select.EPOLLOUT)
                #记录数据
                msgs.append((sock,data.upper()))
            elif event == select.EPOLLOUT:
                for item in msgs[:]:
                    if item[0] == sock:
                        sock.send(item[1])
                        msgs.remove(item)
                # 切换关注事件为写就绪
                epoll.modify(fd,select.EPOLLIN)
  • 相关阅读:
    IntStack(存放int型值,带迭代器) 附模板化实现 p406
    Mule自带例子之stockquote
    Mule自带例子之loanbroker-simple
    Mule自带例子之flight-reservation
    hsqldb使用
    基于memcached中命令分析函数tokenize_command改造的split函数
    Memcached-1.4.4启动参数——手动设置chunk大小的上限
    Memcached源码分析——process_command函数解析
    Memcached源码分析——hash
    Memcached源码分析——slab的初始化
  • 原文地址:https://www.cnblogs.com/duGD/p/11006136.html
Copyright © 2011-2022 走看看