zoukankan      html  css  js  c++  java
  • 实测:springboot和rabbitmq实现延迟队列

    1.在springboot中引入依赖

        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2.application.yaml配置 

    spring:
     rabbitmq:
        host: 101.200.170.1
        listener:
          simple:
            acknowledge-mode: manual
            prefetch: 10
        password: abc
        port: 5672
        username: mq-u
        virtual-host: /

    3. 交互  队列  死信队列  死信交换 的配置

    package com.jeesite.modules.test.MQMange;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    //Fanout 类型 发布订阅模式
    @Component
    public class FanoutConfig {
    
        /**
         * 定义死信队列相关信息
         */
        public final static String deadQueueName = "dead_queue";
        public final static String deadRoutingKey = "dead_routing_key";
        public final static String deadExchangeName = "dead_exchange";
        /**
         * 死信队列 交换机标识符
         */
        public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
        /**
         * 死信队列交换机绑定键标识符
         */
        public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
    
        // 邮件队列
        private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
    
        // 短信队列
        private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
        // fanout 交换机
        private String EXCHANGE_NAME = "fanoutExchange";
    
        // 1.定义邮件队列
        @Bean
        public Queue fanOutEamilQueue() {
            // 将普通队列绑定到死信队列交换机上  这是重点:将普通队列绑定到死信队列上要设置 普通队列的x-dead-letter-exchange=死信交换名称 
    x-dead-letter-routing-key=死信队列的routing-key,这样普通队列的消息过期才会进入到死信队列
    Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args); return queue; } // 2.定义短信队列 @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定义交换机 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.队列与交换机绑定邮件队列 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.队列与交换机绑定短信队列 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } /** * 创建配置死信邮件队列 * * @return */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } /* * 创建死信交换机 */ @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } /* * 死信队列与死信交换机绑定,此处用到了死信的RoutingKey */ @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }

    4.创建消息生产者(设置5秒后失效)

    package com.jeesite.modules.test.MQMange;
    
    
    import java.util.UUID;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSONObject;
    
    @Component
    public class FanoutProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(String queueName) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("email", "xx@163.com");
            jsonObject.put("timestamp", 0);
            String jsonString = jsonObject.toJSONString();
            System.out.println("生产者jsonString:" + jsonString);
            // 设置消息唯一id 保证每次重试消息id唯一
            Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "").setExpiration("50000").build(); //消息id设置在请求头里面 用UUID做全局ID
            amqpTemplate.convertAndSend(queueName, message);
        }
    }
    View Code

    5.创建死信队列的消费者

    package com.jeesite.modules.test.MQMange;
    
    import java.util.Map;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    //死信队列
    @Component
    public class FanoutDeadEamilConsumer {
    
        @RabbitListener(queues = "dead_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("死信邮件消费者获取生产者消息msg:"+msg+",消息id"+messageId);
            // // 手动ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // 手动签收
            channel.basicAck(deliveryTag, false);
    
            System.out.println("执行结束....");
        }
    
    }
    View Code

    6.在controller里调用生产消息的方法并指定队列名称

       @GetMapping("/put1")
        public Map<String,Object> start1(Long userId, Long seckillId){
            Map<String,Object> map = new HashMap<>();
            fanoutProducer.send("fanout_email_queue");
            map.put("ID",System.currentTimeMillis());
            return map;
        }
    View Code

    7. 5秒后控制台打印死信队列的消费日志

    8.完

  • 相关阅读:
    最全的 Twitter Bootstrap 开发资源清单
    jQuery布局插件UI Layout简介及使用方法
    SQLcode错误代码汇总和sqlstate=37000的解决方案
    JQUERY插件学习之jQuery UI
    如何判断/检查一个集合(List<string>)中是否有重复的元素
    反射原理及简介
    C# 获取文件夹下的所有文件的文件名
    委托编程指南
    模块封装与程序集
    Redis Lock
  • 原文地址:https://www.cnblogs.com/tiancai/p/13161218.html
Copyright © 2011-2022 走看看