zoukankan      html  css  js  c++  java
  • SpringBoot 整合 RabbitMQ实现延迟队列

    /** 短信发送队列 */
    	public static final String QUEUE_MSG_SMS_SEND = "msg:sms:send";
    
    	/** 短信发送队列 DLX */
    	public static final String DLX_MSG_SMS_SEND = "msg:sms:send:dlx";
    
    	/** 短信发送队列 延迟缓冲(按消息) */
    	public static final String QUEUE_DELAY_PER_MESSAGE_TTL_MSG_SMS_SEND = "delay:per:message:msg:sms:send";
    

      

    RabbitConfig
    /**
     * 消息配置
     * @author swt
     */
    @Configuration
    public class RabbitConfig {
    
        /**
         * 短信发送队列
         * @return
         */
        @Bean
        public Queue smsQueue() {
            return new Queue(QUEUE_MSG_SMS_SEND, true);
        }
    
        /**
         * 短信发送队列
         * @return
         */
        @Bean
        public Queue smsQueueDelayPerMessageTTL() {
            return QueueBuilder.durable(QUEUE_DELAY_PER_MESSAGE_TTL_MSG_SMS_SEND)
                    .withArgument("x-dead-letter-exchange", DLX_MSG_SMS_SEND)
                    .withArgument("x-dead-letter-routing-key", QUEUE_MSG_SMS_SEND)
                    .build();
        }
    
        @Bean
        public DirectExchange smsDelayExchange(){
            return new DirectExchange(DLX_MSG_SMS_SEND);
        }
    
        @Bean
        public Binding smsDelayBinding(Queue smsQueue, DirectExchange smsDelayExchange) {
            return BindingBuilder.bind(smsQueue)
                    .to(smsDelayExchange)
                    .with(QUEUE_MSG_SMS_SEND);
        }
    
    }
    

      

    package cn.ug.msg.mq;
    
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    
    /**
     * MessagePostProcessor 扩展类
     * @author swt
     */
    public class DelayMessagePostProcessor implements MessagePostProcessor {
    
        private long ttl = 0L;
    
        public DelayMessagePostProcessor(long ttl) {
            this.ttl = ttl;
        }
    
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration(Long.toString(ttl));
            return message;
        }
    
    }
    

      测试类

    package cn.ug;
    
    import cn.ug.msg.mq.DelayMessagePostProcessor;
    import cn.ug.msg.mq.Sms;
    import cn.ug.util.UF;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.Map;
    
    import static cn.ug.config.QueueName.QUEUE_DELAY_PER_MESSAGE_TTL_MSG_SMS_SEND;
    import static cn.ug.config.QueueName.QUEUE_MSG_SMS_SEND;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class MsgServiceApplicationTests {
    
    
    	@Resource
    	private AmqpTemplate amqpTemplate;
    
    	@Test
    	public void contextLoads() {
    
    		Sms sms = new Sms();
    		sms.setPhone("11111111");
    		sms.setType("222222222");
    
    		Map<String, String> paramMap = new HashMap<>(1);
    		paramMap.put("code", UF.getRandomUUID().substring(0, 5));
    		sms.setParamMap(paramMap);
    		for (int i=0; i<100; i++) {
    			amqpTemplate.convertAndSend(QUEUE_DELAY_PER_MESSAGE_TTL_MSG_SMS_SEND, sms, new DelayMessagePostProcessor(i * 1000));
    		}
    		System.out.println(UF.getFormatDateTime(UF.getDateTime()));
    	}
    
    }
    

      坑:

    1、测试之前,一定要把之前的队列(如果有的话)删除、删除、删除

  • 相关阅读:
    Java反射机制源码分析及知识点总结
    Dubbo admin 在Windows下的安装和服务发现
    Redis知识点总结
    Error:(xx) java: -source 1.5 中不支持
    Java中的线程间通信
    linux主机名显示bogon问题
    Linux(CentOS)上安装Apache Hadoop
    Java虚拟机(JVM)及其体系结构
    在微服务领域中处理分布式事务
    Redis持久化
  • 原文地址:https://www.cnblogs.com/song-wentao/p/8668774.html
Copyright © 2011-2022 走看看