需求: 实现消息的延迟通知,每5s, 30s,60s,120s 通知一次。 就是每隔一段时间执行一次方法,该方法做业务上的处理。
网上查rabbitmq原生是不支持延迟消息的。(rocketmq 支持), 但是可以换种方式实现: 利用其死信队列。
rabbitmq的队列或消息可以设置过期时间,过期后会将消息放入你设置的队列中,如
<rabbit:queue name="notify.use.delay" durable="true" auto-delete="false" exclusive="false"> <rabbit:queue-arguments> <!-- 消息过期根据重新路由 --> <entry key="x-dead-letter-exchange" value="notifyExchange"/> <entry key="x-dead-letter-routing-key" value="notify.use.active"/> </rabbit:queue-arguments> </rabbit:queue> <!-- 定义direct exchange,绑定queue --> <rabbit:direct-exchange name="notifyExchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="notify.use.delay" key="notify.use.delay"></rabbit:binding> <rabbit:binding queue="notify.use.active" key="notify.use.active"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <bean id="notifyListener" class="com.shdy.job.NotifyListener"/> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="notify.use.active" ref="notifyListener"/> </rabbit:listener-container>
消息过期后自动转入 notify.use.active 队列中。 然后设置一个监听,消费该队列既可以实现。
//模拟发送消息type的格式是 A_S.5 , 其中S 表示通知成功,通知成功就不在加入下个队列了,5 表示5s时间。 @Resource private AmqpTemplate amqpTemplate; @ResponseBody @RequestMapping(value = "/xx", method = RequestMethod.GET,produces = "application/json;charset=UTF-8") public String xx(String type){ String[] x = type.split("\."); amqpTemplate.convertAndSend("notifyExchange","notify.use.delay",type,message -> { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setExpiration(Integer.parseInt(x[1])*1000 + ""); return message; }); return "success"; } public class NotifyListener implements MessageListener { @Resource private AmqpTemplate amqpTemplate; @Override public void onMessage(Message m) { String type = new String(m.getBody()); System.out.println("-------------------------------------------"+m); if(!type.contains("S")){ String[] x = type.split("\.");
// 模拟如果通知不成功就将过期时间乘以2(根据自己业务变动),再次放入延迟队列中。 amqpTemplate.convertAndSend("notifyExchange","notify.use.delay"+Integer.parseInt(x[1])*2,type,message -> { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setExpiration(Integer.parseInt(x[1])*1000*2 + ""); //细节:使用该方法需spring-core4.2.6以上 return message; }); } } }
注意: 但是测试发现一个问题: 10s 消息比后来加入的5s 的消息先通知。网上查到原因是: 只有到达队列顶部的消息才会去验证队列过期时间,
因为10s 的消息是先加入的,所以在顶部,等待10s 到期后才执行,所以5s 反而在后面执行。
解决方式是定义多个延迟队列,每个队列只放一种过期时间的消息。 如
notify.use.delay_0(延迟5s),notify.use.delay_1(延迟30s),notify.use.delay_2 .
在存放的时候可以取出当前消息的延迟时间,如1 ,然后加1, 放入下一个队列, 这时候发送的消息类如 : A_S.0 , B_O.0 。
int kk = Integer.parseInt(x[1])+1
amqpTemplate.convertAndSend("notifyExchange","notify.use.delay_"+kk,x[0]+"."+kk)
整个队列的过期时间可以直接设置,不用每个消息单独设置:
<rabbit:queue name="notify.use.delay" durable="true" auto-delete="false" exclusive="false"> <rabbit:queue-arguments> <entry key="x-message-ttl"> <!-- 队列默认消息过期时间 --> <value type="java.lang.Long">5000</value> </entry> <!-- 消息过期根据重新路由 --> <entry key="x-dead-letter-exchange" value="notifyExchange"/> <entry key="x-dead-letter-routing-key" value="notify.use.active"/> </rabbit:queue-arguments> </rabbit:queue>