zoukankan      html  css  js  c++  java
  • 十五、Spring Cloud Stream 消息驱动

    clipboard

    1、消息驱动概述

    1)是什么?

    一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型


    什么是SpringCloudStream

    官方定义的SpringCloudStream是一个构建消息驱动微服务的框架


    应用程序通过inputs或者outputs来与SpringCloudStream中的binder对象交互。

    通过我们来配置binding(绑定),Spring Cloud Stream的binder对象负责与消息中间件交互。

    所以,我们只需要弄清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式


    通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动

    SpringCloudStream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了 发布-订阅,消费组,分区的三个核心概念

    目前支持 RabbitMQ和 kafka


    2)Spring Cloud Stream 中文指导手册

    http://m.wang1314.com/doc/webapp/topic/20971999.html


    2、设计思想

    1)标准的MQ

    clipboard


    2)为什么需要Spring Cloud Stream?

    RabbitMQ,kafka这些消息中间件的差异性给我们实际项目开发造成了一定的困扰,如果我们使用两个消息队列其中的一种,后期因为业务需求,我们想往另外一种消息队列进行迁移,这无疑是灾难性的,一大堆东西要推倒重做,因为它

    和我们的系统耦合了,这时候SpringCloud Stream给我们提供了一种解耦的方式

    clipboard


    3)Spring Cloud Stream 标准流程套路

    clipboard

    Binder:方便的连接中间件,屏蔽差异

    Channel:通道,队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,

    通过Channel对队列进行配置

    Source 和 Sink:从stream 发布消息就是输出,接受消息就是输入


    4)编码api和常用注解

    clipboard


    3、编码实现demo

    新建三个工程

    clipboard

    1 )构建消息驱动的生产者

    项目结构

    clipboard

    ①、pom文件

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>


    ②、application.xml

    server:
      port: 8801
    spring:
      application:
        name: clou-stream-provider
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            output: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
        lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
        instance-id: 127.0.0.1  # 在信息列表时显示主机名称
        prefer-ip-address: true     # 访问的路径变为IP地址


    ③、主启动类

    @SpringBootApplication
    public class StreamMqMain8801 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMqMain8801.class,args);
        }
    }


    ④、业务类

    @RestController
    public class SendMessageController {
    
        @Autowired
        private IMessageProvider iMessageProvider;
    
        @GetMapping("/sengMessage")
        public String sendMessage(){
            return iMessageProvider.send();
        }
    }
    
    ====================================================================
    @EnableBinding(Source.class)  //定义消息的推送管道
    public class MessageProviderImpl implements IMessageProvider {
    
        @Resource
        private MessageChannel output;
    
        @Override
        public String send() {
    
            String serial = UUID.randomUUID().toString();
            output.send(MessageBuilder.withPayload(serial).build());
            System.out.println("serial******************"+serial);
            return null;
        }
    }


    ⑤、查看RabbitMQ,交换机上出现了我们yml文件上定义的 studyExchange

    clipboard

    访问http://localhost:8801/sengMessage

    clipboard


    2)消息驱动之消费者

    项目结构:

    clipboard

    ① 、pom文件

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>


    ②、application.yml

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
              binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    
    
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
        lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
        instance-id: 127.0.0.1  # 在信息列表时显示主机名称
        prefer-ip-address: true     # 访问的路径变为IP地址


    ③、主启动类

    @SpringBootApplication
    public class StreamMqMain8802 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMqMain8802.class, args);
        }
    }


    ④、业务类

    @Component
    @EnableBinding(Sink.class)
    public class ReceiveMessageController {
    
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message){
            System.out.println("消费者1号-"+serverPort+"接收到消息:"+message.getPayload());
        }
    }


    ⑤、测试:

    访问:http://localhost:8801/sengMessage

    clipboard

    clipboard


    4、分组消费与持久化

    1)重复消费问题

    生产者生产一条消息后,两个消费者 8802 和 8803 都收到了消息

    clipboard

    clipboard


    会有什么后果:

    clipboard


    8802服务和8803服务默认处于不同的组

    clipboard


    2)分组

    ①、

    微服务放置在同一个group中,就能保证消息只能被其中的一个应用消费一次,

    同一个group内会发生竞争关系,只有其中一个可以消费

    不同组的应用可以重复消费同一个消息


    ②、自定义分组:

    clipboard

    将8802 和 8803分配到同一个组:

    clipboard

    clipboard

    clipboard


    3)持久化

    添加了分组,就自动实现了持久化,当启动了服务消费者,会自动从交换机中读取消息

    当8802去掉group配置后,重启,不会读取交换机中缓存的消息

  • 相关阅读:
    Java正则表达式教程及示例
    MySQL 事务
    MySQL 正则表达式
    常用函数 __MySQL必知必会
    使用MySQL正则表达式 __MySQL必知必会
    首先使用flex制作table
    javascript原生调用摄像头
    网页背景图片随机
    网页背景视频的实现
    网站无法显示logo?
  • 原文地址:https://www.cnblogs.com/houchen/p/14907397.html
Copyright © 2011-2022 走看看