zoukankan      html  css  js  c++  java
  • RocketMQ进阶-延时消息

    前言

    在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务

    那么如何实现延迟任务呢?

    第一反应是利用cron方案来实现:

    启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。

    cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

    • 当数据量大的时候轮询效率低;

    • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;

    • 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

    既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决。在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。

    实现

    RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

    注意:RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    下面我们结合SprintBoot利用RocketMQ发送延时消息

    • 引入RocketMQ组件

    <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-boot-starter</artifactId>
    </dependency>
    
    • 增加RocketMQ的配置

    rocketmq:
      name-server: 172.31.0.44:9876
      producer:
        group: delay-group
    
    • 编写生产者

    @Component
    @Slf4j
    public class DelayProduce {
        @Autowired
        private RocketMQTemplate rocketMQTemplatet;
    
        public void sendDelayMessage(String topic,String message,int delayLevel){
           SendResult sendResult = rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);
            log.info("sendtime is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
            log.info("sendResult is{}",sendResult);
        }
    }
    
    • 编写消费者

    @Slf4j
    @Component
    @RocketMQMessageListener(
            topic = "delay-topic",
            consumerGroup = "delay-group"
    )
    public class DelayConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            log.info("received message time is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
            log.info("received message is {}",message);
        }
    }
    
    • 测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DelayProduceTest {
        @Autowired
        private DelayProduce delayProduce;
    
        @Test
        public void sendDelayMessage() {
            delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);
        }
    }
    

    这里delayLevel设置成5,对应RocketMQ的延时等级就是1分钟后投递消息。

    • 运行结果发送时间消费时间

    修改延时级别

    RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。

    • 打开RocketMQ的配置文件,修改 messageDelayLevel 属性

    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    storePathRootDir = /app/rocketmq/data
    messageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    

    这次将延时等级1修改成了90s,生产者发送消息后需要90s后再进行消息投递。修改完成后重启RocketMQ。nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

    • 使用延时等级1发送消息

    public void sendDelayMessage() {
     delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);
    }
    
    • 测试发送时间消费时间

    通过比对发送时间与消费时间证明延时等级修改生效。

    RocketMQ 相关文章

    • RocketMQ 入门基础 - 环境 & 整合

    • RocketMQ进阶-事务消息

    好了,各位朋友们,本期的内容到此就全部结束啦,能看到这里的同学都是优秀的同学,下一个升职加薪的就是你了!
    如果觉得这篇文章对你有所帮助的话请扫描下面二维码加个关注。"转发" 加 "在看",养成好习惯!咱们下期再见!

    热文推荐

    http://www.javadaily.cn

  • 相关阅读:
    Centos7更改网卡名为eth0
    Centos7部署Open-Falcon监控
    centos6.x一键15项系统优化(转自努力哥)
    运维题目(十三)
    运维题目(十二)
    Mongodb的学习整理(下)
    Centos7下yum安装mongodb
    浏览器缓存
    控制反转
    js setTimeOut()
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13295879.html
Copyright © 2011-2022 走看看