zoukankan      html  css  js  c++  java
  • Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)

    应用场景

    通常在应用开发中我们会碰到定时任务的需求,比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。

    许多同学的第一反应就是通过spring的schedule定时任务轮询数据库来实现,这种方案有一下几点劣势:

    (1)消耗系统内存,由于定时任务一直在系统中占着进程,比较消耗内存

    (2)增加了数据库的压力,这个提现在两方面,一是长时间占着数据库的连接,而是查询基数大

    (3)存在较大的时间误差

    如果我们利用第三方插件如rabbitmq来实现,就可以解决以上几种问题。

    对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次。然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送。对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理。

    为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用。RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍以下如何利用Spring Cloud Stream以及RabbitMQ轻松的处理上述问题。

    RabbitMQ延迟消息的插件安装

    关于RabbitMQ延迟消息的插件介绍可以查看官方网站:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

    安装方式很简单,只需要在这个页面:http://www.rabbitmq.com/community-plugins.html 中找到rabbitmq_delayed_message_exchange插件,根据您使用的RabbitMQ版本选择对应的插件版本下载即可。

    注意:只有RabbitMQ 3.6.x以上才支持

    在下载好之后,解压得到.ez结尾的插件包,将其复制到RabbitMQ安装目录下的plugins文件夹。

     

     然后cd到sbin目录下,启动插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

     

     

     启动成功,不需要重启rabbitmq

     

    接下来,代码使用

    config中心加入配置

    # 消息队列配置
    spring:
      cloud:
    #    spring cloud strem  消息队列分组持久化(Input输入通道)
        stream:
          rabbit:
            bindings:
              commodityOrderInvalidInput:
                consumer:
                  delayed-exchange: true
              commodityOrderInvalidOutput:
                producer:
                  delayed-exchange: true
          bindings:
            #延时的待付款订单
            commodityOrderInvalidInput: #通道名
              group: commodityOrderInvalidGroup #组名
              destination: commodityOrderInvalidTheme #主题名
            commodityOrderInvalidOutput:
              destination: commodityOrderInvalidTheme #指定生产者的通道主题
    注意这里的一个新参数spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange,用来开启延迟消息的功能,这样在创建exchange的时候,会将其设置为具有延迟特性的exchange,也就是用到上面我们安装的延迟消息插件的功能。
    在消费端也一样,需要设置spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true

     

     

     

     

    消费端

     

     

     生产端

     

     一条消息的头信息中包含了x-delay字段,该字段用来指定消息延迟的时间,单位为毫秒。所以上述代码发送的消息会在7秒之后被消费

     

     

    可以发现,该通道的type不一样

    如果报以下错误

    ERROR [o.s.a.rabbit.connection.CachingConnectionFactory] CachingConnectionFactory.java:1517 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'commodityOrderInvalidTheme' in vhost '/': received ''x-delayed-message'' but current is 'topic', class-id=40, method-id=10)

     

    那是之前没配置正确,通道创建的type类型是topic的

    解决方式,检查配置是否正确,修改后,删除之前的通道或者修改通道的名称,重新启动创建通道,查看rabbitmq控制台,通道type类型为 x-delayed-message,说明成功了

     

  • 相关阅读:
    qt 问题及处理
    windows 依赖查看
    java基础知识
    Linux 相关
    OutLook中添加Exchange失败问题
    开源协议简介
    grail开发环境的搭建
    node+mongodb+WP构建的移动社交应用源码 分享
    INotifyPropertyChanged接口的详细说明
    WP8.1开发:后台任务详解(求推荐)
  • 原文地址:https://www.cnblogs.com/suruozhong/p/12010694.html
Copyright © 2011-2022 走看看