zoukankan      html  css  js  c++  java
  • Python Select模型

    IO多路复用

    IO多路复用就是我们经常说的select epoll.select和epoll的好处是单个process就可以同时处理多个网络IO。基本原理是selectepoll会不断的轮询所负责的所有socket,当有某个socket数据到达了,就通知用户进程。
    下面是流程图:

      当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
    

    注意1:select函数返回结果中如果有文件可读了,那么进程就可以通过调用accept()或recv()来让kernel将位于内核中准备到的数据copy到用户区。

    注意2: select的优势在于可以处理多个连接,不适用于单个连接

    selectors

    基于select模块实现的IO多路复用

    `

    IO多路复用实现机制

    在不同的平台上是不一样的,win平台只有select,Linux平台有select poll epoll

    • win: select

    • linux : select poll epoll

    通常是用户空间创建fd,然后copy到内核空间

    如果是开fd的数量多,select的的效率低

    基于select模块实现的IO多路复用

    import selectors
    import socket
    
    sock = socket.socket()
    sock.bind(("127.0.0.1", 8810))
    
    sock.listen(5)  # 这里虽然设置了最大连接数,但是已经没有限制了
    sel = selectors.DefaultSelector()  # 实例化一个对象,会根据不同的平台自动设置优先级
    # epoll|kqueue|devpoll > poll > select.  所以Linux系统会自动设置成epoll  win 自动设置成select
    
    # 第二步
    def read(conn, mask):
        # pass
        try:  # win 检测异常  当有异常 如客户端断开的时候 
            data = conn.recv(1024)
            print(data)
            print(data.decode("utf-8"))
            data2 = input(">>>")
            conn.send(data2.encode("utf-8"))
        except Exception:
            sel.unregister(conn)  # 解除注册
    
    
    # 第一步
    def accept(sock, mask):  # mask 是没有用的
        conn, addr = sock.accept()
        # print(conn)
        sel.register(conn, selectors.EVENT_READ, read)  # 把conn 添加到列表中
    
    
    # 首先要注册 只是把sock和 accept绑定
    sel.register(sock, selectors.EVENT_READ, accept)  # 注册,但是没有监听accept函数
    
    # 监听
    while 1:
        print("waiting...")
        # event 就是那个r
        events = sel.select()  # [(key,mask) ,(key,mask) ,(key,mask) ,)]  # 活活动的对象会自动添加到这里
        for key, mask in events:  # events 是个列表 需要遍历
            print(key.data)  # 拿到accept函数
            print(key.fileobj)  # 当前的活动的对象 sock 文件句柄
            func = key.data  # 调用
            obj = key.fileobj  #
            func(obj, mask)  # 第一个参数是sock 对象
            # break
    

    select缺点:

    • 每次调用slect都要将所有的fd拷贝到内核空间(每次都要拷贝),导致效率下降
    • 监听的的实现是通过遍历所有的fd,(遍历消耗的时间消耗多)判断是否有数据访问
    • 最大连接数(input中放的文件描述符数量1024)

    poll:

    最大连接数没有限制了,除此之外,和select一样,所以基本不用

    epoll:

    内部通过3个函数实现(select是一个)

    • 第一个函数:
      创建epoll句柄,把所有的fd拷贝到内核空间,只需要拷贝一次

    • 第二个函数: 回掉函数
      某一个函数或者动作成功完成后,会自动触发一个函数
      为所有的fd绑定一个回调函数,一旦有数据访问,触发改回调函数,回调函数把fd放到链表中。(只要有活动,把fd放到链表中,动态监听)这样就提高了效率
      例子:交试卷

    • 第三个函数,判断链表是否为空

    selectors.DefaultSelector()
    selectors会根据自己的平台选择最佳IO多路复用,自动选择。win只有select
    linux的中epoll中的优先级最高

    队列queue

    和线程有关系的,在多线程的时候有用,保证信息安全的

    队列是一种数据类型

    优点:
    保证线程安全,不用自己加锁

    put get

    先进先出

    import queue
    
    q = queue.Queue(3) # 默认是先进先出  FIFO  设置参数是最大的存放数量5
    
    q.put(111)
    q.put("hello")
    q.put(222)
    
    q.put(333,False)  # 默认blocking = True ,False 是当存满的时候,自动报错,解除阻塞的状态
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get()) # 第4次已经拿不到了 取不到 默认阻塞
    q.get(False) # 解除阻塞状态
    

    先进后出

    q = queue.LifoQueue()
    q.put(111)
    q.put(222)
    print(q.get())
    print(q.get())
    
    

    优先级

    q = queue.PriorityQueue()
    
    q.put([4,"hello4"])
    q.put([1,"hello1"])
    q.put([2,"hello2"])
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    

    join 与task_done方法

    import queue
    q = queue.Queue(5)
    
    q.put(111)
    q.put(222)
    
    q.get()
    q.task_done()  #
    
    q.get()
    q.task_done()  #
    
    q.join()  # 等待task_done  和events是一个原理
    print('endnig')
    

    join 与task_done方法必须配合使用

    其他的用法

    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)非阻塞 
    q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作
    
    

    生产者消费者模型

    有生产数据的线程
    有消费数据的线程

    通过一个容器来解决生产者消费者强耦合的问题
    这个容器是用来解耦的(类似吃饭的时候的服务员)
    目录结构也是一种解耦

    下面是用队列模拟实现

    import time,random
    import queue,threading
    
    q = queue.Queue()
    
    def Producer(name):
      count = 0
      while count <10:
        print("making........")
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' %(name, count))
        count +=1
        #q.task_done()
        #q.join()
        print("ok......")
    def Consumer(name):
      count = 0
      while count <10:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            #q.task_done()
            #q.join()
            print(data)
            print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
        else:
            print("-----no baozi anymore----")
        count +=1
    
    p1 = threading.Thread(target=Producer, args=('A',))
    c1 = threading.Thread(target=Consumer, args=('B',))
    # c2 = threading.Thread(target=Consumer, args=('C',))
    # c3 = threading.Thread(target=Consumer, args=('D',))
    p1.start()
    c1.start()
    # c2.start()
    # c3.start()
    
  • 相关阅读:
    P1121 环状最大两段子段和
    无题
    cdoj 1485 柱爷搞子串 sam treap
    自然数幂和
    Gym 100341C AVL Trees NTT
    线性筛分解质因子
    codeforces 366 Ant Man dp
    UVALive 6914 Maze Mayhem 轮廓线dp
    hdu 5790 Prefix 字典树 主席树
    莫比乌斯反演个人小结
  • 原文地址:https://www.cnblogs.com/Python666/p/6841550.html
Copyright © 2011-2022 走看看