消息队列”是在消息的传输过程中保存消息的容器。
消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。
操作系统提供了很多机制来实现进程间的通信 ,multiprocessing模块就提供了Queue和Pipe两种方法来实现。
使用multiprocessing里面的Queue来实现消息队列
from multiprocessing import Queue, Process #写进程 def write(q): for i in ["a","b","c","d"]: q.put(i) print("put {0} to queue".format(i)) #读进程 def read(q): while 1: result = q.get() print("get {0} from queue".format(result))
#主函数
def main():
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write,args=(q,))
pr = Process(target=read,args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pe,读入:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
if __name__ == "__main__":
main()
结果
put a to queue
get a from queue
put b to queue
get b from queue
put c to queue
get c from queue
put d to queue
get d from queue
通过Mutiprocess里面的Pipe来实现消息队列:
1, Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
2, send和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接受结束以后,关闭管道。
from multiprocessing import Pipe,Process import time def proc1(pipe): for i in xrange(1,10): pipe.send(i) print("send {0} to pipe".format(i)) time.sleep(1) def proc2(pipe): n = 9 while n >0: result = pipe.recv() print("recv {0} from pipe".format(result)) n -= 1 def main(): pipe = Pipe(duplex=False) print(type(pipe)) p1 = Process(target=proc1,args=(pipe[1],)) p2 = Process(target=proc2,args=(pipe[0],)) p1.start() p2.start() p1.join() p2.join() pipe[0].close() pipe[1].close() if __name__ == '__main__': main() 输出结果: <type 'tuple'> send 1 to pipe recv 1 from pipe recv 2 from pipe send 2 to pipe send 3 to pipe recv 3 from pipe send 4 to pipe recv 4 from pipe send 5 to piperecv 5 from pipe recv 6 from pipe send 6 to pipe recv 7 from pipe send 7 to pipe recv 8 from pipe send 8 to pipe recv 9 from pipe send 9 to pipe