概述
工作队列(任务队列)的主要思想就是避免立即执行资源密集型任务,而不得不等待它去完成,相反的我们安排任务在以后完成这个任务,我们只需要将消息发送到队列中,在后台运行的工作进程弹出任务最终执行这个任务。
循环调度
使用任务队列的优点就是能够轻松并行化工作(消费这个任务)
news_task.py
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import pika 5 import sys 6 7 credentials = pika.PlainCredentials('admin', 'admin123456') 8 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials)) 9 channel = connection.channel() 10 channel.queue_declare(queue='hello') 11 message = ' '.join(sys.argv[1:]) or "Hello World!" 12 channel.basic_publish(exchange='', 13 routing_key='hello', 14 body=message) 15 16 print('[x]发送%s' % message) 17 connection.close()
worker.py
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import pika 5 import time 6 7 credentials = pika.PlainCredentials('admin', 'admin123456') 8 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials)) 9 channel = connection.channel() 10 channel.queue_declare('hello') 11 12 13 def callback(ch, method, properties, body): 14 print('[x] Received %r' % body) 15 time.sleep(20) 16 print("[x] Done") 17 18 19 channel.basic_consume(queue='hello', 20 auto_ack=True, 21 on_message_callback=callback) 22 print(' [*] Waiting for messages.') 23 channel.start_consuming()
此时开三个控制台,一个news_task,两个worker,运行结果
此时可以看出在我们单第一个worker任务没完成时候,如果再接新的任务的时候,第二个worker1就去接了。所以在这里RabbitMQ将每个消息依次发送给下一个消费者,平均的话,每个消费者收到的消息数量是相同的,这种分发消息的方式就是循环。
消息确认
执行任务的时候可能需要一段的时间,如果其中一个工作者只完成了部分的任务而被杀死,在这种情况下,RabbitMQ一旦将消息发送出去,便会立即标记删除,这样就会丢失正在处理的消息。但是又不想丢失任何消息,RabbitMQ支持消息确认,消费者发送回一个确认(acknowledgement)告知RabbitMQ已经接收,处理了这个任务,此时RabbitMQ就可以删除。
如果工作者进程突然死掉,RabbitMQ知道消息未完全处理完,并将重新排队。如果有其它消费者在它会分发给另外一个消费者,确保任务不被丢失
默认情况下,是处于打开消息确认的,但是在上边我们设置了auto_ack=True是关闭了确认。
公平分发
引发的问题:
如果RabbitMQ不考虑负载的话,只管依次分发,平均分配消息的话,比如一个工作进程特别忙,而另外一个工作进程几乎没事干,这样就会消耗大量的时间。
为了克服这个问题:我们可以配置basic_qos方法与prefetch_count=1,这样就是告诉RabbitMQ如果我这个工作进程没处理完任务就不要再给我发任务了,给其它的工作进程发。
下面两张图可以说明任务未完成就会给recv2工作进程发任务了
消息持久化+公平分发的代码
sender.py
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import time 5 import sys 6 import pika 7 8 credentials = pika.PlainCredentials('admin', 'admin123456') 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials)) 10 channel = connection.channel() 11 # 如果RabbitMQ服务器突然停止,任务也将会丢失,所有配置上durable=True 队列持久 12 # queue_declare在这里申明队列的时候,生产者消费者都要进行设置 13 channel.queue_declare(queue='task_queue', durable=True) 14 message = ' '.join(sys.argv[1:]) or "hello world %s" % time.time() 15 channel.basic_publish(exchange='', 16 routing_key='task_queue', 17 body=message, 18 # 将消息标记为持久性delivery_mode=2 19 properties=pika.BasicProperties(delivery_mode=2,) # 消息持久化 20 ) 21 22 print('[x] Sent %r' % message) 23 connection.close()
recv1.py
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import time 5 import pika 6 7 credentials = pika.PlainCredentials('admin', 'admin123456') 8 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials)) 9 channel = connection.channel() 10 channel.queue_declare('task_queue', durable=True) 11 12 13 def callback(ch, method, properties, body): 14 print("received %s" % body) 15 time.sleep(20) 16 print('[x] Done') 17 # 确认必须是建立在同一条通道上发送(如果不在同一通道上将会导致通道协议异常) 18 ch.basic_ack(delivery_tag=method.delivery_tag) 19 20 21 channel.basic_qos(prefetch_count=1) 22 channel.basic_consume(queue='task_queue', 23 on_message_callback=callback, 24 ) 25 26 print(' [*] Waiting for messages. To exit press CTRL+C') 27 channel.start_consuming()
recv2.py
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import pika 5 6 credentials = pika.PlainCredentials('admin', 'admin123456') 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials)) 8 channel = connection.channel() 9 channel.queue_declare('task_queue', durable=True) 10 11 12 def callback(ch, method, properties, body): 13 print("received %s" % body) 14 print('[x] Done') 15 # 确认必须是建立在同一条通道上发送(如果不在同一通道上将会导致通道协议异常) 16 ch.basic_ack(delivery_tag=method.delivery_tag) 17 18 19 channel.basic_qos(prefetch_count=1) 20 channel.basic_consume(queue='task_queue', 21 on_message_callback=callback, 22 ) 23 24 print(' [*] Waiting for messages. To exit press CTRL+C') 25 channel.start_consuming()
重新启动rabbitmq任务依然存在