1. 生产者
#coding:utf8 import pika import json import sys message = ''.join(sys.argv[1:]) or "hello word" connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue',durable = True)# 消息持久化durable = True channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2,#make message persistent )) print " [x] Sent 'Hello World!'" connection.close()
2.消费者
# coding:utf8 import pika import MySQLdb import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue',durable = True) def callback(ch, method, properties, body): print("[x] received %r" % body) time.sleep(20) print("done") ch.basic_ack(delivery_tag=method.delivery_tag) # ack 消息成功确认 channel.basic_qos(prefetch_count=1)#公平分派 channel.basic_consume(callback, queue='task_queue') print("waiting for messages") channel.start_consuming()
3.注意:
被遗忘的确认
错过basic_ack是一个常见的错误。这是一个简单的错误,但后果是严重的。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。
为了调试这种错误,您可以使用rabbitmqctl 来打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,删除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
3.2
有关消息持久性的注释
将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且尚未保存消息时,仍然有一个短时间窗口。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用 发布者确认。