zoukankan      html  css  js  c++  java
  • RabbitMQ死循环-延长ACK时间

    一、应用背景

      今天做一个需求,要将RabbitMQ中的任务取出并执行,为防止任务执行期间出错,设置NO_ACK=FALSE标志,这样、一旦任务没有应答的话,相应的任务就会被RabbitMQ自动Re-Queue,避免丢失任务。然而、由于任务执行时间较长,通常需要五、六分钟,甚至更长;我们都知道一旦一个任务被取出执行,该任务就从Ready状态更改成Unacked状态。如图所示:

      当这个任务执行完之后,程序将向RabbitMQ发送ACK消息确认,RabbitMQ在收到ACK消息后,会将该任务移出队列;然而、问题出在任务尚未执行完毕【执行时间太久】,RabbitMQ再等了一段时间【大约两三分钟】后,一直没有收到ACK确认消息,就将该任务自动Re-Queue了【我是一个生产者,一个消费者模式】,也就是说、我们这里发生了死循环【任务永远也执行不完,因为会一直Re-Queue】。

    二、延长RabbitMQ ACK应答时间

      到这里,我们急需解决的问题就是,怎么能设置RabbitMQ延长等待ACK的时间,百度一下、两下,各种读网络文档,研究操作RabbitMQ工作的文档,查了一圈资料也没查出怎么延长RabbitMQ ACK时间【废柴啊】。至此、一直查不出来,就想问一下网友的你,你知道怎么延长RabbitMQ接受ACK应答时间么?

    三、改变解决问题方式

      在查不出如何延长ACK应答时间后,我将注意力转向如何检测当前任务操作超时的,后来在官网看到这么一段话:

      链接官网位置:http://www.rabbitmq.com/heartbeats.html#heartbeats-timeout

      

       后面、就简单测试下将heartbeat参数设置为0,以禁用心跳检测,这样基本解决了我的问题;虽然官方不建议这么做,但也是一种解决思路,如果大家有什么更好的解决办法,烦请在下面留言【先谢谢啦】。

      至此、这个问题基本阐述清楚了,如果有遇到的小伙伴,也请参考下上面的操作。

      测试代码:

    # import json
    # from concurrent.futures import ThreadPoolExecutor
    from queue import Queue
    # from threading import Thread
    
    from pika import BasicProperties, BlockingConnection, URLParameters
    from pika.exceptions import ConnectionClosed
    
    
    # from automation.aiclient.aiclient import AsyncAIRequestManager
    
    
    class RabbitMQManager:
        def __init__(self, host = 'localhost', qname = 'queue'):
            self.params = URLParameters(host)
            self.qname = qname
            self.prod_conn = None
            self.prod_chan = None
            self.cons_conn = None
            self.cons_chan = None
            self.ai_signton = None
    
        def init_prod_conn(self):
            # create send connection
            self.prod_conn = BlockingConnection(self.params)
            self.prod_chan = self.prod_conn.channel()
            self.prod_chan.queue_declare(queue = self.qname, durable = True)
    
        def init_cons_conn(self):
            # create receive connection
            self.cons_conn = BlockingConnection(self.params)
            self.cons_chan = self.cons_conn.channel()
            self.cons_chan.basic_qos(prefetch_count = 1)
            self.cons_chan.basic_consume(self.callback, queue = self.qname)
    
        def produceMessages(self, msg = None):
            try:
                if isinstance(msg, str):
                    self.prod_chan.basic_publish(exchange = '',
                                                 routing_key = self.qname,
                                                 body = msg,
                                                 properties = BasicProperties(
                                                     delivery_mode = 2,  # make message persistent
                                                 ))
                elif isinstance(msg, Queue):
                    while 0 != msg.qsize():
                        item = msg.get()
                        self.prod_chan.basic_publish(exchange = '',
                                                     routing_key = self.qname,
                                                     body = item,
                                                     properties = BasicProperties(
                                                         delivery_mode = 2,  # make message persistent
                                                     ))
                else:
                    pass
    
            except Exception as e:
                if isinstance(e, ConnectionClosed):
                    print('Reconnection established!')
                    self.init_prod_conn()
                    # last connection close, re-produce msg
                    self.produceMessages(msg)
                else:
                    print('Produce msg exception Occur, please check following error message:')
                    print(e)
    
        def consumeMessages(self):
            try:
                self.cons_chan.start_consuming()
            except Exception as e:
                print('Consume msg exception Occur, please check following error message:')
                print(e)
                if isinstance(e, ConnectionClosed):
                    print('Reconnection established!')
                    self.init_cons_conn()
                    self.consumeMessages()
    
        def callback(self, ch, method, properties, body):
            # handle message body
            print('callback....')
            print(body)
            try:
                print('Consuming....')
                self.cons_conn.process_data_events()
                # 模拟处理任务时间
                import time
                time.sleep(300)
                # if None == self.ai_signton:
                #     self.ai_signton = AsyncAIRequestManager()
                # self.ai_signton.run(eval(json.loads(json.dumps(body.decode('utf-8')), encoding = 'utf-8')))
                ch.basic_ack(delivery_tag = method.delivery_tag)
                # t = Thread(target = self.ai_signton.syncToDatabase())
                # t.start()
    
            except Exception as e:
                if isinstance(e, ConnectionClosed):
                    raise ConnectionClosed('Connection has been closed, send to reconnection.')
                else:
                    print('Current error msg:')
                    print(e)
    
        def close_prod_conn(self):
            if None != self.prod_conn:
                self.prod_conn.close()
    
        def close_cons_conn(self):
            if None != self.cons_conn:
                self.cons_conn.close()
    
        def close(self):
            self.close_prod_conn()
            self.close_cons_conn()
    

      

  • 相关阅读:
    禁用Clusterware在系统启动后自己主动启动
    码农的产品思维培养第4节----听用户饿但不要照着做《人人都是产品经理》
    android RecycleView复杂多条目的布局
    【shell脚本练习】网卡信息和简单日志分析
    Java太阳系小游戏分析和源代码
    《你是我的眼》,歌曲非常好听
    hdu 1856 More is better(并查集)
    Python 中的isinstance函数
    Python中的 isdigit()方法
    Python中的split()函数的使用方法
  • 原文地址:https://www.cnblogs.com/julygift/p/9445107.html
Copyright © 2011-2022 走看看