zoukankan      html  css  js  c++  java
  • 异步通信rabbitmq——消息重试

    目标:

        利用RabbitMQ实现消息重试和失败处理,实现可靠的消费消费。在消息消费异常时,自动延时将消息重试,当重试超过一定次数后,则列为异常消息,等待后续特殊处理。

    准备:

        TTL:Time-To-Live,通过给消息、队列设置过期时间(单位:毫秒),来控制消息、队列的生命周期。在达到时间后,消息会变成dead message。

        Dead Letter Exchanges:同普通的exchange无区别

        消息重制本质是通过消息转发来实现的。消息转发的触发是:

    rejected - the message was rejected with requeue=false,
    expired - the TTL of the message expired; or
    maxlen - the maximum allowed queue length was exceeded.
    这里,我们使用expired来实现。给消息设置TTL,到期后消息未被消费,则会变成dead messager,转发到dead letter exchange。

    流程图:

    实现:

    1、创建三个exchange。没有特殊要求

    2、创建三个queue。

    clickQueue@retry作为重试队列,需要特殊处理:

    x-dead-letter-exchange: clickExchange
    x-dead-letter-routing-key: clickKey

    x-message-ttl: 30000

    3、处理代码

    public void retry() throws IOException {
    //消息消费
    GetResponse getResponse = null;
    try {
    getResponse = rabbitUtil.fetch(DQConstant.CLICK_QUEUE_NAME, false);
    /**
    * 业务处理
    */
    throw new RuntimeException("错粗了");
    } catch (Exception e) {
    if(null != getResponse) {
    long retryCount = getRetryCount(getResponse.getProps());
    if(retryCount > 3) {
    //重试超过3次的,直接存入失败队列
    AMQP.BasicProperties properties = getResponse.getProps();
    Map<String, Object> headers = properties.getHeaders();
    if(null == headers) {
    headers = new HashMap<>();
    }
    properties.builder().headers(headers);
    rabbitUtil.send(DQConstant.CLICK_FAILED_EXCHANGE_NAME, DQConstant.CLICK_FAILED_ROUTING_KEY, properties, getResponse.getBody());
    } else {
    //重试不超过3次的,加入到重试队列
    AMQP.BasicProperties properties = getResponse.getProps();
    Map<String, Object> headers = properties.getHeaders();
    if(null == headers) {
    headers = new HashMap<>();
    }
    properties.builder().headers(headers);
    rabbitUtil.send(DQConstant.CLICK_RETRY_EXCHANGE_NAME, DQConstant.CLICK_RETRY_ROUTING_KEY, properties, getResponse.getBody());
    }
    }
    }
    if(null != getResponse) {
    rabbitUtil.ack(getResponse);
    }
    }
    private long getRetryCount(AMQP.BasicProperties properties) {
    long retryCount = 0;
    Map<String, Object> headers = properties.getHeaders();
    if(null != headers) {
    if(headers.containsKey("x-death")) {
    List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
    if(!deathList.isEmpty()) {
    Map<String, Object> deathEntry = deathList.get(0);
    retryCount = (Long)deathEntry.get("count");
    }
    }
    }
    return retryCount;
    }
    4、x-death的使用:message在转换成dead letter时,会在其header里添加一个名为x-death的数组。数组元素就是一次dead lettering event的记录。包含count:消息几次变成了dead letter。

    总结:

    此处只是本人的拙见。如有更好的提议,欢迎拍砖。
    ---------------------
    作者:洛杉矶的管理局
    来源:CSDN
    原文:://blog.csdn.net/qq_18991441/article/details/80692255
    版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    肯恩·威尔伯:整合灵性途径的几大障碍
    金刚经里面的「应无所住而生其心」,这句话怎么理解?
    成熟是人一辈子的修养,与年龄无关
    杰克·康菲尔德:灵性成熟的十个特质
    杰克·康菲尔德:我是谁?(强烈推荐)
    x11vnc:利用远程机器上的X-Server来进行VNC连接(x11vnc 和 x-server 在不同的机器上)
    Linux ALL:Tigervnc-Server
    Centos 6/Redhat 6:远程图形桌面: tigervnc
    Fedora 31 :远程图形桌面: tigervnc
    win server 2019 :【2个?远程管理用户(单会话?)】升级为远程桌面服务【多用户(多会话)】
  • 原文地址:https://www.cnblogs.com/ExMan/p/10283338.html
Copyright © 2011-2022 走看看