zoukankan      html  css  js  c++  java
  • RabibitMQ之Work Queues

    概述

      工作队列(任务队列)的主要思想就是避免立即执行资源密集型任务,而不得不等待它去完成,相反的我们安排任务在以后完成这个任务,我们只需要将消息发送到队列中,在后台运行的工作进程弹出任务最终执行这个任务。

     

     循环调度

      使用任务队列的优点就是能够轻松并行化工作(消费这个任务)

    news_task.py

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import pika
     5 import sys
     6 
     7 credentials = pika.PlainCredentials('admin', 'admin123456')
     8 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials))
     9 channel = connection.channel()
    10 channel.queue_declare(queue='hello')
    11 message = ' '.join(sys.argv[1:]) or "Hello World!"
    12 channel.basic_publish(exchange='',
    13                       routing_key='hello',
    14                       body=message)
    15 
    16 print('[x]发送%s' % message)
    17 connection.close()

    worker.py

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import pika
     5 import time
     6 
     7 credentials = pika.PlainCredentials('admin', 'admin123456')
     8 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials))
     9 channel = connection.channel()
    10 channel.queue_declare('hello')
    11 
    12 
    13 def callback(ch, method, properties, body):
    14     print('[x] Received %r' % body)
    15     time.sleep(20)
    16     print("[x] Done")
    17 
    18 
    19 channel.basic_consume(queue='hello',
    20                       auto_ack=True,
    21                       on_message_callback=callback)
    22 print(' [*] Waiting for messages.')
    23 channel.start_consuming()

    此时开三个控制台,一个news_task,两个worker,运行结果

    此时可以看出在我们单第一个worker任务没完成时候,如果再接新的任务的时候,第二个worker1就去接了。所以在这里RabbitMQ将每个消息依次发送给下一个消费者,平均的话,每个消费者收到的消息数量是相同的,这种分发消息的方式就是循环。

    消息确认

      执行任务的时候可能需要一段的时间,如果其中一个工作者只完成了部分的任务而被杀死,在这种情况下,RabbitMQ一旦将消息发送出去,便会立即标记删除,这样就会丢失正在处理的消息。但是又不想丢失任何消息,RabbitMQ支持消息确认,消费者发送回一个确认(acknowledgement)告知RabbitMQ已经接收,处理了这个任务,此时RabbitMQ就可以删除。

      如果工作者进程突然死掉,RabbitMQ知道消息未完全处理完,并将重新排队。如果有其它消费者在它会分发给另外一个消费者,确保任务不被丢失

      默认情况下,是处于打开消息确认的,但是在上边我们设置了auto_ack=True是关闭了确认。

    公平分发

       引发的问题:

      如果RabbitMQ不考虑负载的话,只管依次分发,平均分配消息的话,比如一个工作进程特别忙,而另外一个工作进程几乎没事干,这样就会消耗大量的时间。

      为了克服这个问题:我们可以配置basic_qos方法与prefetch_count=1,这样就是告诉RabbitMQ如果我这个工作进程没处理完任务就不要再给我发任务了,给其它的工作进程发。

    下面两张图可以说明任务未完成就会给recv2工作进程发任务了

     

    消息持久化+公平分发的代码

    sender.py

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import time
     5 import sys
     6 import pika
     7 
     8 credentials = pika.PlainCredentials('admin', 'admin123456')
     9 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials))
    10 channel = connection.channel()
    11 # 如果RabbitMQ服务器突然停止,任务也将会丢失,所有配置上durable=True 队列持久
    12 # queue_declare在这里申明队列的时候,生产者消费者都要进行设置
    13 channel.queue_declare(queue='task_queue', durable=True)
    14 message = ' '.join(sys.argv[1:]) or "hello world %s" % time.time()
    15 channel.basic_publish(exchange='',
    16                       routing_key='task_queue',
    17                       body=message,
    18                       # 将消息标记为持久性delivery_mode=2
    19                       properties=pika.BasicProperties(delivery_mode=2,)  # 消息持久化
    20                       )
    21 
    22 print('[x] Sent %r' % message)
    23 connection.close()

    recv1.py

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import time
     5 import pika
     6 
     7 credentials = pika.PlainCredentials('admin', 'admin123456')
     8 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials))
     9 channel = connection.channel()
    10 channel.queue_declare('task_queue', durable=True)
    11 
    12 
    13 def callback(ch, method, properties, body):
    14     print("received %s" % body)
    15     time.sleep(20)
    16     print('[x] Done')
    17     # 确认必须是建立在同一条通道上发送(如果不在同一通道上将会导致通道协议异常)
    18     ch.basic_ack(delivery_tag=method.delivery_tag)
    19 
    20 
    21 channel.basic_qos(prefetch_count=1)
    22 channel.basic_consume(queue='task_queue',
    23                       on_message_callback=callback,
    24                       )
    25 
    26 print(' [*] Waiting for messages. To exit press CTRL+C')
    27 channel.start_consuming()

    recv2.py

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import pika
     5 
     6 credentials = pika.PlainCredentials('admin', 'admin123456')
     7 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials))
     8 channel = connection.channel()
     9 channel.queue_declare('task_queue', durable=True)
    10 
    11 
    12 def callback(ch, method, properties, body):
    13     print("received %s" % body)
    14     print('[x] Done')
    15     # 确认必须是建立在同一条通道上发送(如果不在同一通道上将会导致通道协议异常)
    16     ch.basic_ack(delivery_tag=method.delivery_tag)
    17 
    18 
    19 channel.basic_qos(prefetch_count=1)
    20 channel.basic_consume(queue='task_queue',
    21                       on_message_callback=callback,
    22                       )
    23 
    24 print(' [*] Waiting for messages. To exit press CTRL+C')
    25 channel.start_consuming()

    重新启动rabbitmq任务依然存在

  • 相关阅读:
    用才情绽放的幸福之花
    我的爱车,你在哪里
    爱在网络,有没有错
    假如能抱着美女写诗
    只想爱你
    创业者和爱因斯坦的10大共同点(不是不可比的)
    心的感谢
    成大事必备9种能力.9种手段.9种心态
    一颗新星在陨落
    C++/C学习笔记(九)
  • 原文地址:https://www.cnblogs.com/Alexephor/p/11579474.html
Copyright © 2011-2022 走看看