zoukankan      html  css  js  c++  java
  • springcloud12-spring cloud stream

    1.基础说明

      官网:https://spring.io/projects/spring-cloud-stream#overview

        文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/

        中文文档:https://m.wang1314.com/doc/webapp/topic/20971999.html

      spring cloud stream是一个用于构建消息驱动微服务的框架。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置
      简单来说,spring cloud stream使我们能够更为方便的操作消息中间件,他现在支持RabbitMQ Kafka。在开发中 ,只需要通过cloud stream就可以操作两个消息中间件。

    1.1消息中间件架构

     1.2cloud stream架构

      在没有cloud stream的时候,我们需要直接和消息中间件进行交互。由于各种消息中间件的实现细节存在较大的差异,这些中间件的差异导致我们实际项目开发给我们造成了一定的困扰,如果我们适应了两个消息中间件中的一种,后面的业务需求,我们向往另外一种消息中间件进行迁移,这时候是很复杂的,一大堆东西需要重做,因为它和系统耦合了,spring cloud stream给我们提供了一种解耦的方式。

      使用Stream,通过定义绑定器作为中间层,实现了应用和消息中间件细节之间的隔离。通过向应用程序暴露同意的channel通道,使得应用程序不需要考虑各种不同的消息中间件的实现。

      应用程序通过input 和output来和binder交互,bingder对象负责和消息中间件进行交互。

     2.基本使用

    2.1说明

      消息中间件采用的rabbitmq

      一个生产者cloud-stream-rabbitmq-provider8801,两个消费者(消费同样的主题)cloud-stream-rabbitmq-consumer8802和cloud-stream-rabbitmq-consumer8803

    2.2创建生产者cloud-stream-rabbitmq-provider8801

    2.2.1项目

      

     2.2.2依赖

      

        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

    完整

     <dependencies>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
    
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    
    
            <dependency>
                <groupId>com.atguigu.springcloud</groupId>
                <artifactId>cloud-api-commons</artifactId>
                <version>${project.version}</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web  -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
        </dependencies>

    2.2.3配置文件

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider  #将会作为注册到注册中心的服务名称
      cloud:
        stream:
          binders:  #在此处配置要绑定的rabbitmq的服务信息;  可以配置多个
            mydefaultRabbit:  #表示定义的名称,用于于binding整合   这个名称随意
              type: rabbit  #消息组件类型
              environment:  # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
    
          bindings: # 服务的整合处理
            myoutput1: # 这个名字是一个通道的名称  名字随意 可以是多个
               destination: studyExchange1  # 表示要使用的Exchange名称定义
               content-type: application/json  # 设置消息类型,本次为json,文本则设置“text/plain”
               #group: studyExchange1_group1
    
            myoutput2: # 这个名字是一个通道的名称  名字随意 可以是多个
              destination: studyExchange2  # 表示要使用的Exchange名称定义
              content-type: application/json  # 设置消息类型,本次为json,文本则设置“text/plain”
              #group: studyExchange2_group1
    
    eureka:
      client:
        service-url:  #注册中心地址
           defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka  #集群版
      instance:
        instance-id: send-8801.com   #服务显示名称
        prefer-ip-address: true #是否显示ip
        lease-renewal-interval-in-seconds: 20 #服务项注册中心发送心跳的默认间隔时间,单位为秒(默认是30秒)
        lease-expiration-duration-in-seconds: 80 #注册中心最后一次收到心跳,等待的最长时间,单位是秒,默认90秒

    stream相关配置信息说明:

      spring.cloud.stream.binders:

        mydefaultRabbit:这个就是自己随便取一个名字

          type:消息组件类型,这里是rabbit

          environment: 设置消息中间件相关信息

            spring.rabbit:

              host:rabbit地址

              port:rabbit端口

              username:rabbit账号

              username:rabbit密码

        

      spring.cloud.stream.bindings: 定义主题通道,可以定义多个,这里定义了两个myoutput1和myoutput2

        myoutput1:通道名称

          destinationExchange名称定义

          content-type:消息格式

           group:分组,这个和消息重复消费有关,后面会说明

    2.2.4主启动类

    package com.atguigu.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    
    @SpringBootApplication
    @EnableEurekaClient  //标识自己是Eureka客户端(也就是一个服务)
    public class StreamMQMain8801 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8801.class,args);
        }
    }

    2.2.5自定义通道

    package com.atguigu.springcloud.service;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.stereotype.Component;
    import org.springframework.stereotype.Service;
    
    /**
     * @Classname IMessageProvider
     * @Description TODO
     * @Date 2021/5/21 0021 下午 3:20
     * @Created by jcc
     */
    
    
    public interface IMessageProvider1 {  //定义通道
    
        public String TEST_OUT_PUT1 = "myoutput1";  //这里对应的值是配置文件里面的myoutput1:通道名称
    
        @Output(TEST_OUT_PUT1)
        MessageChannel output1();
    
    
    
        public String TEST_OUT_PUT2 = "myoutput2";
    
        @Output(TEST_OUT_PUT2)
        MessageChannel output2();
    
    
    }
    public String TEST_OUT_PUT1 = "myoutput1";
    public String TEST_OUT_PUT2 = "myoutput2";
    这两个值要和配置文件中对应,表示自定义的两个通道

     2.2.6业务类发送消息

    package com.atguigu.springcloud.service;
    
    /**
     * @Classname MessageProviderService
     * @Description TODO
     * @Date 2021/5/24 0024 上午 10:08
     * @Created by jcc
     */
    public interface MessageProviderService {
    
        String send1();
    
        String send2();
    
    }
    
    
    package com.atguigu.springcloud.service.impl;
    
    import com.atguigu.springcloud.service.IMessageProvider1;
    import com.atguigu.springcloud.service.MessageProviderService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.UUID;
    
    
    
    /**
     * @Classname MessageProviderImpl
     * @Description TODO
     * @Date 2021/5/21 0021 下午 3:20
     * @Created by jcc
     */
    
    
    @EnableBinding(IMessageProvider1.class) //定义消息的推送管道
    @Service("messageProviderServiceImpl1")
    public class MessageProviderServiceImpl1 implements MessageProviderService {
    
         @Resource
         private IMessageProvider1 iMessageProvider1;
    
        @Override
        public String send1() {
            String serial = UUID.randomUUID().toString();
            Message<String> build = MessageBuilder.withPayload(serial).build();
            iMessageProvider1.output1().send(build);
            System.out.println("*****通道1: "+serial);
            return null;
        }
    
    
        @Override
        public String send2() {
            String serial = UUID.randomUUID().toString();
            Message<String> build = MessageBuilder.withPayload(serial).build();
            iMessageProvider1.output2().send(build);
            System.out.println("*****通道2: "+serial);
            return null;
        }
    
    }
    
    
    @EnableBinding(IMessageProvider1.class) //定义消息的推送管道,里面存入的是我们刚才自定义通道的类的class对象
    在send1和send2方法中分别调用iMessageProvider1output1output2方法,表示使用通道myoutput1和通道myoutput2来发消息

    package com.atguigu.springcloud.controller;
    
    import com.atguigu.springcloud.service.MessageProviderService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.web.bind.annotation.*;
    
    import javax.annotation.Resource;
    
    @RestController
    @Slf4j
    public class PaymentController {
    
        @Autowired
        @Qualifier("messageProviderServiceImpl1")
        private com.atguigu.springcloud.service.MessageProviderService messageProviderService;
    
        @GetMapping(value = "/sendMessage1")
        public String sendMessage1()
        {
            return messageProviderService.send1();
         }
    
    
        @GetMapping(value = "/sendMessage2")
        public String sendMessage2()
        {
            return messageProviderService.send2();
        }
    }
    
    

    2.3创建消费者cloud-stream-rabbitmq-consumer8802

    2.3.1项目

    2.3.2依赖

        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

    完整

    <dependencies>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
    
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    
    
            <dependency>
                <groupId>com.atguigu.springcloud</groupId>
                <artifactId>cloud-api-commons</artifactId>
                <version>${project.version}</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web  -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
        </dependencies>

    2.3.3配置文件

      server:
        port: 8802
    
    
      spring:
        application:
          name: cloud-stream-consumer  #将会作为注册到注册中心的服务名称
        cloud:
          stream:
            binders:  #在此处配置要绑定的rabbitmq的服务信息;  可以配置多个
              mydefaultRabbit:  #表示定义的名称,用于于binding整合   这个名称随意
                type: rabbit  #消息组件类型
                environment:  # 设置rabbitmq的相关的环境配置
                  spring:
                    rabbitmq:
                      host: localhost
                      port: 5672
                      username: guest
                      password: guest
    
            bindings: # 服务的整合处理
              myintput1: # 这个名字是一个通道的名称  名字随意 可以是多个
                destination: studyExchange1  # 表示要使用的Exchange名称定义
                content-type: application/json  # 设置消息类型,本次为json,文本则设置“text/plain”
                #group: studyExchange1_group1
    
              myintput2: # 这个名字是一个通道的名称  名字随意 可以是多个
                destination: studyExchange2  # 表示要使用的Exchange名称定义
                content-type: application/json  # 设置消息类型,本次为json,文本则设置“text/plain”
                #group: studyExchange2_group1
    
    
    
      eureka:
        client:
          service-url:  #注册中心地址
            defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka  #集群版
        instance:
          instance-id: send-8801.com   #服务显示名称
          prefer-ip-address: true #是否显示ip
          lease-renewal-interval-in-seconds: 20 #服务项注册中心发送心跳的默认间隔时间,单位为秒(默认是30秒)
          lease-expiration-duration-in-seconds: 80 #注册中心最后一次收到心跳,等待的最长时间,单位是秒,默认90秒
    这里配置和生成者差不多,只需要注意destination属性要和生产者的destination属性对应,才能接收到生产者发送的消息

    2.3.4主启动类
    package com.atguigu.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    
    @SpringBootApplication
    @EnableEurekaClient  //标识自己是Eureka客户端(也就是一个服务)
    public class StreamMQMain8802 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8802.class,args);
        }
    }
    
    

    2.3.5自定义通道

    package com.atguigu.springcloud.controller;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * @Classname InputService
     * @Description TODO
     * @Date 2021/5/21 0021 下午 5:10
     * @Created by jcc
     */
    public interface IMessageProvider1 {
    
        String TEST_IN_PUT1 = "myintput1";
    
        @Input(TEST_IN_PUT1)
        SubscribableChannel testInPut1();
    
    
        String TEST_IN_PUT2 = "myintput2";
    
        @Input(TEST_IN_PUT2)
        SubscribableChannel testInPut2();
    
    }

    2.3.6业务类接收消息

    package com.atguigu.springcloud.controller;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    /**
     * @Classname ReceiveMessageListenerController
     * @Description TODO
     * @Date 2021/5/21 0021 下午 3:36
     * @Created by jcc
     */
    
    @Component
    @EnableBinding(IMessageProvider1.class)
    public class ReceiveMessageListenerController1 {
    
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(IMessageProvider1.TEST_IN_PUT1)
        public void input1(Message<String> message) {
            System.out.println("消费者1号-input1,接受:"+message.getPayload()+"	 port:"+serverPort);
        }
    
        @StreamListener(IMessageProvider1.TEST_IN_PUT2)
        public void input2(Message<String> message) {
            System.out.println("消费者1号-input2,接受:"+message.getPayload()+"	 port:"+serverPort);
        }
    }
    @EnableBinding(IMessageProvider1.class) 里面传入刚才的自定义通道类
    @StreamListener 传入通道类的里面定义的额通道名称

    2.3.7生产者 cloud-stream-rabbitmq-consumer8803
      
    和8802一模一样,除了端口改为8803


    3.测试
    3.1访问 http://localhost:8801/sendMessage1
    3.1.1生产者控制台打印

     
    3.1.2消费者1控制台打印

     3.1.3消费者2控制台打印

     3.1.4mq控制页面

     4.存在问题

      在这里,我们使用生产者发送了一个消息,两个消费者都接收到了,如果这两个消费者是同一个服务的集群,只需要一个消费者受到消息就可以了,那么这里就重复消费了

    4.1处理分组

    4.1.1生产者配置修改 

    bindings: # 服务的整合处理
            myoutput1: # 这个名字是一个通道的名称  名字随意 可以是多个
               destination: studyExchange1  # 表示要使用的Exchange名称定义
               content-type: application/json  # 设置消息类型,本次为json,文本则设置“text/plain”
               group: studyExchange1_group1
    
            myoutput2: # 这个名字是一个通道的名称  名字随意 可以是多个
              destination: studyExchange2  # 表示要使用的Exchange名称定义
              content-type: application/json  # 设置消息类型,本次为json,文本则设置“text/plain”
              group: studyExchange2_group1

    接入group属性

    4.1.2两个消费者哦诶之修改

    bindings: # 服务的整合处理
              myintput1: # 这个名字是一个通道的名称  名字随意 可以是多个
                destination: studyExchange1  # 表示要使用的Exchange名称定义
                content-type: application/json  # 设置消息类型,本次为json,文本则设置“text/plain”
                group: studyExchange1_group1
    
              myintput2: # 这个名字是一个通道的名称  名字随意 可以是多个
                destination: studyExchange2  # 表示要使用的Exchange名称定义
                content-type: application/json  # 设置消息类型,本次为json,文本则设置“text/plain”
                group: studyExchange2_group1

    加入group属性,注意,这里的group的属性和生产者对应上

    配置完成后,生成者通道1的group是studyExchange1_group1,两个消费者通道1的group也是studyExchange1_group1

    此时,连个消费者就会轮询接收消息

    分组后,还有一个好处,如果生产者发送消息,此时消费者刚好挂了,消费者重新启动后,会根据分组去获取消息接收。如果不分组,这个消息就会丢失

    4.2测试

    4.2.1访问

      http://localhost:8801/sendMessage2

    4.2.2生产者打印

    4.2.3消费者1打印

    4.2.4消费者2打印

      

    4.2.5再次访问

      http://localhost:8801/sendMessage2

    4.2.6生产者打印

    4.2.7消费者1打印

    4.2.4消费者2打印

      

    可以看到,此时两个消费者轮询消费

    
    
    
    
    
     

      

  • 相关阅读:
    jquery 兼容的滚轮事件
    HTML5的manifest 本地离线缓存
    jquery.qrcode.js 生成二维码
    bootstrap modal垂直居中 (转)
    require.js Javascript模块化
    基于特征检测(SURF,SIFT方法)与特征匹配(Feature Matching)(FLANN方法)来寻找目标
    自动跟踪足球场上所有的选手
    python3.7+opencv3.4.1
    神经网络
    使用Python+OpenCV进行图像模板匹配(Match Template)
  • 原文地址:https://www.cnblogs.com/jthr/p/14793877.html
Copyright © 2011-2022 走看看