八周五次课(12月15日)
16.1 多进程的消息队列
16.2 消息队列pipe
1. 消息队列
消息队列” 是在消息的传输过程中保存消息的容器。
消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。
操作系统提供了很多机制来实现进程间的通信 ,multiprocessing 模块就提供了 Queue 和 Pipe 两种方法来实现。
通过 Mutiprocess 里面的 Pipe 来实现消息队列:
例 1:
from multiprocessing import Queue, Process
def write(q): ##向队列里循环写入列表的数据
for i in ['a', 'b', 'c', 'd']:
q.put(i)
print 'put {0} to queue.'.format(i)
def read(q): ##循环去队列里面的数据
while 1:
result = q.get()
print 'get {0} from queue.'.format(result)
def main():
q = Queue() #创建队列对象
pw = Process(target=write, args=(q,)) #创建写进程
pr = Process(target=read, args=(q,)) #创建读进程
pw.start()
pr.start()
pw.join()
pr.terminate() # pr进程里是死循环,无法等待其结束,只能强行终止;terminate方法实现
if __name__ == '__main__':
main()
#结果:
put a to queue.
put b to queue.
put c to queue.
put d to queue.
2. 消息队列 PIPE
- Pipe 方法返回 (conn1,conn2) 代表一个管道的两个端。Pipe 方法有 duplex 参数,如果 duplex 参数为 True(默认值),那么这个管道是全双工模式,也就是说 conn1 和 conn2 均可收发。duplex 为 False,conn1 只负责接受消息,conn2 只负责发送消息。
- send 和 recv 方法分别是发送和接受消息的方法。close 方法表示关闭管道,当消息接受结束以后,关闭管道。
例 2:
from multiprocessing import Pipe, Process
import time
def proc1(pipe):
for i in xrange(1, 10):
pipe.send(i)
print 'send {0} to pipe'.format(i)
time.sleep(1)
def proc2(pipe):
n = 9
while n > 0:
result = pipe.recv()
print 'recv {0} from pipe'.format(result)
time.sleep(1)
n -= 1
def main():
pipe = Pipe(duplex=False) #一边负责接收,一边只负责发送
print type(pipe)
p1 = Process(target=proc1, args=(pipe[1],))
p2 = Process(target=proc2, args=(pipe[0],))
p1.start()
p2.start()
p1.join()
p2.join()
pipe[0].close()
pipe[1].close()
if __name__ == '__main__':
main()
#结果:
<type 'tuple'>
send 1 to pipe
recv 1 from pipe
send 2 to pipe
recv 2 from pipe
send 3 to pipe
recv 3 from pipe
send 4 to pipe
recv 4 from pipe
send 5 to pipe
recv 5 from pipe
send 6 to pipe
recv 6 from pipe
send 7 to pipe
recv 7 from pipe
send 8 to pipe
recv 8 from pipe
send 9 to pipe
recv 9 from pipe