封装一个task到一个message,并发送到queue。consumer会去除task并执行这个task。
这里我们简化了操作,发送消息到队列中,consumer取出消息计算里面'.'号有几个就sleep几秒。
task.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
work.py
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
代码解释
channel.queue_declare(queue='task_queue', durable=True) 告诉rabbitmq永不丢失queue,即使rabbitmq server挂掉。
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello') 在先前例子中,如果consumer突然死掉,回丢失掉正在处理的信息。如何避免呢?如果consumer死掉,怎么将这条信息发送的其他consumer呢?就是使用上面代码
channel.basic_qos(prefetch_count=1) This tells RabbitMQ not to give more than one message to a worker at a time.
Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one.
Instead, it will dispatch it to the next worker that is not still busy. 直到消费完这个消息再给consumer派送新的消息,如果没消费完,将消息发送给另一个consumer.