Python/selectors模块及队列
selectors模块是可以实现IO多路复用机制:
它具有根据平台选出最佳的IO多路机制,比如在win的系统上他默认的是select模式而在linux上它默认的epoll。
1
2
3
|
常用共分为三种: select、poll、epoll |
select的缺点:
1、每次调用都要将所有的文件描述符(fd)拷贝的内核空间,导致效率下降
2、遍历所有的文件描述符(fd)查看是否有数据访问
3、最大链接数限额(1024)
poll:
它就是select和epoll的过渡阶段,它没有最大链接数的限额
epoll:
1、第一个函数是创建一个epoll句柄,将所有的描述符(fd)拷贝到内核空间,但只拷贝一次。
2、回调函数,某一个函数或某一个动作成功完成之后会触发的函数为所有的描述符(fd)绑定一个回调函数,一旦有数据访问就是触发该回调函数,回调函数将(fd)放到链表中
3、函数判断链表是否为空
4、最大启动项没有限额
selsect实例:
1 服务端 2 import selectors #基于select模块实现的IO多路复用,建议大家使用 3 import socket 4 sock=socket.socket() 5 sock.bind(('127.0.0.1',8800)) 6 sock.listen(5) 7 sock.setblocking(False) 8 sel=selectors.DefaultSelector() #根据平台选择最佳的IO多路机制,比如linux就会选择epoll 9 10 def read(conn,mask): 11 try: 12 data=conn.recv(1024) 13 print(data.decode('utf8')) 14 data2=input('>>>>') 15 conn.send(data2.encode('utf8')) 16 except Exception: 17 sel.unregister(conn) 18 19 def accept(sock,mask): 20 conn,addr=sock.accept() 21 print('-------',conn) 22 sel.register(conn,selectors.EVENT_READ,read) 23 sel.register(sock, selectors.EVENT_READ, accept) #注册功能 24 while True: 25 print('wating....') 26 events=sel.select() #[(sock),(),()] 监听 27 28 for key,mask in events: 29 # print(key.data) #accept 找出有活动的绑定函数 30 # print(key.fileobj) #sock 找出有活动的文件描述符 31 32 func=key.data 33 obj=key.fileobj 34 35 func(obj,mask) #1 accept(sock,mask) 2read(conn,mask) 36 ------------------------------------------------------------------------------ 37 客户端 38 import socket 39 tin=socket.socket() 40 tin.connect(('127.0.0.1',8800)) 41 while True: 42 inp=input('>>>>') 43 tin.send(inp.encode('utf8')) 44 data=tin.recv(1024) 45 print(data.decode('utf8'))
队列:
1
2
3
4
5
|
队列分为(先进先出、后进先出) 队列是一个数据类型,可以进行数据储存功能 队列可以实现耦合的效果,比如有个人A把包子放到锅里,在有个人B把包子从锅里拿出来。现在的锅就是队列的效果。 |
实例如下:
1 import queue 2 q=queue.Queue() #默认的先进先出 3 q.put(111) #往管道里放一个值 4 q.put(222) #往管道里放一个值 5 q.put(333) #往管道里放一个值 6 7 print(q.get()) #从管道里拿一个值 8 print(q.get()) #从管道里拿一个值 9 print(q.get()) #从管道里拿一个值 10 ----------------------------------------------- 11 运行结果 12 111 13 222 14 333
join和tast_done
1 import queue 2 q=queue.Queue() #默认的先进先出 3 q.put(111) #往管道里放一个值 4 q.task_done() #解除阻塞 5 q.put(222) #往管道里放一个值 6 q.task_done() #解除阻塞 7 q.put(333) #往管道里放一个值 8 q.task_done() #解除阻塞 9 q.join() #队列阻塞功能 10 print(q.get()) #从管道里拿一个值 11 print(q.get()) #从管道里拿一个值 12 print(q.get()) #从管道里拿一个值
join和tast_done 功能
如果有俩个put功能,在get的前边有join功能的话,在俩个put的后边要进行一个task_done才能执行。
生产者消费者模型:
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。
1 import time,random 2 import queue,threading 3 4 q = queue.Queue() 5 6 def Producer(name): 7 count = 0 8 while count <10: 9 print("making........") 10 time.sleep(random.randrange(3)) 11 q.put(count) 12 print('Producer %s has produced %s baozi..' %(name, count)) 13 count +=1 14 #q.task_done() 15 #q.join() 16 print("ok......") 17 def Consumer(name): 18 count = 0 19 while count <10: 20 time.sleep(random.randrange(4)) 21 if not q.empty(): 22 data = q.get() 23 #q.task_done() 24 #q.join() 25 print(data) 26 print(' 33[32;1mConsumer %s has eat %s baozi... 33[0m' %(name, data)) 27 else: 28 print("-----no baozi anymore----") 29 count +=1 30 31 p1 = threading.Thread(target=Producer, args=('A',)) 32 c1 = threading.Thread(target=Consumer, args=('B',)) 33 # c2 = threading.Thread(target=Consumer, args=('C',)) 34 # c3 = threading.Thread(target=Consumer, args=('D',)) 35 p1.start() 36 c1.start() 37 # c2.start() 38 # c3.start()
这个生产消费者就是通过列表的形式把输入和获取进行耦合操作!