zoukankan      html  css  js  c++  java
  • python中RabbitMQ的使用(工作队列)

    消息可以理解为任务,消息发送者可以看成任务派送者(sender),消息接收者可以看成工作者(worker)。

    当工作者接收到一个任务,还没完任务时分配者又发一个任务,此时需要多个工作者来共同处理这些任务。

    任务分派结构图如下:

    注:此时有一个任务派送人P,两个工作接收者C1和C2。

    现在我们来模拟该情况:

    1.首先打开三个终端:

    2.分别在前两个终端运行receive1.py

     3.在第三个终端多次运行send1.py

     此时将会轮流向worker1和worker2分派任务。

    问题:

    在以上任务分配和完成情况中,有几个问题将会产生:

    1.工作者任务是否完成?

    2.工作者挂掉后,如何防止未完成的任务丢失,并且如何处理这些任务?

    3.RabbitMQ自身出现问题,此时如何防止任务丢失?

    4.任务有轻重之分,如何实现公平调度?

    方案:

    1.消息确认(Message acknowledgment)

    当任务完成后,工作者(receiver)将消息反馈给RabbitMQ:

    复制代码
    1 def callback(ch, method, properties, body):
    2     print " [x] Received %r" % (body,)
    3     #停顿5秒,方便ctrl+c退出
    4     time.sleep(5)
    5     print " [x] Done"
    6     #当工作者完成任务后,会反馈给rabbitmq
    7     ch.basic_ack(delivery_tag=method.delivery_tag)
    复制代码

    2.保留任务(no_ack=False)

    当工作者挂掉后,防止任务丢失:

    # 去除no_ack=True参数或者设置为False后可以实现
    # 一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
    channel.basic_consume(callback, queue='task_queue', no_ack=False)

    3.消息持久化存储(Message durability)

    声明持久化存储:

    # durable=True即声明持久化存储
    channel.queue_declare(queue='task_queue', durable=True)

    在发送任务时,用delivery_mode=2来标记任务为持久化存储:

    复制代码
    1 # 用delivery_mode=2来标记任务为持久化存储:
    2 channel.basic_publish(exchange='',
    3                       routing_key='task_queue',
    4                       body=message,
    5                       properties=pika.BasicProperties(
    6                           delivery_mode=2,
    7                       ))
    复制代码

    4.公平调度(Fair dispatch)

    使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务

    channel.basic_qos(prefetch_count=1)

    完整代码如下:

    receive1.py

    复制代码
     1 #!/usr/bin/env python3
     2 # -*- coding: utf-8 -*-
     3 import pika
     4 import time
     5 
     6 hostname = '192.168.1.133'
     7 parameters = pika.ConnectionParameters(hostname)
     8 connection = pika.BlockingConnection(parameters)
     9 
    10 # 创建通道
    11 channel = connection.channel()
    12 # durable=True后将任务持久化存储,防止任务丢失
    13 channel.queue_declare(queue='task_queue', durable=True)
    14 
    15 
    16 # ch.basic_ack为当工作者完成任务后,会反馈给rabbitmq
    17 def callback(ch, method, properties, body):
    18     print " [x] Received %r" % (body,)
    19     time.sleep(5)
    20     print " [x] Done"
    21     ch.basic_ack(delivery_tag=method.delivery_tag)
    22 
    23 # basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,
    24 # 即只有工作者完成任务之后,才会再次接收到任务。
    25 channel.basic_qos(prefetch_count=1)
    26 
    27 # 去除no_ack=True参数或者设置为False后可以实现
    28 # 一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
    29 channel.basic_consume(callback, queue='task_queue', no_ack=False)
    30 # 开始接收信息,按ctrl+c退出
    31 print ' [*] Waiting for messages. To exit press CTRL+C'
    32 channel.start_consuming()
    复制代码

    send1.py

    复制代码
     1 #!/usr/bin/env python3
     2 # -*- coding: utf-8 -*-
     3 import pika
     4 import random
     5 
     6 hostname = '192.168.1.133'
     7 parameters = pika.ConnectionParameters(hostname)
     8 connection = pika.BlockingConnection(parameters)
     9 
    10 # 创建通道
    11 channel = connection.channel()
    12 # 如果rabbitmq自身挂掉的话,那么任务会丢失。所以需要将任务持久化存储起来,声明持久化存储:
    13 channel.queue_declare(queue='task_queue', durable=True)
    14 
    15 number = random.randint(1, 1000)
    16 message = 'hello world:%s' % number
    17 
    18 # 在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
    19 channel.basic_publish(exchange='',
    20                       routing_key='task_queue',
    21                       body=message,
    22                       properties=pika.BasicProperties(
    23                           delivery_mode=2,
    24                       ))
    25 print " [x] Sent %r" % (message,)
    26 connection.close()
    复制代码

    示例如下:

    首先启动三个终端,两个先执行receive1.py,第三个多次执行rend1.py:

    终端3:

    此时分配三个任务,33分配给worker1,170分配给worker2,262分配给worker1

    终端1:

    worker1完成任务33后,开始任务262,我们在任务完成前使用(CRTL+C)使worker1挂掉

    终端2:

    worker2完成任务170,本来没有任务,但是worker1挂掉,此时接收他的任务262

  • 相关阅读:
    AAC音频格式分析与解码
    SIGPIPE信号
    可变参数的宏定义
    Makefile条件编译debug版和release版
    Linux下查看内存使用情况
    Trie树 字典树
    C/C++随机数生成 rand() srand()
    关于编译安装Thrift找不到libthriftnb.a的问题
    Linux下使用popen()执行shell命令
    WebSocket协议分析
  • 原文地址:https://www.cnblogs.com/ExMan/p/11772293.html
Copyright © 2011-2022 走看看