在RabbitMQ中消费者有2种方式获取队列中的消息:
a) 一种是通过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息之后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,否则客户端将会一直接收队列的消息。
b) 另外一种方式是通过basic.get命令主动获取队列中的消息,但是绝对不可以通过循环调用basic.get来代替basic.consume,这是因为basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,然后检索第一条消息,然后再取消订阅。如果是高吞吐率的消费者,最好还是建议使用basic.consume。
简单总结一下就是说:
consume是只要队列里面还有消息就一直取。
get是只取了队列里面的第一条消息。
因为get开销大,如果需要从一个队列取消息的话,首选consume方式,慎用循环get方式。
转载于:https://www.cnblogs.com/SupPilot/p/10218377.html
def remove(self, item) -> list:
"""Removes the item from the queue. Goes through the entire queue, similar to view_queue, and acknowledges
the msg in the list that matches, and returns the msg.
If item matches more than one message on the queue, only one is deleted
"""
if isinstance(item, list):
if not (isinstance(i, bytes) for i in item):
print("Item must be a list of only byte objects")
if not isinstance(item, bytes):
print("Item must be a singe bytes object")
raise TypeError
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host))
msgs = []
while True:
chl = connection.channel()
method_frame, header_frame, body = chl.basic_get(queue='test')
if method_frame:
print('body: ', body)
if body == item:
print("item found!")
msgs.append(body)
chl.basic_ack(method_frame.delivery_tag)
connection.close()
return msgs
else:
print("Message not found")
connection.close()
break
return msgs