zoukankan      html  css  js  c++  java
  • 利用rabbitmq 实现消息的延迟发送

    需求: 实现消息的延迟通知,每5s, 30s,60s,120s 通知一次。 就是每隔一段时间执行一次方法,该方法做业务上的处理。

    网上查rabbitmq原生是不支持延迟消息的。(rocketmq 支持), 但是可以换种方式实现: 利用其死信队列。

    rabbitmq的队列或消息可以设置过期时间,过期后会将消息放入你设置的队列中,如

    <rabbit:queue name="notify.use.delay" durable="true" auto-delete="false" exclusive="false">
            <rabbit:queue-arguments>
                <!-- 消息过期根据重新路由 -->
                <entry key="x-dead-letter-exchange" value="notifyExchange"/>
                <entry key="x-dead-letter-routing-key" value="notify.use.active"/>
            </rabbit:queue-arguments>
        </rabbit:queue>
        <!-- 定义direct exchange,绑定queue -->
        <rabbit:direct-exchange name="notifyExchange" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="notify.use.delay" key="notify.use.delay"></rabbit:binding>
                <rabbit:binding queue="notify.use.active" key="notify.use.active"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
        <bean id="notifyListener" class="com.shdy.job.NotifyListener"/>
        <rabbit:listener-container connection-factory="connectionFactory">
            <rabbit:listener queues="notify.use.active" ref="notifyListener"/>
        </rabbit:listener-container>

    消息过期后自动转入 notify.use.active 队列中。 然后设置一个监听,消费该队列既可以实现。

    //模拟发送消息type的格式是 A_S.5 ,  其中S 表示通知成功,通知成功就不在加入下个队列了,5 表示5s时间。 
    @Resource
        private AmqpTemplate amqpTemplate;
        @ResponseBody
        @RequestMapping(value = "/xx", method = RequestMethod.GET,produces = "application/json;charset=UTF-8")
        public String xx(String type){
            String[] x = type.split("\.");
            amqpTemplate.convertAndSend("notifyExchange","notify.use.delay",type,message -> {
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.setExpiration(Integer.parseInt(x[1])*1000 + "");
                return message;
            });
    
            return "success";
        }
    
    
    public class NotifyListener implements MessageListener {
        @Resource
        private AmqpTemplate amqpTemplate;
        @Override
        public void onMessage(Message m) {
            String type = new String(m.getBody());
            System.out.println("-------------------------------------------"+m);
            if(!type.contains("S")){
                String[] x = type.split("\.");
            // 模拟如果通知不成功就将过期时间乘以2(根据自己业务变动),再次放入延迟队列中。 amqpTemplate.convertAndSend(
    "notifyExchange","notify.use.delay"+Integer.parseInt(x[1])*2,type,message -> { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setExpiration(Integer.parseInt(x[1])*1000*2 + ""); //细节:使用该方法需spring-core4.2.6以上 return message; }); } } }

    注意: 但是测试发现一个问题: 10s 消息比后来加入的5s 的消息先通知。网上查到原因是: 只有到达队列顶部的消息才会去验证队列过期时间,

    因为10s 的消息是先加入的,所以在顶部,等待10s 到期后才执行,所以5s 反而在后面执行。 

    解决方式是定义多个延迟队列,每个队列只放一种过期时间的消息。 如 

    notify.use.delay_0(延迟5s),notify.use.delay_1(延迟30s),notify.use.delay_2 . 

    在存放的时候可以取出当前消息的延迟时间,如1 ,然后加1, 放入下一个队列, 这时候发送的消息类如 : A_S.0 , B_O.0 。
    int kk = Integer.parseInt(x[1])+1
    amqpTemplate.convertAndSend("notifyExchange","notify.use.delay_"+kk,x[0]+"."+kk)

    整个队列的过期时间可以直接设置,不用每个消息单独设置:
    <rabbit:queue name="notify.use.delay" durable="true" auto-delete="false" exclusive="false">
            <rabbit:queue-arguments>
                <entry key="x-message-ttl">
                    <!-- 队列默认消息过期时间 -->
                    <value type="java.lang.Long">5000</value>
                </entry>
                <!-- 消息过期根据重新路由 -->
                <entry key="x-dead-letter-exchange" value="notifyExchange"/>
                <entry key="x-dead-letter-routing-key" value="notify.use.active"/>
            </rabbit:queue-arguments>
        </rabbit:queue>
     
  • 相关阅读:
    10个用jQuery实现图片幻灯片/画廊效果和源码
    老赵面试题参考答案(二)
    C#的显式接口和隐式接口
    老赵面试题参考答案(三)
    C#中的参数传递:值类型(value type)和引用类型(reference type)
    word转换成html的方法
    老赵面试题参考答案(四)
    五个Metro UI 风格的网页设计
    老赵面试题参考答案(六)
    概要设计怎么写?
  • 原文地址:https://www.cnblogs.com/zhangchenglzhao/p/11412514.html
Copyright © 2011-2022 走看看