zoukankan      html  css  js  c++  java
  • 持久化和公平分发.py

    1、消息持久化
    在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,
    这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,
    RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,
    则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…千面是因为我们在消费者端标记了ACK=True关闭了它们,如果你没有增加ACK=True或者没有回执就会出现这个问题

    生产者需要在发送消息的时候标注属性为持久化
    # 在队列中添加消息
    for i in range(100):
    message = '%s Meassage '% i or "Hello World!"
    # 发送消息
    channel.basic_publish(exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2, )) # 标记属性消息为持久化消息需要客户端应答

    # 发送消息结束,并关闭通道
    print(" [x] Sent %r" % message)


    消费者需要发送消息回执
    # 订阅回调函数,这个订阅回调函数是由pika库来调用
    def callback(ch, method, properties, body):
    """
    :param ch: 通道对象
    :param method: 消息方法
    :param properties:
    :param body: 消息内容
    :return: None
    """
    print(" [x] Received %r" % (body,))
    time.sleep(2)
    print(" [x] Done")

    # 发送消息确认,确认交易标识符
    ch.basic_ack(delivery_tag=method.delivery_tag)

    我们通过命令查看哪些消费者没有回复ack确认
    # Linux
    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    # Windows
    rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

    2、队列持久化
    如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。
    但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。
    由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。 这里我们需要修改下生产者和消费者设置RabbitMQ消息的持久化**[生产者/消费者]都需要配置**

    channel.queue_declare(queue='task_queue', durable=True) # 队列持久化

    3、公平分发
    默认情况下RabitMQ会把队列里面的消息立即发送到消费者,无论该消费者有多少消息没有应答,也就是说即使发现消费者来不及处理,新的消费者加入进来也没有办法处理已经堆积的消息,因为那些消息已经被发送给老消费者了。类似下面的
    在消费者中增加:`channel.basic_qos(prefetch_count=1)`
    prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。
    这样做的好处是,如果系统处于高峰期,消费者来不及处理,消息会堆积在队列中,新启动的消费者可以马上从队列中取到消息开始工作。
    公平分发.png

    工作过程如下:
    1. 消费者1接收到消息后处理完毕发送了ack并接收新的消息并处理
    2. 消费者2接收到消息后处理完毕发送了ack并接收新的消息并处理
    3. 消费者3接收到消息后一直处于消息中并没有发送ack不在接收消息一直等到消费者3处理完毕后发送ACK后再接收新消息

  • 相关阅读:
    VTK初学一,b_PolyVertex多个图形点的绘制
    VTK初学一,a_Vertex图形点的绘制
    Python基础学习之集合
    Apache
    NTP时间同步服务和DNS服务
    NFS服务及DHCPD服务
    samba服务及vsftpd服务
    Linux rpm和yum软件管理
    Linux网络技术管理及进程管理
    Linux RAID磁盘阵列
  • 原文地址:https://www.cnblogs.com/luoyan01/p/9734119.html
Copyright © 2011-2022 走看看