zoukankan      html  css  js  c++  java
  • python中RabbitMQ的使用(工作队列)

    消息可以理解为任务,消息发送者可以看成任务派送者(sender),消息接收者可以看成工作者(worker)。

    当工作者接收到一个任务,还没完任务时分配者又发一个任务,此时需要多个工作者来共同处理这些任务。

    任务分派结构图如下:

    注:此时有一个任务派送人P,两个工作接收者C1和C2。

    现在我们来模拟该情况:

    1.首先打开三个终端:

    2.分别在前两个终端运行receive1.py

     3.在第三个终端多次运行send1.py

     此时将会轮流向worker1和worker2分派任务。

    问题:

    在以上任务分配和完成情况中,有几个问题将会产生:

    1.工作者任务是否完成?

    2.工作者挂掉后,如何防止未完成的任务丢失,并且如何处理这些任务?

    3.RabbitMQ自身出现问题,此时如何防止任务丢失?

    4.任务有轻重之分,如何实现公平调度?

    方案:

    1.消息确认(Message acknowledgment)

    当任务完成后,工作者(receiver)将消息反馈给RabbitMQ:

    1 def callback(ch, method, properties, body):
    2     print " [x] Received %r" % (body,)
    3     #停顿5秒,方便ctrl+c退出
    4     time.sleep(5)
    5     print " [x] Done"
    6     #当工作者完成任务后,会反馈给rabbitmq
    7     ch.basic_ack(delivery_tag=method.delivery_tag)

    2.保留任务(no_ack=False)

    当工作者挂掉后,防止任务丢失:

    # 去除no_ack=True参数或者设置为False后可以实现
    # 一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
    channel.basic_consume(callback, queue='task_queue', no_ack=False)

    3.消息持久化存储(Message durability)

    声明持久化存储:

    # durable=True即声明持久化存储
    channel.queue_declare(queue='task_queue', durable=True)

    在发送任务时,用delivery_mode=2来标记任务为持久化存储:

    1 # 用delivery_mode=2来标记任务为持久化存储:
    2 channel.basic_publish(exchange='',
    3                       routing_key='task_queue',
    4                       body=message,
    5                       properties=pika.BasicProperties(
    6                           delivery_mode=2,
    7                       ))

    4.公平调度(Fair dispatch)

    使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务

    channel.basic_qos(prefetch_count=1)

    完整代码如下:

    receive1.py

     1 #!/usr/bin/env python3
     2 # -*- coding: utf-8 -*-
     3 import pika
     4 import time
     5 
     6 hostname = '192.168.1.133'
     7 parameters = pika.ConnectionParameters(hostname)
     8 connection = pika.BlockingConnection(parameters)
     9 
    10 # 创建通道
    11 channel = connection.channel()
    12 # durable=True后将任务持久化存储,防止任务丢失
    13 channel.queue_declare(queue='task_queue', durable=True)
    14 
    15 
    16 # ch.basic_ack为当工作者完成任务后,会反馈给rabbitmq
    17 def callback(ch, method, properties, body):
    18     print " [x] Received %r" % (body,)
    19     time.sleep(5)
    20     print " [x] Done"
    21     ch.basic_ack(delivery_tag=method.delivery_tag)
    22 
    23 # basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,
    24 # 即只有工作者完成任务之后,才会再次接收到任务。
    25 channel.basic_qos(prefetch_count=1)
    26 
    27 # 去除no_ack=True参数或者设置为False后可以实现
    28 # 一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
    29 channel.basic_consume(callback, queue='task_queue', no_ack=False)
    30 # 开始接收信息,按ctrl+c退出
    31 print ' [*] Waiting for messages. To exit press CTRL+C'
    32 channel.start_consuming()

    send1.py

     1 #!/usr/bin/env python3
     2 # -*- coding: utf-8 -*-
     3 import pika
     4 import random
     5 
     6 hostname = '192.168.1.133'
     7 parameters = pika.ConnectionParameters(hostname)
     8 connection = pika.BlockingConnection(parameters)
     9 
    10 # 创建通道
    11 channel = connection.channel()
    12 # 如果rabbitmq自身挂掉的话,那么任务会丢失。所以需要将任务持久化存储起来,声明持久化存储:
    13 channel.queue_declare(queue='task_queue', durable=True)
    14 
    15 number = random.randint(1, 1000)
    16 message = 'hello world:%s' % number
    17 
    18 # 在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
    19 channel.basic_publish(exchange='',
    20                       routing_key='task_queue',
    21                       body=message,
    22                       properties=pika.BasicProperties(
    23                           delivery_mode=2,
    24                       ))
    25 print " [x] Sent %r" % (message,)
    26 connection.close()

    示例如下:

    首先启动三个终端,两个先执行receive1.py,第三个多次执行rend1.py:

    终端3:

    此时分配三个任务,33分配给worker1,170分配给worker2,262分配给worker1

    终端1:

    worker1完成任务33后,开始任务262,我们在任务完成前使用(CRTL+C)使worker1挂掉

    终端2:

    worker2完成任务170,本来没有任务,但是worker1挂掉,此时接收他的任务262

  • 相关阅读:
    【第40套模拟题】【noip2011_mayan】解题报告【map】【数论】【dfs】
    【模拟题(63550802...)】解题报告【贪心】【拓扑排序】【找规律】【树相关】
    【模拟题(电子科大MaxKU)】解题报告【树形问题】【矩阵乘法】【快速幂】【数论】
    IMemoryBufferReference and IMemoryBufferByteAccess
    SoftwareBitmap and BitmapEncoder in Windows.Graphics.Imaging Namespace
    Windows UPnP APIs
    编译Android技术总结
    Windows函数转发器
    Two Ways in Delphi to Get IP Address on Android
    Delphi Call getifaddrs and freeifaddrs on Android
  • 原文地址:https://www.cnblogs.com/jfl-xx/p/7338657.html
Copyright © 2011-2022 走看看