/** 短信发送队列 */ public static final String QUEUE_MSG_SMS_SEND = "msg:sms:send"; /** 短信发送队列 DLX */ public static final String DLX_MSG_SMS_SEND = "msg:sms:send:dlx"; /** 短信发送队列 延迟缓冲(按消息) */ public static final String QUEUE_DELAY_PER_MESSAGE_TTL_MSG_SMS_SEND = "delay:per:message:msg:sms:send";
RabbitConfig
/** * 消息配置 * @author swt */ @Configuration public class RabbitConfig { /** * 短信发送队列 * @return */ @Bean public Queue smsQueue() { return new Queue(QUEUE_MSG_SMS_SEND, true); } /** * 短信发送队列 * @return */ @Bean public Queue smsQueueDelayPerMessageTTL() { return QueueBuilder.durable(QUEUE_DELAY_PER_MESSAGE_TTL_MSG_SMS_SEND) .withArgument("x-dead-letter-exchange", DLX_MSG_SMS_SEND) .withArgument("x-dead-letter-routing-key", QUEUE_MSG_SMS_SEND) .build(); } @Bean public DirectExchange smsDelayExchange(){ return new DirectExchange(DLX_MSG_SMS_SEND); } @Bean public Binding smsDelayBinding(Queue smsQueue, DirectExchange smsDelayExchange) { return BindingBuilder.bind(smsQueue) .to(smsDelayExchange) .with(QUEUE_MSG_SMS_SEND); } }
package cn.ug.msg.mq; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; /** * MessagePostProcessor 扩展类 * @author swt */ public class DelayMessagePostProcessor implements MessagePostProcessor { private long ttl = 0L; public DelayMessagePostProcessor(long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(Long.toString(ttl)); return message; } }
测试类
package cn.ug; import cn.ug.msg.mq.DelayMessagePostProcessor; import cn.ug.msg.mq.Sms; import cn.ug.util.UF; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import static cn.ug.config.QueueName.QUEUE_DELAY_PER_MESSAGE_TTL_MSG_SMS_SEND; import static cn.ug.config.QueueName.QUEUE_MSG_SMS_SEND; @RunWith(SpringRunner.class) @SpringBootTest public class MsgServiceApplicationTests { @Resource private AmqpTemplate amqpTemplate; @Test public void contextLoads() { Sms sms = new Sms(); sms.setPhone("11111111"); sms.setType("222222222"); Map<String, String> paramMap = new HashMap<>(1); paramMap.put("code", UF.getRandomUUID().substring(0, 5)); sms.setParamMap(paramMap); for (int i=0; i<100; i++) { amqpTemplate.convertAndSend(QUEUE_DELAY_PER_MESSAGE_TTL_MSG_SMS_SEND, sms, new DelayMessagePostProcessor(i * 1000)); } System.out.println(UF.getFormatDateTime(UF.getDateTime())); } }
坑:
1、测试之前,一定要把之前的队列(如果有的话)删除、删除、删除