1.队列的使用:
队列引用的前提: 多个进程对同一块共享数据的修改:要从硬盘读文件,慢,还要考虑上锁:
所以就出现了 队列 和 管道 都在内存中(快); 队列 = 管道 + 上锁
用队列的目的:
进程间通信(IPC),队列可以放任意类型的数据,应该放小东西,
q = Queue(3)
get put full empty
队列作用:
多个进程之间通信使用的,一个进程将数据放到队列里面,另外一个进程从队列里面取走数据,干的是进程之间通信的活
1 from multiprocessing import Queue
2
3 q = Queue(3)
4 q.put('hello')
5 q.put({'a':1})
6 q.put([3,3,3])
7
8 print(q.full()) # 查看队列是否满了
9 # q.put(2) # 这里会卡住,直到队列中被取走一个
10
11 print(q.get())
12 print(q.get())
13 q.put(2)
14 print(q.get())
15 print(q.get())
16 print(q.empty()) # 查看队列是否为空
17 print(q.get()) # 取完数据后,再取,就卡住了
2.生产者消费者模型:
生产者:
生产者指的是生产数据的任务
消费者:
消费者指的是处理数据的任务
生产者与消费者模型:
生产者与消费者之间引入一个容器(队列):
生产者《---》队列《---》消费者
好处:程序解开耦合,生产者与消费者不直接通信
平衡了生产者与消费者的速度差
生产者:一个进程
消费者:一个进程
进程间通信:队列(IPC)
如果生产者,消费者,队列组件都在一台机器上:
集中式:稳定性差,性能问题差
分布在多台机器上:
Rabbitmq 用它来实现生产者,消费者模型
1 from multiprocessing import Process,Queue
2 import time
3
4 def producer(q):
5 for i in range(10):
6 res = '包子%s'%i
7 time.sleep(0.5)
8 print('生产者生产了%s'%res)
9
10 q.put(res)
11
12 def consumer(q):
13 while True:
14 res = q.get()
15 if not res:break
16 time.sleep(1)
17 print('消费者吃了%s'%res)
18
19 if __name__ == "__main__":
20 # 容器
21 q = Queue()
22
23 # 生产者们
24 p1 = Process(target=producer,args=(q,))
25 p2 = Process(target=producer, args=(q,))
26 p3 = Process(target=producer, args=(q,))
27
28 # 消费者们
29 c1 = Process(target=consumer,args=(q,))
30 c2 = Process(target=consumer,args=(q,))
31
32 p1.start()
33 p2.start()
34 p3.start()
35 c1.start()
36 c2.start()
37
38 p1.join()
39 p2.join()
40 p3.join()
41 q.put(None) # 两个消费者,所以放两个None
42 q.put(None)
43
44 print('主')
# 生产者消费者模型,因为消费者一直在等待消费,会卡在那里,
# 首先应该确保生产者已经完全生产完,所以需要使用join方法
# 然后为了确保消费者知道已经接收完数据,所以需要将在列表后面加入None数据,有几个消费者就传入几个None
3.JoinableQueue:
q = JoinableQueue()
q.join()
q.task_done()
1 from multiprocessing import Process,Queue,JoinableQueue
2 import time
3
4 def producer(q):
5 for i in range(2):
6 res = '包子%s'%i
7 time.sleep(0.5)
8 print('生产者生产了%s'%res)
9
10 q.put(res)
11 q.join() # 等待队列为空
12
13 def consumer(q):
14 while True:
15 res = q.get()
16 if not res:break
17 time.sleep(1)
18 print('消费者吃了%s'%res)
19 q.task_done() # 消费者发信号,任务结束
20
21 if __name__ == "__main__":
22 # 容器
23 q = JoinableQueue()
24
25 # 生产者们
26 p1 = Process(target=producer,args=(q,))
27 p2 = Process(target=producer, args=(q,))
28 p3 = Process(target=producer, args=(q,))
29
30 # 消费者们
31 c1 = Process(target=consumer,args=(q,))
32 c2 = Process(target=consumer,args=(q,))
33 c1.daemon = True # 消费者没有存在的必要,设为守护进程
34 c2.daemon = True
35
36 p1.start()
37 p2.start()
38 p3.start()
39 c1.start()
40 c2.start()
41
42 p1.join()
43 p2.join()
44 p3.join()
45
46
47 print('主')