zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(二): 工作队列

    1. 工作队列:

    对于资源密集型任务,我们等待其处理完成在很多情况下是不现实的,比如无法在http的短暂请求窗口中处理大量耗时任务,

    为了达到主线程无需等待,任务异步执行的要求,我们可以将任务加入任务队列,如图,多个workers可以共享

    同一个任务队列,同时对任务进行处理,主线程P将延后任务发送到队列之后即可返回,延迟任务由C1和C2处理完成;

    2. 轮询调度:

    队列会将消息轮询分发给worker,如上图两个worker,则首先发送消息到C1,然后发送消息到C2,然后在发送消息到C1,C2,C1...,

    队列按顺序发送,这样保证了每个worker收到的消息是均等的,默认设置情况下,队列并不会考虑worker当前的负载情况。

    3. 均衡调度:

    如2中所说,比如现在有两个队列,奇数消息都需要队列做大量繁重的处理,而偶数消息则需要处理的逻辑非常少,这样就会造成某个队列

    任务繁重,等待处理任务过多,从而使消息处理不均衡,处理能力下降。面对这样的情况,RabbitMQ提供了均衡调度机制,指定worker

    只能接收一条消息,当worker处理完毕,队列收到消息确认(4中描述)的时候,才会派发给该worker一条新消息。由此,达到对消息和队列处理能力的均衡调度。

    如下,我们可以使用basic_qos,并将perfetch_count设置为1,来告诉队列每次只发送一条消息给当前worker,直到收到完成确认才发下一条。

    channel.basic_qos(prefetch_count=1)

    4. 消息确认:

    当不使用消息确认的情况下,队列将消息投递给worker之后,会立即将消息从队列内存中删除;此时,如果woker被停掉或者崩溃,

    那么worker当前正在处理的消息和队列已经派发给worker的消息都会丢失。

    RabbitMQ提供了消息确认机制,worker完成处理消息之后发送ack,队列确认消息已处理完毕,才将其从内存中删除。但是这个过程没有

    超时,哪怕woker处理了很长时间也是没问题的。当worker挂掉,队列没有收到消息ack,如果有其他worker在运行,那么worker

    会将未确认的消息派发给其他运行中的worker。ack确认机制默认是开启的,当然可以在channel中关闭。

    注意,一定要确保在消息处理完成之后发送ack,否则队列内存将会随消息的增加而不断增加,甚至造成内存耗尽。

    5. 消息持久化:

    消息ack解决了worker挂掉时候消息的安全性,但是无法针对整个服务的重启或者挂掉,当RabbitMQ重启或者挂掉的时候,队列和消息都会消失,

    为了避免这种情况发生,我们需要设置队列和消息持久化。

    (1) 设置队列持久化:durable=True

    channel.queue_declare(queue='task_queue', durable=True)

    (2) 设置消息持久化:delivery_mode=2

    channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          ))

    上述设置虽然一定程度上保证了消息持久化,但是在收到消息和持久化消息之间仍然有时间窗口存在,且并不是每条消息都会写一次磁盘,

    所以这个时间窗口内仍然可能丢失消息,如果要确保持久化足够健壮,请参考 https://www.rabbitmq.com/confirms.html

    6. 测试代码:

    new_task.py--用于发送消息到队列

     1 #!/usr/bin/env python
     2 import pika
     3 import sys
     4 
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(
     6         host='localhost'))
     7 channel = connection.channel()
     8 
     9 # 设置队列持久化
    10 channel.queue_declare(queue='task_queue', durable=True)
    11 
    12 message = ' '.join(sys.argv[1:]) or "Hello World!"
    13 channel.basic_publish(exchange='',
    14                       routing_key='task_queue',
    15                       body=message,
    16                       properties=pika.BasicProperties(
    17                          delivery_mode = 2, # 设置消息持久化
    18                       ))
    19 print(" [x] Sent %r" % message)
    20 connection.close()

    worker.py--用于接收队列消息并完成消息处理

     1 #!/usr/bin/env python
     2 import pika
     3 import time
     4 
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(
     6         host='localhost'))
     7 channel = connection.channel()
     8 
     9 # 设置队列持久化
    10 channel.queue_declare(queue='task_queue', durable=True)
    11 print(' [*] Waiting for messages. To exit press CTRL+C')
    12 
    13 def callback(ch, method, properties, body):
    14     print(" [x] Received %r" % body)
    15     time.sleep(body.count(b'.'))
    16     print(" [x] Done")
    17     # 完成消息处理,发送ack确认消息    
    18     ch.basic_ack(delivery_tag = method.delivery_tag)
    19 
    20 # 最多同时接受一条消息
    21 channel.basic_qos(prefetch_count=1)
    22 channel.basic_consume(callback,
    23                       queue='task_queue')
    24 
    25 channel.start_consuming()
  • 相关阅读:
    人月神话读书笔记
    读人月神话有感
    Codeforces 137D
    Codeforces 1138B
    <WFU暑假训练一> 解题报告
    Codeforces 1250B
    Codeforces 1038D
    Codeforces 1202D
    Codeforces 87B
    Codeforces 208C
  • 原文地址:https://www.cnblogs.com/wanpengcoder/p/5289097.html
Copyright © 2011-2022 走看看