zoukankan      html  css  js  c++  java
  • SpringCloud(七)Stream消息驱动

    Stream消息驱动

    概述

    屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
    官网:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.4.RELEASE/reference/html/

    官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架
    应用程序通过 inputs 或者 outputs 来与Spring Cloud Stream中的 binder对象交互,通过配置来binding(绑定),而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互,所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式
    通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动
    Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已建立和熟悉的Spring和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念

    目前仅支持RabbitMQ、Kafka

    设计思想

    标准MQ

    生产者/消费者之间靠消息媒介(Message)传递信息内容
    消息必须走特定的通道(消息通道MessageChannel)
    消息通道里的消息如何被消费呢,谁负责收发处理消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

    使用原因

    比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区
    这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式
    在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
    通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离
    通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件实现
    Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播,在RabbitMQ就是Exchange,在Kafka中就是Topic

    基本流程

    • Binder:很方便的连接中间件,屏蔽差异
    • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
    • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入

    常用API和注解

    • Middleware:中间件,目前只支持RabbitMQ和Kafka
    • Binder:Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
    • @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序
    • @Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
    • @StreamListener:监听队列,用于消费者的队列的消息接收
    • @EnableBinding:指信道channel和exchange绑定在一起

    基本构建

    新建三个子模块,一个作为生产者进行发送消息模块,两个作为消息接收模块

    服务端

    1. 导入 pom 依赖
    <!-- spring-cloud-starter-stream-rabbit -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    
    1. 修改 yml 配置文件
    server:
      port: 8801
    
    spring:
      application:
        name: stream-rabbitmq-provider
      cloud:
        stream:
          # 配置要绑定的 rabbitmq 的服务信息
          binders:
            # 表示定义的名称,用于与 binding 整合
            defaultRabbit:
              # 消息组件类型
              type: rabbit
              # 设置 rabbitmq 相关配置环境
              enviroment:
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: admin
                    password: 123456
          # 服务整合处理
          bindings:
            # 通道名称
            output:
              # 表示要使用的 Exchange 名称定义
              destination: studyExchange
              # 设置消息类型,本次为为 json,文本则设置“text/plain”
              content-type: application/json
                binder: defaultRabbit
    
    1. 业务类
      发送消息的接口
    public interface MessageProvider {
        /**
         * 消息发送
         *  @return :返回值
         */
        Message<?> send();
    }
    

    发送消息接口的实现类

    @EnableBinding(Source.class)
    public class MessageProviderImpl implements MessageProvider {
    
        /**
         * @ InboundChannelAdapter
         * 作用:表示定义的方法能产生消息
         * fixedDelay:多少毫秒发送1次
         */
        @Override
        @InboundChannelAdapter(channel = Source.OUTPUT,poller = @Poller(fixedDelay = "10000"))  // 每隔10秒发送一次
        public Message<String> send() {
            String serial = UUID.randomUUID().toString();
            return MessageBuilder.withPayload(serial)
                    .build();
        }
    }
    

    Controller 层

    @RestController
    public class SendMessageController {
    
        @Resource
        private MessageProvider provider;
    
        @GetMapping(value = "/senMessage")
        public Message senMessage(){
            return provider.send();
        }
    }
    

    消费者

    1. 导入 pom 依赖
    <!-- spring-cloud-starter-stream-rabbit -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    
    1. 修改 yml 配置文件
    # 服务整合处理
    bindings:
      # 通道名称
      input:
        # 表示要使用的 Exchange 名称定义
        destination: studyExchange
        # 设置消息类型,本次为为 json,文本则设置“text/plain”
        content-type: application/json
        binder: defaultRabbit
    
    1. 业务类
    @RestController
    public class SendMessageController {
    
        @Resource
        private MessageProvider provider;
    
        @GetMapping(value = "/senMessage")
        public Message senMessage(){
            return provider.send();
        }
    }
    

    分组消费和持久化

    当使用两个消费者来进行接收消息时,会出现两个问题:重复消费和消息持久化的问题

    重复消费

    目前是8802/8803同时都收到了,存在重复消费问题

    解决方法:分组和持久化属性Group
    比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息
    那如果一个订单同时被两个服务获取到,那么就会造成数据错误,为了避免这种情况这时我们就可以使用Stream中的消息分组来解决

    注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次,不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费

    分组

    微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费
    自定义分组
    修改消费者的 yml 文件,新增一个 group 的属性

    # 服务整合处理
    bindings:
      # 通道名称
      input:
        # 表示要使用的 Exchange 名称定义
        destination: studyExchange
        # 设置消息类型,本次为为 json,文本则设置“text/plain”
        content-type: application/json
        binder: defaultRabbit
       group: Consumer
    

    分布式微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例,这里举例实现两个消费微服务
    多数情况,生产者发送消息给某个具体微服务时只希望被消费一次,按照上面启动两个应用的例子,虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念
    实现轮询分组,每次只有一个消费者,生产者发送的消息只能被一个消费者接收到,避免重复消费

    将案例的两个消费者变成相同组
    同一个组的多个微服务实例,每次只会有一个拿到

    持久化

    配置好 group 这个属性后可以发现
    当消费者发生一些错误停止服务时,但是此时的生产者还在不断的发送消息,如果消费者没有配置 group ,那么这些消息就被错过了
    当配置好 group 这个属性,消费者就算发生一些错误停止服务,再启动时,就会获取到之前停止服务期间生产者发来的消息

  • 相关阅读:
    妹妹
    小猴和北极熊
    盛趣->盛大
    运维
    操之过急
    修马路
    博人传
    醉酒
    【跨域】SpringBoot跨域,拦截器中,第一次获取的请求头为NULL,发送两次请求的处理方式
    【Linux】Linux安装Tomcat
  • 原文地址:https://www.cnblogs.com/hewenhao-blogs/p/14733422.html
Copyright © 2011-2022 走看看