消息队列”是在消息的传输过程中保存消息的容器。
消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。
操作系统提供了很多机制来实现进程间的通信 ,multiprocessing模块就提供了Queue和Pipe两种方法来实现。
使用multiprocessing里面的Queue来实现消息队列
from multiprocessing import Queue
q = Queue
q.put(data)
data = q.get(data)
通过Mutiprocess里面的Pipe来实现消息队列:
1, Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
2, send和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接受结束以后,关闭管道。
Python提供了Queue模块来专门实现消息队列
Queue对象
Queue对象实现一个fifo队列(其他的还有lifo、priority队列,这里不再介绍)。queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:
Queue.qsize():返回消息队列的当前空间。返回的值不一定可靠。
Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。
Queue.full():类似上边,判断消息队列是否满
Queue.put(item, block=True, timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。
Queue.put_nowait(item):相当于put(item, False).
Queue.get(block=True, timeout=None):获取一个消息,其他同put。
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
'''写一个消费者和生产者,为了练习多线程,我们用多线程的方式来实现,并通过类的重写的方法来实现''' from Queue import Queue from threading import Thread import time class Proceduer(Thread): def __init__(self, queue): super(Proceduer, self).__init__() self.queue = queue def run(self): try: for i in xrange(1, 10): print("put data is: {0} to queue".format(i)) self.queue.put(i) except Exception as e: print("put data error!") raise e class Consumer_odd(Thread): def __init__(self, queue): super(Consumer_odd, self).__init__() self.queue = queue def run(self): try: while not self.queue.empty(): number = self.queue.get(block=True, timeout=3) if number%2 != 0: print("get {0} from queue ODD".format(number)) else: self.queue.put(number) time.sleep(1) except Exception as e: raise e class Consumer_even(Thread): def __init__(self, queue): super(Consumer_even, self).__init__() self.queue = queue def run(self): # print(self.queue.empty) while not self.queue.empty(): try: number = self.queue.get(block=True, timeout=3) if number%2 == 0: print("get {0} from queue Even, thread name is: {1}".format(number, self.getName())) else: self.queue.put(number) time.sleep(1) except Exception as e: raise e def main(): queue = Queue() p = Proceduer(queue=queue) p.start() p.join() time.sleep(1) c1 = Consumer_odd(queue=queue) c2 = Consumer_even(queue=queue) c1.start() c2.start() c1.join() c2.join() print("All threads terminate!") if __name__ == '__main__': main()