multiprocessing模块提供了Queue类 和 Pipe函数 实现消息队列
1. Queue
In [1]: import multiprocessing In [2]: help(multiprocessing.Queue) Help on function Queue in module multiprocessing: Queue(maxsize=0) Returns a queue object In [3]: q = multiprocessing.Queue() //实例化一个对象,对象的方法的用法和Queue模块中对象的方法的用法一毛一样 In [4]: q. q.cancel_join_thread q.empty q.get q.join_thread q.put_nowait q.close q.full q.get_nowait q.put q.qsize
1 [root@web thread_process]# cat queue4.py 2 #!/usr/bin/env python 3 4 from multiprocessing import Process, Queue 5 def producer(q): 6 for i in xrange(5): 7 q.put(i) 8 print 'put {0} into queue'.format(i) 9 10 def consumer(q): 11 while 1: 12 result = q.get() 13 print 'get {0} from queue'.format(result) 14 if q.empty(): 15 break 16 17 18 if __name__ == '__main__': 19 q = Queue() 20 p = Process(target=producer, args=(q,)) 21 c = Process(target=consumer, args=(q,)) 22 p.start() 23 p.join() 24 c.start() 25 26 27 [root@web thread_process]# python queue4.py 28 put 0 into queue 29 put 1 into queue 30 put 2 into queue 31 put 3 into queue 32 put 4 into queue 33 get 0 from queue 34 get 1 from queue 35 get 2 from queue 36 get 3 from queue 37 get 4 from queue
2. Pipe
Pipe方法返回一个二元元组(conn1, conn2),两个元素分别是两个连接对象,代表管道的两端,Pipe(duplex=True) 函数有一个默认参数duplex,默认等于True,表示这个管道是全双工模式,也就是说conn1和conn2均可收发;如果duplex=False,那么conn2只负责发消息到消息队列,conn1只负责从消息队列中读取消息
- send() ---> 发送消息到管道
- recv() ---> 从管道中读取消息
- close() --->关闭管道
duplex=False 例子:
1 [root@web thread_process]# cat pipe.py 2 #!/usr/bin/env python 3 4 import time 5 from multiprocessing import Pipe, Process 6 7 def producer(p): 8 for i in xrange(5): 9 p.send(i) 10 print 'send {0} to pipe'.format(i) 11 time.sleep(1) 12 13 def consumer(p): 14 n = 5 15 while n>0: 16 result = p.recv() 17 print 'recv {0} from pipe'.format(result) 18 n -= 1 19 20 if __name__ == '__main__': 21 p = Pipe(duplex=False) 22 print p 23 p1 = Process(target=producer, args=(p[1],)) 24 p2 = Process(target=consumer, args=(p[0],)) 25 p1.start() 26 p2.start() 27 p1.join() 28 p2.join() 29 p[0].close() 30 p[1].close() 31 32 33 [root@web thread_process]# python pipe.py 34 (<read-only Connection, handle 3>, <write-only Connection, handle 4>) 35 send 0 to pipe 36 recv 0 from pipe 37 send 1 to pipe 38 recv 1 from pipe 39 send 2 to pipe 40 recv 2 from pipe 41 send 3 to pipe 42 recv 3 from pipe 43 send 4 to pipe 44 recv 4 from pipe
1 [root@web thread_process]# cat pipe1.py 2 #!/usr/bin/env python 3 4 import time 5 from multiprocessing import Pipe, Process 6 7 def producer(p): 8 for i in xrange(5): 9 p.se 10 print 'send {0} to pipe'.format(i) 11 time.sleep(1) 12 13 if __name__ == '__main__': 14 p = Pipe(duplex=True) 15 print p 16 p1 = Process(target=producer, args=(p[1],)) 17 p2 = Process(target=producer, args=(p[0],)) 18 p1.start() 19 p2.start() 20 p[0].close() 21 p[1].close() 22 23 24 [root@web thread_process]# python pipe1.py 25 (<read-write Connection, handle 5>, <read-write Connection, handle 6>) 26 send 0 to pipe 27 send 0 to pipe 28 send 1 to pipe 29 send 1 to pipe 30 send 2 to pipe 31 send 2 to pipe 32 send 3 to pipe 33 send 3 to pipe 34 send 4 to pipe 35 send 4 to pipe