zoukankan      html  css  js  c++  java
  • python之rabbitMQ二:队列、消息持久化

     

    一、队列持久化

     声明队列queue_declare方法的原型 :

    channel.queue_declare(queue='', passive=False, durable=False,
                          exclusive=False, auto_delete=False,
                          arguments=None):

    queue: 队列名称

    durable: 是否持久化, 队列的声明默认是False,即存放到内存中的,如果rabbitmq重启会丢失。

      如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。

    exclusive:是否排外的,默认为False,不排外。有两个作用:

      一:当连接关闭时connection.close()该队列是否会自动删除;

      二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题;

      如果是排外的,会对当前队列加锁,只允许当前消费者可以访问,其他通道channel是不能访问的,如果强制访问会报异常:ShutdownSignalException: channel error

      一般等于true的话用于一个队列只能有一个消费者来消费的场景 。

    auto_delete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,默认为False。

      可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时,即没有任务消费者时,队列就会自动删除

    arguments: 
    队列中的消息什么时候会自动被删除?
    Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期),

    也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间:properties=pika.BasicProperties(.........)


    Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

    Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim

    Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B

    Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

    Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

    Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,

    Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
    Master locator(x-queue-master-locator) 

    关于队列的声明,如果使用同一套参数进行声明了,就不能再使用其他参数来声明,要么删除该队列重新删除,可以使用命令行删除也可以在RabbitMQ Management上删除,要么给队列重新起一个名字。

    队列持久化:

    重启RabbitMQ服务器(可以通过rabbitmqctl stop_app关闭服务器,rabbitmqctl start_app重启服务器),可以登录RabbitMQ Management—> Queues中。如果队列设置了持久化,则可以看到之前声明的队列还存在。

    二、消息持久化

    消息确认机制:

    如果消费消息时发生异常,队列中也没有了消息,服务器无法知道此消息是否成功,此时将发生丢失吗?

    不会,因为有消息确认机制:事实上,此条消息被消费者取出,队列中没有了此消息(但会暂时保存在其它地方)。但是rabbitMQ有消息消费成功与否的确认机制,如果消息异常,即失败,此条消息将从另外一个地方重新放回队列中;如果成功才会根据配置在多长时间内删除这条消息。

    消息持久化:

    如果消息服务器宕机,服务器中的队列和消息是否会被保存?

    如果没有启用消息持久化(默认值),消息是保存在内存中的,宕机将丢失队列 和消息。

    如果设置了队列、消息持久化,则会保存在erlang自带的数据库中,重启服务器后将恢复队列和消息。

     设置消息持久化必须先设置队列持久化,要不然队列不持久化,消息持久化,队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的。

    为单条消息设置持久化:发布消息时,设置参数properties=pika.BasicProperties(delivery_mode=2),2为持久化,1为非持久化。

    复制代码
    channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          ))
    复制代码

    Message TTL消息剩余生存时间
    为该队列的所有消息统一设置相同的声明周期:统一设置队列中的所有消息的过期时间,例如设置10秒,10秒后这个队列的消息清零

    arguments.put("x-message-ttl", 10000);
    
    // 声明队列时指定队列中的消息过期时间
    channel.queue_declare(QUEUE_NAME, false, false, false, arguments); 

    Auto Expire自动过期
    x-expires用于当多长时间没有消费者访问该队列的时候,该队列会自动删除,可以设置一个延迟时间,如仅启动一个生产者,10秒之后该队列会删除,或者启动一个生产者,再启动一个消费者,消费者运行结束后10秒,队列也会被删除

    Max Length最大长度
    x-max-length:用于指定队列的长度,如果不指定,可以认为是无限长,例如指定队列的长度是4,当超过4条消息,前面的消息将被删除,给后面的消息腾位

    Max Length Bytes代码片段
    x-max-length-bytes: 用于指定队列存储消息的占用空间大小,当达到最大值是会删除之前的数据腾出空间

    Maximum priority最大优先级
    x-max-priority: 设置消息的优先级,优先级值越大,越被提前消费。

    正常情况下不适用优先级 
    Hello RabbitMQ: 1 
    Hello RabbitMQ: 2 
    Hello RabbitMQ: 3 
    Hello RabbitMQ: 4 
    Hello RabbitMQ: 5

    使用优先级顺序正好相反 
    Hello RabbitMQ: 5 
    Hello RabbitMQ: 4 
    Hello RabbitMQ: 3 
    Hello RabbitMQ: 2 
    Hello RabbitMQ: 1

     

    三、示例 

    一对一的生产者、消费者的消息队列模式:

    生产者:

    复制代码
    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 此消息持久化
                          ))
    print(" [x] Sent %r" % message)
    connection.close()
    复制代码

    消费者:

    复制代码
    #!/usr/bin/env python
    import pika
    import time
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        # 消息确认:消费者完成消费后,发送确认消息给服务器
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
                          queue='task_queue',
                          # no_ack=False  # 默认为False
                          )
    
    channel.start_consuming()
    复制代码

    1.如果消费者,从队列中取到消息,但消费失败了?怎么保证此条消息会退还到队列中,能够被其它消息者获取到?

    消费者在消费成功时,发送消息确认即可;在代码中,必须这两个地方都要实现:

    • 在callback消费者函数中,发送确认消息:ch.basic_ack(delivery_tag=method.delivery_tag)
    • 消费都的channel.basic_consume中的no_ack参数使用默认值False

    2.如果rabbitMQ服务器挂了,怎么保证在服务器重启用,队列中的消息不丢失?

    队列持久化,且消息持久化。必须两者都持久化。代码:

    • 在声明队列时,使用参数durable=True使队列持久化:
      • channel.queue_declare(queue='task_queue', durable=True)
    • 设置队列的Message TTL消息剩余生存时间
    • 或者生产者在发布消息时,使用参数properties=pika.BasicProperties(delivery_mode=2)使消息持久化:channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 此消息持久化
                          )) 

    四、公平派遣

    您可能已经注意到调度仍然无法完全按照我们的要求工作。例如,在有两名工人的情况下,当所有奇怪的信息都很重,甚至信息很少时,一名工作人员会一直很忙, 另一名工作人员几乎不会做任何工作。那么,RabbitMQ不知道任何有关这一点,并仍将均匀地发送消息。

    发生这种情况是因为RabbitMQ只在消息进入队列时调度消息。它没有考虑消费者未确认消息的数量。它只是盲目地将第n条消息分发给第n位消费者。

    为了解决这个问题,我们可以使用basic.qos方法和设置prefetch_count = 1。这告诉RabbitMQ一次不要向工作人员发送多个消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。相反,它会将其分派给不是仍然忙碌的下一个工作人员。

    channel.basic_qos(prefetch_count=1)

    示例:

    生产者:new_task.py

    复制代码
    #!/usr/bin/env python
    import sys
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    
    message = ' '.join(sys.argv[1:]) or 'Hello World'
    
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 确保消息是持久的
                          ))
    print(" [x] Sent %r" % message)
    connection.close()
    复制代码

    消费者:worker.py

    复制代码
    #!/usr/bin/env python
    import time
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(callback,
                          queue='hello')
    channel.basic_qos(prefetch_count=1)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    复制代码

    使用消息确认和prefetch_count,您可以设置一个工作队列。即使RabbitMQ重新启动,持久性选项也可让任务继续存在。

  • 相关阅读:
    数据库索引类型及实现方式
    MyBatis从入门到精通(十一):MyBatis高级结果映射之一对多映射
    解决克隆 centos虚拟机后修改克隆后的机器的ip、mac、uuid失败的问题
    多层表达式
    条件过滤
    复杂表达式
    生成列表
    迭代dict的key和value
    迭代dict的value
    索引迭代
  • 原文地址:https://www.cnblogs.com/fengff/p/12566689.html
Copyright © 2011-2022 走看看