zoukankan      html  css  js  c++  java
  • spring cloud延时队列的使用

    • 用户下单,需要在订单的有效截止时间前30分钟,提醒用户去使用。同时在到达有效截止时间,要将订单状态设置为失效。这时候可以用延时队列可以很好的解决,用户下单之后,计算出结束时间前半个小时的时长,发送一条延时消息提醒用户使用。订单结束的时长发送订单已经失效的消息。

    入口

    	/**
    	 * 爆品助力状态提醒
    	 *
    	 * @param req 爆品助力失败
    	 */
    	@RequestMapping(path = "/mq/product/sendProductHelpStatusMessage", method = RequestMethod.POST)
    	Integer sendProductHelpStatusMessage(@RequestBody HashMap<String,String> req);
    

    生产者

        @Override
        public Integer sendProductHelpStatusMessage(@RequestBody HashMap<String,String> req){
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            req.put("sendTime",sdf.format(new Date()));
            String beanToJson = JsonUtils.beanToJson(req);
            log.info("sendProductHelpStatusMessage:{}",beanToJson);
            productHelpStatusMessageChannel.productHelpStatusOutput().send(MessageBuilder.withPayload(beanToJson).setHeader("x-delay", req.get("delay")).build());
            return 1;
        }
    

    将消息发送出去,延时delay毫秒,同时记录下消息发送的时间。这样就可以根据传递的参数来确定延时的具体时长。

    消费者

    package org.xxx.mq.provider.consumer.product;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.xxx.mq.api.channel.consumer.ProductHelpStatusMessageChannel;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    @Slf4j
    @EnableBinding(value = {ProductHelpStatusMessageChannel.class})
    public class ProductHelpStatusConsumer {
        @StreamListener(target = ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_WX_INPUT)
        public void receiveProductHelpStatusWxMessage(String message){
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            log.info("receiveProductHelpStatusWxMessage:{},receiveTime,{}",message,sdf.format(new Date()));
    
        }
    }
    
    

    接受消息,同时记录下接受消息的时间。

    通道

    package org.xxx.mq.api.channel.consumer;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    
    public interface ProductHelpStatusMessageChannel {
        //爆品助力
        String PRODUCT_HELP_STATUS_OUTPUT = "productHelpStatusOutput";
    
        String PRODUCT_HELP_STATUS_WX_INPUT = "productHelpStatusWxInput";
    
        /**
         * 爆品助力消息发送通道
         *
         * @return
         */
        @Output(ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_OUTPUT)
        MessageChannel productHelpStatusOutput();
    
    
        /**
         * 爆品助力消息订阅(微信消息)
         *
         * @return SubscribableChannel,消息订阅通道
         */
        @Input(ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_WX_INPUT)
        SubscribableChannel getProductHelpStatusWxInputChannel();
    }
    

    配置application.yml

    spring:
      datasource:
        hikari:
          maximum-pool-size: 50
          minimum-idle: 50
      cloud:
        stream:
          rabbit:
            bindings:
              #订阅通道 测试通道
              productHelpStatusOutput:
                producer:
                  delayed-exchange: true
              productHelpStatusWxInput:
                consumer:
                  auto-bind-dlq: true
                  republishToDlq: true
                  requeueRejected: true
                  delayed-exchange: true
                  dlq-ttl: ${queue.dlq.ttl}
                  dlq-dead-letter-exchange:
          bindings:
            #生产者 爆品助力状态消息发送通道
            productHelpStatusOutput:
              destination: productHelpStatusExchange
              group: productHelpFailQueueGroup
            #消费者
            productHelpStatusWxInput:
              destination: productHelpStatusExchange
              group: productHelpStatusWxGroup
              consumer:
                max-attempts: 3
                backOffInitialInterval: 1000
                backOffMaxInterval: 10000
                backOffMultiplier: 2.0
    

    需要配置等等。。。如上,
    spring.cloud.stream.rabbit.bindings.productHelpStatusOutput.producer.delayed-exchange=true
    spring.cloud.stream.bindings.productHelpStatusOutput.producer.delayed-exchange=true

    测试

    打开postman,请求接口,

    {
        "delay": 10000,
        "orderSn":1
    }
    

    orderSn订单和delay延时时间(单位为毫秒)。请求orderSn=1的延时10秒delay=10000,orderSn=2的延时5秒delay=5000

    2020-02-21 17:03:45.125  INFO [mq,b4e13b1dc86b0d7f,b4e13b1dc86b0d7f,true] 90220 --- [0.0-1205-exec-3] o.a.m.p.controller.ProductMqController   : sendProductHelpStatusMessage:{"delay":"10000","orderSn":"1","sendTime":"2020-02-21 17:03:45"}
    2020-02-21 17:03:51.769  INFO [mq,820007cc648d3e43,820007cc648d3e43,true] 90220 --- [0.0-1205-exec-4] o.a.m.p.controller.ProductMqController   : sendProductHelpStatusMessage:{"delay":"5000","orderSn":"2","sendTime":"2020-02-21 17:03:51"}
    2020-02-21 17:03:55.369  INFO [mq,b4e13b1dc86b0d7f,2b113702a181d49b,true] 90220 --- [StatusWxGroup-1] o.a.m.p.c.p.ProductHelpStatusConsumer    : receiveProductHelpStatusWxMessage:{"delay":"10000","orderSn":"1","sendTime":"2020-02-21 17:03:45"},receiveTime,2020-02-21 17:03:55
    2020-02-21 17:03:56.847  INFO [mq,820007cc648d3e43,60f5a71795043ac2,true] 90220 --- [StatusWxGroup-1] o.a.m.p.c.p.ProductHelpStatusConsumer    : receiveProductHelpStatusWxMessage:{"delay":"5000","orderSn":"2","sendTime":"2020-02-21 17:03:51"},receiveTime,2020-02-21 17:03:56
    
    

    如上订单1在 17:03:45发送消息,10秒后17:03:55消费者受到消息。订单2也在5秒后受到消息。

  • 相关阅读:
    preliminary->advanced exam selections
    Maven入门
    Ajax和Json
    过滤器和监听器
    JSTL标签库
    JSP与EL表达式
    dom4j与XML文档操作
    会话管理
    登录之验证码
    WEB之文件下载
  • 原文地址:https://www.cnblogs.com/mentalidade/p/12342111.html
Copyright © 2011-2022 走看看