zoukankan      html  css  js  c++  java
  • RocketMQ之八:重试队列,死信队列,消息轨迹

    问题思考

    • 死信队列的应用场景?
    • 死信队列中的数据是如何产生的?
    • 如何查看死信队列中的数据?
    • 死信队列的读写权限?
    • 死信队列如何消费?
    • 重试队列和死信队列的配置
    • 消息轨迹

    1、应用场景

    一般应用在当正常业务处理时出现异常时,将消息拒绝则会进入到死信队列中,有助于统计异常数据并做后续的数据修复处理;

    2、数据是如何产生的?

    重试队列在重试16次(默认次数)将消息放入死信队列

    参考: https://blog.csdn.net/hqwang4/article/details/99971596

    3、如何查看死信队列中的数据?

     通过console查看死信队列的消息,报如下异常:
    org.apache.rocketmq.client.exception.MQClientException: Can not find Message Queue for this topic, %DLQ%RetryConsumer See http://rocketmq.apache.org/docs/faq/ for further details.

    但是在broker机器上通过命令行查看topic,死信队列确实存在。

    sh bin/mqadmin topiclist -n 10.200.110.46:9876;10.200.110.101:9876


    4、死信队列的读写权限

    4.1、查看该topic信息,发现perm为2

    sh bin/mqadmin topicRoute -n 10.200.110.46:9876 -n 10.200.110.101:9876 -t %DLQ%groupnamedef2

    [root@localhost rocketmq-4.5.2]# sh bin/mqadmin topicRoute -n 10.200.110.46:9876 -n 10.200.110.101:9876 -t %DLQ%groupnamedef2
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    {
        "brokerDatas":[
            {
                "brokerAddrs":{0:"10.200.110.46:10911"
                },
                "brokerName":"broker-b",
                "cluster":"rocketmq-cluster"
            }
        ],
        "filterServerTable":{},
        "queueDatas":[
            {
                "brokerName":"broker-b",
                "perm":2,
                "readQueueNums":1,
                "topicSynFlag":0,
                "writeQueueNums":1
            }
        ]
    }
    [root@localhost rocketmq-4.5.2]#

    4.2、修改死信队列的权限

    命令行方式:

    第一个broker机器执行:sh mqadmin updateTopic -b 10.200.110.46:10911 -n 10.200.110.46:9876 -t %DLQ% groupnamedef2 -p 6

    [root@localhost rocketmq-4.5.2]# sh bin/mqadmin updateTopic -b 10.200.110.46:10911 -n 10.200.110.46:9876 -t %DLQ% groupnamedef2 -p 6
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    create topic to 10.200.110.46:10911 success.
    TopicConfig [topicName=%DLQ%, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false][root@localhost rocketmq-4.5.2]# 

    第二个broker机器执行:sh mqadmin updateTopic -b 10.200.110.48:10911 -n 10.200.110.46:9876 -t %DLQ% groupnamedef2 -p 6

    [root@localhost rocketmq-4.5.2]# sh bin/mqadmin updateTopic -b 10.200.110.48:10911 -n 10.200.110.46:9876 -t %DLQ% groupnamedef2 -p 6
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    create topic to 10.200.110.48:10911 success.
    TopicConfig [topicName=%DLQ%, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false][root@localhost rocketmq-4.5.2]#

    或者在控制台中操作:
    查看死信队列的权限及修改该topic为写权限

    修改死信队列读写权限后,查询Message

    5、死信队列如何消费

    死信队列中的数据需要通过新订阅该topic进行消费。

    每个topic被消费后,如果消费失败超过次数会进入重试队列、死信队列等。名称会以

    • %RETRY%消费组名称
    • %DLQ%消费组名称

    例如:

    我的普通队列是:topic-0903

    消费组及消费者是:

    mq.rocketmq.consumers[3].topic: topic-0903~*

    mq.rocketmq.consumers[3].groupName: group0903

    多次消费失败后,会生成:

     如果要通过API读取死信队列的内容,即%DLQ%group0903的内容,则需要重新定义消费者:

    消费死信队列%DLQ%group0903的消费者定义:

    mq.rocketmq.consumers[4].topic: '%DLQ%group0903~*'

    mq.rocketmq.consumers[4].groupName: groupdlq0903

    mq.rocketmq.consumers[4].id: "rocketmq_consumer_dlq0903"

    6、重试队列和死信队列的配置

    消费端,一直不回传消费的结果。rocketmq认为消息没收到,consumer下一次拉取,broker依然会发送该消息。

    所以,任何异常都要捕获返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq会放到重试队列。

    这个重试TOPIC的名字是

    %RETRY%+consumergroup的名字

    在控制台上过一会就可以查到。

    重试的消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,此时需要人工干预了。

    消息重试的间隔时间可以在broker端配置:

    在broker的配置文件里配置:messageDelayLevel =1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    例如:

    我的重试间隔时间为5秒,订阅死信队列的消费端收到消息并打印。

    7、消息轨迹

    1.Broker配置 首先看下broker.conf配置的两个属性

    属性

    默认值

    traceTopicEnable

    false

    msgTraceTopicName

    RMQ_SYS_TRACE_TOPIC

     

     

  • 相关阅读:
    网页设计~老生常谈~浏览器兼容2个主要问题的解决
    谈谈网页功能测试
    从PMP学习中浅谈公司行政工作
    肉肉谈对需求设计的想法到底是功能驱动界面?还是界面驱动功能?
    jndi和rmi学习
    mysql赋值变量:=的使用
    用Cookies和HashTable制作购物车
    nginx实现简单的反向代理
    .net Form认证扩展保存 Object 类型
    基于Docker搭建私有镜像仓库
  • 原文地址:https://www.cnblogs.com/duanxz/p/3896994.html
Copyright © 2011-2022 走看看