zoukankan      html  css  js  c++  java
  • java-设置手动消费的rabbit延时队列

    使用之前的预先工作

    使用rabbit支持的  x-delay-message 插件实现延时队列的方式
    rabbit版本3.7.10 erlang 23.1.3

    依赖包

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>1.5.2.RELEASE</version>
    </dependency>

    yml配置

    rabbitmq:
      username: aaa
      password: bbb
      host: ip
      port: 5672
      listener:
        simple:
    # 消费模式手动 acknowledge-mode: manual concurrency: 2
    # 最多同时在线 max-concurrency: 2 # 是否开启消费者重试(为false时关闭消费者重试) # retry: # enabled: true # 最大重试重新投递消息次数 # max-attempts: 2 # 重试重新投递消息的间隔时间(单位毫秒) # initial-interval: 600000ms

    rabbitMqConfig.java   队列的配置

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 队列配置
     *
     * @Author
     * @Date
     **/
    @Configuration
    public class XDelayConfig {
    
        /**
         * 立即消费的队列名称
         */
        public static final String IMMEDIATE_QUEUE_XDELAY = "queue.xdelay.immediate";
        /**
         * 延时的exchange交换机
         */
        public static final String DELAYED_EXCHANGE_XDELAY = "exchange.xdelay.delayed";
        public static final String DELAY_ROUTING_KEY_XDELAY = "routingkey.xdelay.delay";
    
        /**
         * 使用rabbit支持的  x-delay-message 插件实现延时队列的方式
         * rabbit版本3.7.10 erlang 23.1.3
         * 创建一个立即消费队列
         * @return
         */
        @Bean
        public Queue immediateQueue() {
            // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
            return new Queue(IMMEDIATE_QUEUE_XDELAY, true);
        }
    
        @Bean
        public CustomExchange delayExchange() {
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAYED_EXCHANGE_XDELAY, "x-delayed-message", true, false, args);
        }
    
        /**
         * 把立即消费的队列和延时消费的exchange绑定在一起
         *
         * @return
         */
        @Bean
        public Binding bindingNotify() {
            return BindingBuilder.bind(immediateQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY_XDELAY).noargs();
        }
    
    }

    生产者

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * 生产者
     *
     * @Author
     * @Date
     **/
    @Component
    @Slf4j
    public class XDelaySender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 生产者
         * @param msg 发送的消息
         * @param delayTime 延迟的毫秒
         */
        public void sender(String msg, int delayTime) {
            log.info("生产者,msg= " + msg + " .delayTime:" + delayTime);
            this.rabbitTemplate.convertAndSend(XDelayConfig.DELAYED_EXCHANGE_XDELAY, XDelayConfig.DELAY_ROUTING_KEY_XDELAY, msg, message -> {
                message.getMessageProperties().setDelay(delayTime);
                return message;
            });
        }
    
    }  

    消费者

    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Map;
    
    /**
     * 消费者
     *
     * @Author
     * @Date
     **/
    @Component
    @EnableRabbit
    @Configuration
    @Slf4j
    public class XDelayReceiver {
    
        /**
         * 消费者
         * @param msg
         */
        @RabbitListener(queues = XDelayConfig.IMMEDIATE_QUEUE_XDELAY)
        public void waitReceiver(String msg, Channel channel, @Headers Map<String,Object> headers) throws Exception {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String getkey = (String) JSONObject.parseObject(msg).get("key");
                log.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent. 收到延时消息:" + getkey);
                // 处理
                // todo 做想要消费的逻辑
    
            } catch (Exception e) {
                log.info("消费者,消费失败,消息:" + msg, e);
            }
            long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            log.info("waitReceiver Tag:" + deliveryTag);
            // 手动确认消费
            channel.basicAck(deliveryTag,false);
        }
    
    }
    

      

  • 相关阅读:
    八大排序
    链表的合并
    记录B站yxc的背包九讲相关代码
    C++中多态实现
    YOLOV4所用到的一些tricks
    C++中的string 和 stringstream 的知识
    博客园中插入视频
    博客园中插入网页
    面试前必须要知道的【可重入锁 自旋锁】
    面试前必须要知道的【乐观锁 悲观锁】
  • 原文地址:https://www.cnblogs.com/fatetop/p/14308344.html
Copyright © 2011-2022 走看看