zoukankan      html  css  js  c++  java
  • SpringCloud Stream消息驱动

    一、概述

    1、简介

    ​ Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动型微服务。该框架提供了一个灵活的编程模型,该模型建立在已经建立并熟悉的Spring习惯用法和最佳实践的基础上,包括对持久性pub / sub语义,使用者组和有状态分区的支持。可以屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。官方文档地址:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.3.RELEASE/reference/html/

    2、设计思想

    ​ 通过定义绑定器作为中间件,实现应用程序与消息中间件细节之间的隔离,遵循发布/订阅模式。架构如下:

    说明:

    组成 说明
    Middleware 中间件,目前只支持RibbitMQ和Kafaka
    Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
    @Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
    @Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
    @StreamListener 监听队列,用于消费者的队列的消息接收
    @EnableBinding 指信道channel和exchange绑定在一起

    二、实现步骤

    1、生产者配置

    • 新建一个模块,在pom中添加以下依赖;

      <dependencies>
          <dependency>
              <groupId>org.springframework.cloud</groupId>
              <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.cloud</groupId>
              <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-actuator</artifactId>
          </dependency>
          <!--热部署-->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-devtools</artifactId>
              <scope>runtime</scope>
              <optional>true</optional>
          </dependency>
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <optional>true</optional>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>
      </dependencies>
      
    • 配置application.yml文件

      server:
        port: 8801
      
      spring:
        application:
          name: cloud-stream-provider
        cloud:
          stream:
            binders: #配置需要绑定的rabbitmq 的服务信息
              defaultRabbit: #表示定义的名称,用于与binding整合
                type: rabbit #消息组件类型
                environment: #设置rabbitmq的相关环境
                  spring:
                    rabbitmq:
                      host: localhost
                      port: 5672
                      username: guest
                      password: guest
            bindings: #服务的整合处理
              output: #通道名称
                destination: studyExchange #要使用的Exchange名称定义
                content-type: application/json #设置消息类型
                binder: defaultRabbit #设置要绑定的消息服务的具体设置
      
      eureka:
        client:
          service-url:
            defaultZone: http://localhost:7001/eureka
        instance:
          lease-renewal-interval-in-seconds: 2 #设置心跳的间隔时间(默认30秒)
          lease-expiration-duration-in-seconds: 5 #间隔时间
          instance-id: sned-8801.com #信息列表显示主机名称
          prefer-ip-address: true #显示ip地址
      
    • 定义一个消息推送类;

      @EnableBinding(Source.class)//定义消息的推送管道
      public class MessageProviderImpl implements IMessageProvider {
      
          @Resource
          private MessageChannel output;//消息发送管道
          public String send() {
              String serial = UUID.randomUUID().toString();
              output.send(MessageBuilder.withPayload(serial).build());
              System.out.println("************"+serial);
              return serial;
          }
      }
      
    • 控制类调用上面的消息推送类;

      @Resource
      private IMessageProvider messageProvider;
      
      @GetMapping("/sendMessage")
      public String sendMessage(){
          return messageProvider.send();
      }
      

    2、消费者配置

    • 新建一个模块,在pom中添加依赖(同生产者一样);

    • 配置application.yml文件,只需修改 output 为 input;

      server:
        port: 8802
      
      spring:
        application:
          name: cloud-stream-consumer
        cloud:
          stream:
            binders: #配置需要绑定的rabbitmq 的服务信息
              defaultRabbit: #表示定义的名称,用于与binding整合
                type: rabbit #消息组件类型
                environment: #设置rabbitmq的相关环境
                  spring:
                    rabbitmq:
                      host: localhost
                      port: 5672
                      username: guest
                      password: guest
            bindings: #服务的整合处理
              input: #通道名称
                destination: studyExchange #要使用的Exchange名称定义
                content-type: application/json #设置消息类型
                binder: defaultRabbit #设置要绑定的消息服务的具体设置
      
      eureka:
        client:
          service-url:
            defaultZone: http://localhost:7001/eureka
        instance:
          lease-renewal-interval-in-seconds: 2 #设置心跳的间隔时间(默认30秒)
          lease-expiration-duration-in-seconds: 5 #间隔时间
          instance-id: receive-8802.com #信息列表显示主机名称
          prefer-ip-address: true #显示ip地址
      
    • 消费者业务类编写,需要开启绑定,定义消息接收管道 @EnableBinding(Sink.class),使用@StreamListener(Sink.INPUT) 监听队列,用于消费者的队列的消息接收。

      @Component
      @EnableBinding(Sink.class)
      public class ReceiveMessageListenerController {
      
          @Value("${server.port}")
          private String serverPort;
      
          @StreamListener(Sink.INPUT)
          public void input(Message<String> message){
              System.out.println("消费者1号,--------------接收到的消息:"+message.getPayload()+"	 port"+serverPort);
          }
      }
      

    3、分组消费与持久化

    ​ 在集群服务中,一个消息可能会被消费多次,将其放入同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。只需要自定义分组就可以完成分组与持久化,在客户端加上配置 group。

    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          bindings: #服务的整合处理
            input: #通道名称
              destination: studyExchange #要使用的Exchange名称定义
              content-type: application/json #设置消息类型
              binder: defaultRabbit #设置要绑定的消息服务的具体设置
              group: groupA #自定义分组
    

    案例代码地址:https://github.com/xhanglog/springcloud-learning

  • 相关阅读:
    详解机器学习中的熵、条件熵、相对熵、交叉熵
    使用Keras进行深度学习:(三)使用text-CNN处理自然语言(上)
    粒子群优化算法(PSO)之基于离散化的特征选择(FS)(一)
    DNN模型训练词向量原理
    TensorFlow 实战卷积神经网络之 LeNet
    五大经典卷积神经网络介绍:LeNet / AlexNet / GoogLeNet / VGGNet/ ResNet
    Oracle 查询版本号
    C# 递归获取 文件夹的 所有文件
    SQL Server 常用语句
    Oracle 导入大量数据
  • 原文地址:https://www.cnblogs.com/Mhang/p/12587539.html
Copyright © 2011-2022 走看看