zoukankan      html  css  js  c++  java
  • 微服务之消息驱动SpringCloudStream

    一、什么是消息驱动?

      消息驱动是一种统一消息编程模型,它的出现是为了屏蔽底层消息中间件的差异,降低切换消息中间件的成本。常见的消息中间件有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,在这些技术之间切换需要花费大量的学习时间,我们可以通过消息驱动来适配绑定,在不同的消息中间件中切换,将不了解的中间件转换为自己拿手的消息中间件。同时也可以将微服务中多种消息中间件统一为一个中间件。

      SpringCloudStream是一种构建消息驱动微服务的框架,应用程序通过inputs 或者outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互,所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动。通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

      Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。但目前仅支持RabbitMQ、Kafka。以下是其架构图:

      理解了架构图之后,再来看看Spring Cloud Stream的标准执行流程图:

     二、Spring Cloud Stream的使用

      1、创建消息生产者(发送)模块:

        a.引入spring-cloud-starter-stream-rabbit包

    <dependency>
       <groupid>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

        b.配置application.yml文件

    server:
       port: 8801
    spring:
       application:
         name: cloud-stream-provider
    cloud: stream: binders: #在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: #表环定义的名称,用于于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关的环境配置
    spring: rabbitmq: host: localhost
    port:
    5672 username: guest
    password: guest bindings: #服务的整合处理 output: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的Exchange名称定义 content
    -type: application/json #设置消息类型,本次为json,文本则设置"text/plain"
    binder: defaultRabbit #设置要绑定的消息服务的具体设置

        c.编写消息发送代码:新建一个ImessageProvider接口,在新建一个接口实现类MessageProviderImpl,编写如下代码,调用业务层调用sendmessage()方法即可发消息。

    /*消息发送者*/
    public interface IMessageProvider
    {
         public string sendmessage();
    }
    
    /*消息发送者实现类*/
    @EnableBinding(source.class) //定义消息的推送管道
    public class MessageProviderImpl implements IMessageProvider
    {
          @Resource
           private Messagechannel output; //消息发送管道
    
          @override
           public string sendmessage()
       {
           string serial = UUID.randomuuID().tostring();
           output.send(MessageBuilder.withPayLoad(serial).build()); //消息发送
           return null;
        }
    }

      2、创建消息消费者(接收)模块:

        a.消息接收模块也需要引入spring-cloud-starter-stream-rabbit包

        b.配置application.yml文件:配置方式与消息生产模块唯一的区别是bindings节点下,一个是output、一个是input。

    server:
       port: 8802
    spring:
       application:
         name: cloud-stream-consumer
       cloud:
         stream:
           binders: #在此处配置要绑定的rabbitmq的服务信息;
             defaultRabbit: #表环定义的名称,用于于binding整合
               type: rabbit #消息组件类型
               environment: #设置rabbitmq的相关的环境配置
                 spring:
                   rabbitmq:
                      host: localhost
                      port: 5672
                      username: guest
                      password: guest
           bindings: #服务的整合处理
             input: #这个名字是一个通道的名称
                destination: studyExchange #表示要使用的Exchange名称定义
                content-type: application/json #设置消息类型,本次为json,文本则设置"text/plain"
                binder: defaultRabbit #设置要绑定的消息服务的具体设置

        c.编写消息接收代码:新建一个消息接收业务类ReceiveMessageListenerController,编写如下代码。

    @Component
    @EnableBinding(sink.class)
    public class ReceiveMessageListenerController
    {
          @value("${server.port}")
          private string serverPort;
    
           @streamListener(sink.INPUT)
           public void input(Message<string> message) //消息参数类型要与发送端参数类型一致
      {
           system.out.println("接受消息:"+message.getPayload());
       }
    }

      完成以上步骤,则消息生产者和消费者都创建完成,启动微服务,调用生产者sendmessage()方法,消费者input()方法将会执行获取到消息。同时可以访问RabbitMQ可视化页面http://localhost:5672,登录进去将在Exchanges栏目查看到消息波峰图表如下。

     三、分组消费与持久化

      在以上消息消费的过程中,需要考虑重复消费、消息丢失和持久化的问题。

      重复消费:在多个集群的微服务架构中,每个模块都会从RabbitMQ中获取消息,如果一个消息同时被多个相同的微服务模块获取到,就会造成重复消费(比如同一订单同时被两个支付模块获取到后会造成重复扣款),我们有时候需要避免产生这种情况,于是Stream中的消息分组派上用场了。在stream中同一个group中的多个消费者存在竞争关系,一条消息只能被其中的一个消费者获取到,因此我们需要将不能同时消费的这些服务模块放到同一group中。默认情况下,每一个微服务都属于一个独立的组(不同的组的模块是可以重复消费同一消息的),因此他们会重复消费。配置多个消费者模块在同一个组的方式是修改这些模块的yml配置文件,配置 " bindings.input.group=相同的组名 "即可。

      持久化和消息丢失:

        a.如果一个消费者去掉分组后重启,则重启后不会再次获取未被消费的消息,发生消息丢失现象。

        b.如果一个消费者不移除分组直接重启,则重启后会重新获取未被消费的消息,这是就是消息的持久化。

  • 相关阅读:
    读 《异类》- 作者:[加拿大] 马尔科姆·格拉德威尔 有感
    docker常用操作命令
    MySQL 使用规范
    js 字符串转json对象
    Mybatis 工作原理
    JDBC连接配置
    Java 线程基础
    数组与链表
    Java 内部类
    MySQL 去重
  • 原文地址:https://www.cnblogs.com/zqhIndex/p/15408656.html
Copyright © 2011-2022 走看看