zoukankan      html  css  js  c++  java
  • Python Select模型

    IO多路复用

    IO多路复用就是我们经常说的select epoll.select和epoll的好处是单个process就可以同时处理多个网络IO。基本原理是selectepoll会不断的轮询所负责的所有socket,当有某个socket数据到达了,就通知用户进程。
    下面是流程图:

      当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
    

    注意1:select函数返回结果中如果有文件可读了,那么进程就可以通过调用accept()或recv()来让kernel将位于内核中准备到的数据copy到用户区。

    注意2: select的优势在于可以处理多个连接,不适用于单个连接

    selectors

    基于select模块实现的IO多路复用

    `

    IO多路复用实现机制

    在不同的平台上是不一样的,win平台只有select,Linux平台有select poll epoll

    • win: select

    • linux : select poll epoll

    通常是用户空间创建fd,然后copy到内核空间

    如果是开fd的数量多,select的的效率低

    基于select模块实现的IO多路复用

    import selectors
    import socket
    
    sock = socket.socket()
    sock.bind(("127.0.0.1", 8810))
    
    sock.listen(5)  # 这里虽然设置了最大连接数,但是已经没有限制了
    sel = selectors.DefaultSelector()  # 实例化一个对象,会根据不同的平台自动设置优先级
    # epoll|kqueue|devpoll > poll > select.  所以Linux系统会自动设置成epoll  win 自动设置成select
    
    # 第二步
    def read(conn, mask):
        # pass
        try:  # win 检测异常  当有异常 如客户端断开的时候 
            data = conn.recv(1024)
            print(data)
            print(data.decode("utf-8"))
            data2 = input(">>>")
            conn.send(data2.encode("utf-8"))
        except Exception:
            sel.unregister(conn)  # 解除注册
    
    
    # 第一步
    def accept(sock, mask):  # mask 是没有用的
        conn, addr = sock.accept()
        # print(conn)
        sel.register(conn, selectors.EVENT_READ, read)  # 把conn 添加到列表中
    
    
    # 首先要注册 只是把sock和 accept绑定
    sel.register(sock, selectors.EVENT_READ, accept)  # 注册,但是没有监听accept函数
    
    # 监听
    while 1:
        print("waiting...")
        # event 就是那个r
        events = sel.select()  # [(key,mask) ,(key,mask) ,(key,mask) ,)]  # 活活动的对象会自动添加到这里
        for key, mask in events:  # events 是个列表 需要遍历
            print(key.data)  # 拿到accept函数
            print(key.fileobj)  # 当前的活动的对象 sock 文件句柄
            func = key.data  # 调用
            obj = key.fileobj  #
            func(obj, mask)  # 第一个参数是sock 对象
            # break
    

    select缺点:

    • 每次调用slect都要将所有的fd拷贝到内核空间(每次都要拷贝),导致效率下降
    • 监听的的实现是通过遍历所有的fd,(遍历消耗的时间消耗多)判断是否有数据访问
    • 最大连接数(input中放的文件描述符数量1024)

    poll:

    最大连接数没有限制了,除此之外,和select一样,所以基本不用

    epoll:

    内部通过3个函数实现(select是一个)

    • 第一个函数:
      创建epoll句柄,把所有的fd拷贝到内核空间,只需要拷贝一次

    • 第二个函数: 回掉函数
      某一个函数或者动作成功完成后,会自动触发一个函数
      为所有的fd绑定一个回调函数,一旦有数据访问,触发改回调函数,回调函数把fd放到链表中。(只要有活动,把fd放到链表中,动态监听)这样就提高了效率
      例子:交试卷

    • 第三个函数,判断链表是否为空

    selectors.DefaultSelector()
    selectors会根据自己的平台选择最佳IO多路复用,自动选择。win只有select
    linux的中epoll中的优先级最高

    队列queue

    和线程有关系的,在多线程的时候有用,保证信息安全的

    队列是一种数据类型

    优点:
    保证线程安全,不用自己加锁

    put get

    先进先出

    import queue
    
    q = queue.Queue(3) # 默认是先进先出  FIFO  设置参数是最大的存放数量5
    
    q.put(111)
    q.put("hello")
    q.put(222)
    
    q.put(333,False)  # 默认blocking = True ,False 是当存满的时候,自动报错,解除阻塞的状态
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get()) # 第4次已经拿不到了 取不到 默认阻塞
    q.get(False) # 解除阻塞状态
    

    先进后出

    q = queue.LifoQueue()
    q.put(111)
    q.put(222)
    print(q.get())
    print(q.get())
    
    

    优先级

    q = queue.PriorityQueue()
    
    q.put([4,"hello4"])
    q.put([1,"hello1"])
    q.put([2,"hello2"])
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    

    join 与task_done方法

    import queue
    q = queue.Queue(5)
    
    q.put(111)
    q.put(222)
    
    q.get()
    q.task_done()  #
    
    q.get()
    q.task_done()  #
    
    q.join()  # 等待task_done  和events是一个原理
    print('endnig')
    

    join 与task_done方法必须配合使用

    其他的用法

    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)非阻塞 
    q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作
    
    

    生产者消费者模型

    有生产数据的线程
    有消费数据的线程

    通过一个容器来解决生产者消费者强耦合的问题
    这个容器是用来解耦的(类似吃饭的时候的服务员)
    目录结构也是一种解耦

    下面是用队列模拟实现

    import time,random
    import queue,threading
    
    q = queue.Queue()
    
    def Producer(name):
      count = 0
      while count <10:
        print("making........")
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' %(name, count))
        count +=1
        #q.task_done()
        #q.join()
        print("ok......")
    def Consumer(name):
      count = 0
      while count <10:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            #q.task_done()
            #q.join()
            print(data)
            print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
        else:
            print("-----no baozi anymore----")
        count +=1
    
    p1 = threading.Thread(target=Producer, args=('A',))
    c1 = threading.Thread(target=Consumer, args=('B',))
    # c2 = threading.Thread(target=Consumer, args=('C',))
    # c3 = threading.Thread(target=Consumer, args=('D',))
    p1.start()
    c1.start()
    # c2.start()
    # c3.start()
    
  • 相关阅读:
    juypter4.4.0 自动补全
    Pytorch安装教程(Windows)
    编写你的第一个油猴脚本
    油猴(Tampermonkey)安装教程
    解决Linux系统下每次打开终端自动进入base环境
    Pytorch安装教程(Linux)
    Miniconda软件安装教程(Linux)
    解决vscode中使用pytorch时pylint报错Module torch has no xxx member
    解决pytorch报错ImportError: numpy.core.multiarray failed to import
    Miniconda软件安装教程(Windows)
  • 原文地址:https://www.cnblogs.com/Python666/p/6841550.html
Copyright © 2011-2022 走看看