消息队列指的是消息在传输过程中保存消息的容器
消息队列最经典的用法是消费者和生产者之间通过消息管道来传递消息。消费者和和生产者是不同的进程,生产者往管道中写消息,消费者从管道中读消息
multiprocessing模块提供了Queue类 和 Pipe函数 实现消息队列
1. Queue
用法:
In [1]: import multiprocessing In [2]: help(multiprocessing.Queue) Help on function Queue in module multiprocessing: Queue(maxsize=0) Returns a queue object In [3]: q = multiprocessing.Queue() //实例化一个对象,对象的方法的用法和Queue模块中对象的方法的用法一毛一样 In [4]: q. q.cancel_join_thread q.empty q.get q.join_thread q.put_nowait q.close q.full q.get_nowait q.put q.qsize
例子:
1 [root@web thread_process]# cat queue4.py 2 #!/usr/bin/env python 3 4 from multiprocessing import Process, Queue 5 def producer(q): 6 for i in xrange(5): 7 q.put(i) 8 print 'put {0} into queue'.format(i) 9 10 def consumer(q): 11 while 1: 12 result = q.get() 13 print 'get {0} from queue'.format(result) 14 if q.empty(): 15 break 16 17 18 if __name__ == '__main__': 19 q = Queue() 20 p = Process(target=producer, args=(q,)) 21 c = Process(target=consumer, args=(q,)) 22 p.start() 23 p.join() 24 c.start() 25 26 27 [root@web thread_process]# python queue4.py 28 put 0 into queue 29 put 1 into queue 30 put 2 into queue 31 put 3 into queue 32 put 4 into queue 33 get 0 from queue 34 get 1 from queue 35 get 2 from queue 36 get 3 from queue 37 get 4 from queue
2. Pipe
Pipe方法返回一个二元元组(conn1, conn2),两个元素分别是两个连接对象,代表管道的两端,Pipe(duplex=True) 函数有一个默认参数duplex,默认等于True,表示这个管道是全双工模式,也就是说conn1和conn2均可收发;如果duplex=False,那么conn2只负责发消息到消息队列,conn1只负责从消息队列中读取消息
连接对象的常用方法有三个:
- send() ---> 发送消息到管道
- recv() ---> 从管道中读取消息
- close() --->关闭管道
duplex=False 例子:
1 [root@web thread_process]# cat pipe.py 2 #!/usr/bin/env python 3 4 import time 5 from multiprocessing import Pipe, Process 6 7 def producer(p): 8 for i in xrange(5): 9 p.send(i) 10 print 'send {0} to pipe'.format(i) 11 time.sleep(1) 12 13 def consumer(p): 14 n = 5 15 while n>0: 16 result = p.recv() 17 print 'recv {0} from pipe'.format(result) 18 n -= 1 19 20 if __name__ == '__main__': 21 p = Pipe(duplex=False) 22 print p 23 p1 = Process(target=producer, args=(p[1],)) 24 p2 = Process(target=consumer, args=(p[0],)) 25 p1.start() 26 p2.start() 27 p1.join() 28 p2.join() 29 p[0].close() 30 p[1].close() 31 32 33 [root@web thread_process]# python pipe.py 34 (<read-only Connection, handle 3>, <write-only Connection, handle 4>) 35 send 0 to pipe 36 recv 0 from pipe 37 send 1 to pipe 38 recv 1 from pipe 39 send 2 to pipe 40 recv 2 from pipe 41 send 3 to pipe 42 recv 3 from pipe 43 send 4 to pipe 44 recv 4 from pipe
duplex=True例子:
1 [root@web thread_process]# cat pipe1.py 2 #!/usr/bin/env python 3 4 import time 5 from multiprocessing import Pipe, Process 6 7 def producer(p): 8 for i in xrange(5): 9 p.se 10 print 'send {0} to pipe'.format(i) 11 time.sleep(1) 12 13 if __name__ == '__main__': 14 p = Pipe(duplex=True) 15 print p 16 p1 = Process(target=producer, args=(p[1],)) 17 p2 = Process(target=producer, args=(p[0],)) 18 p1.start() 19 p2.start() 20 p[0].close() 21 p[1].close() 22 23 24 [root@web thread_process]# python pipe1.py 25 (<read-write Connection, handle 5>, <read-write Connection, handle 6>) 26 send 0 to pipe 27 send 0 to pipe 28 send 1 to pipe 29 send 1 to pipe 30 send 2 to pipe 31 send 2 to pipe 32 send 3 to pipe 33 send 3 to pipe 34 send 4 to pipe 35 send 4 to pipe