zoukankan      html  css  js  c++  java
  • RabbitMQ使用介绍2—Work queues

    Work queues

    接下来是part2

    在这一项中,我们创建一个工作队列,用于在多个工作者之间分配耗时的任务。

    Work Queues的主要思想是,避免立即执行资源密集的任务而不得不等待其执行完成。我们将任务封装为消息并将其发送到队列中,在后台运行的一个工作进程将会弹出任务并最终执行该任务,当你管理许多工作节点时,任务就会在他们之间共享。

    这个概念在web应用程序中尤其有用,因为在一个短HTTP请求窗口中不可能处理复杂的任务

    下面我们发送一个特殊的String,用Thread.sleep()辅助,来模拟一些耗时的工作,用点来简单表示一个任务的复杂度,例如Hello.表示此任务需要两秒进行处理。

    我们将在原来生产者的基础上修改一些代码,文件名叫send_work.py:

    import pika
    import sys
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))#默认端口5672,可不写
    
    #创建通道,声明一个管道,在管道里发送消息
    channel = connection.channel()
    #在管道里声明queue
    channel.queue_declare(queue='hello')
    message = ''.join(sys.argv[1:]) or "Hello World"
    #一条消息永远不能直接发送到队列,它总需要经过一个交换exchange
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)#设置routing_key(消息队列的名称)和body(发送的内容)
    print("[x] Sent %r" % message)
    connection.close()#关闭连接,队列关闭
    

    我们将在原来消费者的基础上修改一些代码,文件名叫receive_work.py:

    #receiving(消费者接收者)
    import pika
    import time
    #创建一个连接
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost'))#默认端口5672,可不写
    #创建通道,声明一个管道,在管道里发送消息
    channel = connection.channel()
    
    #把消息队列的名字为hello,把消费者和queue绑定起来,生产者和queue的也是hello
    #为什么又声明了一个hello队列
    #如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次
    channel.queue_declare(queue='hello')
    
    #回调函数get消息体
    def callback(ch,method,properties,body):#四个参数为标准格式
        #管道内存对象,内容相关信息
        print("打印看下是什么:",ch,method,properties) #打印看下是什么
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print("[x] Done")
        
    #消费消息
    channel.basic_consume(
        queue='hello',#你要从那个队列里收消息
        on_message_callback=callback,#如果收到消息,就调用callback函数来处理消息
        auto_ack=True #写的话,如果接收消息,机器宕机消息就丢了
        #一般不写,宕机则生产者检测到发给其他消费者
    )
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() #创建死循环,监听消息队列,可使用CTRL+C结束监听
    
    

    对于上面的,开启两个控制台的消费者,开启一个生产者的控制台,运行的结果如下:

    默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者,平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环(平均分配)

    消息确认(消息答复)

    完成一项任务可能需要一定的时间。你可能会想,如果一个消费者开始一项长时间的任务,并且只完成了一部分,那么会发生什么。在我们当前的代码中,一旦RabbitMQ向客户发送一条消息,它立即将其标记为删除。在这种情况下。如果我们强制关闭了一个工作节点。我们将丢失它正在处理的消息。我们还将丢失发送给这个特定工作者的所有消息,但是还没有处理

    通常我们不希望因为一个节点挂掉而丢失任何消息,而希望能够将这些消息传递给其他存活节点进行处理

    确保消息能够不丢失,

    RabbitMQ支持message acknowledgments(消息确认),一条特定的消息被接收后,返回一个ack告诉RabbitMQ可以随意地进行删除

    如果一个消费者挂了(通道关闭,TCP连接关闭)就不能发送ack给RabbitMQ,此时RabbitMQ就会意识到,某条消息没有被处理完成,那么就会将其重新发送到其他消费者。这种处理流程保证了信息不会丢失,即使偶尔有消费者挂掉

    没有任何消息超时,

    RabbitMQ将在某个消费者挂掉后重新传递消息,即使处理消息需要很长时间。缺省情况下,手动消息确认将被打开。在前面的例子中,我们通过autoAck=true标记显示地关闭了他们,此时将这个标志设置为false,并在完成任务后向员工发出适当的确认信息.ch.basic_ack(delivery_tag=method.delivery_tag)

    #receiving(消费者接收者)
    import pika
    import time
    #创建一个连接
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost'))#默认端口5672,可不写
    #创建通道,声明一个管道,在管道里发送消息
    channel = connection.channel()
    
    #把消息队列的名字为hello,把消费者和queue绑定起来,生产者和queue的也是hello
    #为什么又声明了一个hello队列
    #如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次
    channel.queue_declare(queue='hello')
    
    #回调函数get消息体
    def callback(ch,method,properties,body):#四个参数为标准格式
        #管道内存对象,内容相关信息
        print("打印看下是什么:",ch,method,properties) #打印看下是什么
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print("[x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)#消息确认
    
    #消费消息
    channel.basic_consume(
        queue='hello',#你要从那个队列里收消息
        on_message_callback=callback,#如果收到消息,就调用callback函数来处理消息
        auto_ack=True #写的话,如果接收消息,机器宕机消息就丢了
        #一般不写,宕机则生产者检测到发给其他消费者
    )
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() #创建死循环,监听消息队列,可使用CTRL+C结束监听
    

    忘记消息确认

    错过basic_ack是一个常见的错误。这是一个简单的错误,但后果是严重的,当您的客户端退出时,消息将被重新发送(可能看起来像随机的重新发送),但是RabbitMQ将消耗越来越多的内存,因为它将无法释放任何为发送的消息。为了debug这种类型的错误,你可以使用rabbitmqctl去输出未确认消息的字段:

    sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    

    On Windows, drop the sudo:

        rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
    

    消息持久化

    我们已经学会了如何确保即使消费者挂掉,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失

    当RabbitMQ退出或崩溃时,它将会忘记队列和消息,除非您告诉它不要这样做。需要有两件事来确保消息不会丢失,我们需要将队列和消息标记为持久的。

    首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了实现这一目的,我们需要将其声明为持久的:durable=True

    channel.queue_declare(queue='task_queue', durable=True)
    

    尽管这个命令本身是正确的,但它在我们当前的设置中是无效的,这是因为我们已经定义了一个名为hello的队列。它不是持久的,RabbitMQ不允许您重新定义具有不同参数的现有队列,并将返回任何试图执行此操作的程序的错误。但是有一个快速的解决方法——让我们声明一个有不同名称的队列。例如task_queue:

    channel.queue_declare(queue='task_queue', durable=True)
    

    这个队列名更改需要重新应用到"生产者"和"消费者"中

    此时,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。通过提供一个值为2的delivery_mode属性

    #一条消息永远不能直接发送到队列,它总需要经过一个交换exchange
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2,#make message persistent
                                                          ))#设置routing_key(消息队列的名称)和body(发送的内容)
    

    将消息标记为持久性并不能完全保证消息丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是当RabbitMQ接受消息并没有保存它时,仍然有一个很短的时间窗口。另外RabbitMQ不会为每条消息执行fsync(2)——它可能只是保存到缓存中,而不是真正写到磁盘上。持久性保证并不强大,但对于我们的简单任务队列来说,这已经足够了。

    公平分配

    RabbitMQ在消息进入队列时仅发送一条消息。它不考虑消费者的未确认消息的数量。它只是盲目地将每个m个消息发送给第n个消费者(平均分配)

    这样可能导致一个消费者一直忙碌(很倒霉地接收到所有费时间的任务)而另一个消费者则一直很空闲(接收到都是轻任务)

    我们可以通过消费者设置prefetchCount=1来避免这个问题

    channel.basic_qos(prefetch_count=1)
    

    即将分配策略设为每次只分配一个任务,下一个任务交给首先完成第一个完成任务的消费者,如此类推

    官网:https://www.rabbitmq.com/tutorials/tutorial-two-python.html

  • 相关阅读:
    NSIS制作安装程序
    poj_1011木棒
    hdoj_1312Red and Black
    搜索题目推荐及解题报告
    应届生就职前要读的几本书
    poj_1564Sum It Up
    priority_queue用法
    hdoj_2952Counting Sheep
    poj_1154LETTERS
    poj_2362
  • 原文地址:https://www.cnblogs.com/venvive/p/11730016.html
Copyright © 2011-2022 走看看