zoukankan      html  css  js  c++  java
  • rabbitmq用x-delayed-message的exchange特性支持消息延迟消费

    rabbitmq版本:3.6.14

    spring-core 版本: 4.2.5.RELEASE     

    备注:如果springboot升级到比较高的版本,能用到更高版本的amqp依赖包和spring-core依赖包,有比以下实现delay延迟消费的更好的代码。

    声明exchange:

            Map<String, Object> argMaps = new HashMap<>();
            argMaps.put("x-delayed-type", "direct");
            CustomExchange exchange = new CustomExchange("ticket-exchange-joyce-test", "x-delayed-message", true, false, argMaps);
            admin.declareExchange(exchange);
            admin.declareBinding(BindingBuilder.bind(queueForPendingPayment).to(exchange).with(ticketRouteKeyPendingPayment).noargs());
    当消费者catch到异常时,触发delay延迟消费机制:
    private static final String MAX_RETRY_TIME = "max_retry_time";
    private static final String CURRENT_RETRY_ROUND = "current_retry_round";
    public static final int[] RETRY_INTERVAL = {0, 2, 4, 6, 8, 10, 12, 14};
    private static final String DELAY = "x-delay";
    Map<String, Object> header = message.getMessageProperties().getHeaders();
                int maxRetryTimes = header.containsKey(MAX_RETRY_TIME) ? (int) header.get(MAX_RETRY_TIME) : RETRY_INTERVAL.length;
                int currentRound = header.containsKey(CURRENT_RETRY_ROUND) ? Integer.valueOf(header.get(CURRENT_RETRY_ROUND).toString()) : 1;
                if (currentRound > maxRetryTimes) {
                    LOGGER.info("Message has retryed exceed max retry times, current round = {}, max retry times = {}, header current round = {}, header delay = {}" +
                                    ", quit! body: {}"
                            , currentRound, maxRetryTimes, header.get(DELAY), header.get(CURRENT_RETRY_ROUND), origin);
                } else {
                    int delay = RETRY_INTERVAL[Math.min(currentRound, RETRY_INTERVAL.length) - 1];
                    header.put(CURRENT_RETRY_ROUND, currentRound + 1);
                    header.put(DELAY, delay * 1000);
                    LOGGER.info("Retrying send message to rabbitmq, current round: {}, delay(ms): {}, header current round = {}, header delay = {}, body: {}"
                            , currentRound, delay * 1000, header.get(CURRENT_RETRY_ROUND), header.get(DELAY), origin);
                    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                            .deliveryMode(2) // message is persistent
                            .contentEncoding(StandardCharsets.UTF_8.name())
                            .headers(header)
                            .build();
                    try {
                        String exchange = message.getMessageProperties().getReceivedExchange();
                        String routekey = message.getMessageProperties().getReceivedRoutingKey();
                        LOGGER.info("retry info, exchange = {}, routekey = {}", exchange, routekey);
                        channel.basicPublish(
                                message.getMessageProperties().getReceivedExchange()
                                , message.getMessageProperties().getReceivedRoutingKey()
                                , properties
                                , message.getBody());
    
    
                    } catch (IOException ex) {
                        LOGGER.error("Rabbitmq channel error", ex);
                    }



    end.

  • 相关阅读:
    Eclipse与Tomcat
    乱入Spring+Mybatis
    windows一次无线网卡被关闭事件
    数列的考查角度收集整理2[三轮总结]
    数列的考查角度收集整理1[三轮总结]
    求函数的解析式
    不等式证明的那些事
    高中数学中最值素材整理【待编辑】
    函数与导数中常用的函数和不等关系
    坐标系与参数方程的考向整理[三轮总结]
  • 原文地址:https://www.cnblogs.com/zhuwenjoyce/p/14140003.html
Copyright © 2011-2022 走看看