zoukankan      html  css  js  c++  java
  • 【SpringCloud】Spring Cloud Stream 消息驱动(二十三)

    Spring Cloud Stream介绍

      Spring Cloud Stream,官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架

      应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中的binder对象交互,通过配置binding(绑定),而 Spring Cloud Stream 的binder对象负载与消息中间件交互,所以,我们只需要高清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式,通过使用Spring Integration 来连接消息代理中间件以实现消息事件驱动。

      Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念

      目前仅支持RabbitMQ、Kafka

      官网:https://spring.io/projects/spring-cloud-stream

      中文手册:https://www.springcloud.cc/spring-cloud-greenwich.html#spring-cloud-stream-overview-introducing

    Spring Cloud Stream处理架构

        

    组成说明
    Middleware 中间件,目前只支持RabbitMQ和Kafka
    Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
    @Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
    @Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
    @StreamListener 监听队列,用于消费者的队列的消息接收
    @EnableBinding 指信道channel和exchange绑定在一起

    Spring Cloud Stream标准流程

        

    Spring Cloud Stream生产者

      环境准备

        使用Eureka作为注册中心,搭建参考:【SpringCloud】快速入门(一)

        使用RabbitMQ作为中间件,搭建参考:【RabbitMQ】 RabbitMQ安装

      1、新建一个Spring Cloud Stream生产者模块(springcloud-stream-rabbitmq-provider8801)

        

      2、编辑POM文件,引入stream依赖和eureka依赖

     1 <!-- spring cloud stream rabbit -->
     2 <dependency>
     3     <groupId>org.springframework.cloud</groupId>
     4     <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
     5 </dependency>
     6 
     7 <!-- eureka client -->
     8 <dependency>
     9     <groupId>org.springframework.cloud</groupId>
    10     <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    11 </dependency> 

      完整pom文件如下:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <parent>
     6         <artifactId>test-springcloud</artifactId>
     7         <groupId>com.test</groupId>
     8         <version>1.0-SNAPSHOT</version>
     9     </parent>
    10     <modelVersion>4.0.0</modelVersion>
    11 
    12     <artifactId>springcloud-stream-rabbitmq-provider8801</artifactId>
    13 
    14     <dependencies>
    15 
    16         <!-- spring cloud stream rabbit -->
    17         <dependency>
    18             <groupId>org.springframework.cloud</groupId>
    19             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    20         </dependency>
    21 
    22         <!-- eureka client -->
    23         <dependency>
    24             <groupId>org.springframework.cloud</groupId>
    25             <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    26         </dependency>
    27 
    28         <!-- spring boot -->
    29         <dependency>
    30             <groupId>org.springframework.boot</groupId>
    31             <artifactId>spring-boot-starter-web</artifactId>
    32         </dependency>
    33         <dependency>
    34             <groupId>org.springframework.boot</groupId>
    35             <artifactId>spring-boot-starter-actuator</artifactId>
    36         </dependency>
    37 
    38         <dependency>
    39             <groupId>org.springframework.boot</groupId>
    40             <artifactId>spring-boot-devtools</artifactId>
    41             <scope>runtime</scope>
    42             <optional>true</optional>
    43         </dependency>
    44 
    45         <dependency>
    46             <groupId>org.projectlombok</groupId>
    47             <artifactId>lombok</artifactId>
    48             <optional>true</optional>
    49         </dependency>
    50         <dependency>
    51             <groupId>org.springframework.boot</groupId>
    52             <artifactId>spring-boot-starter-test</artifactId>
    53             <scope>test</scope>
    54         </dependency>
    55 
    56     </dependencies>
    57 </project>
    pom.xml

      3、编辑配置文件,application.yml

     1 # 端口
     2 server:
     3   port: 8801
     4 
     5 spring:
     6   application:
     7     name: cloud-stream-provider
     8   cloud:
     9     stream:
    10       binders:
    11         # 表示定义的名称,用于binding的服务信息
    12         defaultRabbit:
    13           # 消息组件类型
    14           type: rabbit
    15           # 设置rabbitmq的相关配置的环境配置
    16           environment:
    17             spring:
    18               rabbitmq:
    19                 host: 127.0.0.1
    20                 port: 5672
    21                 username: guest
    22                 password: guest
    23       # 服务的整合处理
    24       bindings:
    25         # 这个名字是一个通道的名称
    26         output:
    27           # 表示要使用的Exchange 名称定义
    28           destination: studyExchange
    29           # 设置消息类型,本次为json,文本则设置"text/plain"
    30           content-type: application/json
    31           # 设置要绑定的消息服务的具体设置
    32           binder: defaultRabbit
    33 
    34 eureka:
    35   client:
    36     service-url:
    37       defaultZone: http://localhost:8761/eureka

      4、编写主启动方法类

    1 @SpringBootApplication
    2 public class StreamMQMain8801 {
    3 
    4     public static void main(String[] args) {
    5         SpringApplication.run(StreamMQMain8801.class, args);
    6     }
    7 }

      5、编辑业务接口,IMessageProvider用来定义发送消息方法

    1 public interface IMessageProvider {
    2     public  String send();
    3 }

      6、编辑业务接口实现,MessageProviderImpl

     1 import org.springframework.messaging.support.MessageBuilder;
     2 
     3 import java.util.UUID;
     4 
     5 // 定义消息的推送管道
     6 @EnableBinding(Source.class)
     7 public class MessageProviderImpl implements IMessageProvider {
     8 
     9     @Autowired
    10     // 消息发送通道
    11     private MessageChannel output;
    12 
    13     public String send() {
    14         String serial = UUID.randomUUID().toString();
    15         output.send(MessageBuilder.withPayload(serial).build());
    16         System.out.println("====serial: " + serial);
    17         return null;
    18     }
    19 }

       7、编写controller

     1 @RestController
     2 public class SendMessageController {
     3 
     4     @Autowired
     5     private IMessageProvider messageProvider;
     6 
     7     @RequestMapping(value = "/sendMessage")
     8     public String sendMessage(){
     9         return messageProvider.send();
    10     }
    11 }

       8、测试

        1)启动Eureka注册中心,启动RabbitMQ消息中间件,启动Stream生产者项目

        2)查看RabbitMQ的web界面,在Exchang模块中,可以看到里面新增了一个名为studyExchange的交互器,类型为topic

          

        3)新建一个queue,名为:test.news,且绑定到studyExchange。

          

        4)访问地址:http://localhost:8801/sendMessage,发送消息,查看RabbitMQ后台

          可以看到test.news此queue,已经收到消息

    Spring Cloud Stream消费者

      1、新建一个Spring Cloud Stream消费者模块(springcloud-stream-rabbitmq-consumer8802) 

        

      2、编辑POM文件,引入stream依赖和eureka依赖,同上

        完整pom文件如下:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <parent>
     6         <artifactId>test-springcloud</artifactId>
     7         <groupId>com.test</groupId>
     8         <version>1.0-SNAPSHOT</version>
     9     </parent>
    10     <modelVersion>4.0.0</modelVersion>
    11 
    12     <artifactId>springcloud-stream-rabbitmq-consumer8802</artifactId>
    13 
    14     <dependencies>
    15 
    16         <!-- spring cloud stream rabbit -->
    17         <dependency>
    18             <groupId>org.springframework.cloud</groupId>
    19             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    20         </dependency>
    21 
    22         <!-- eureka client -->
    23         <dependency>
    24             <groupId>org.springframework.cloud</groupId>
    25             <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    26         </dependency>
    27 
    28         <!-- spring boot -->
    29         <dependency>
    30             <groupId>org.springframework.boot</groupId>
    31             <artifactId>spring-boot-starter-web</artifactId>
    32         </dependency>
    33         <dependency>
    34             <groupId>org.springframework.boot</groupId>
    35             <artifactId>spring-boot-starter-actuator</artifactId>
    36         </dependency>
    37 
    38         <dependency>
    39             <groupId>org.springframework.boot</groupId>
    40             <artifactId>spring-boot-devtools</artifactId>
    41             <scope>runtime</scope>
    42             <optional>true</optional>
    43         </dependency>
    44 
    45         <dependency>
    46             <groupId>org.projectlombok</groupId>
    47             <artifactId>lombok</artifactId>
    48             <optional>true</optional>
    49         </dependency>
    50         <dependency>
    51             <groupId>org.springframework.boot</groupId>
    52             <artifactId>spring-boot-starter-test</artifactId>
    53             <scope>test</scope>
    54         </dependency>
    55 
    56     </dependencies>
    57 </project>
    pom.xml

      3、便捷配置文件application.yml

     1 # 端口
     2 server:
     3   port: 8802
     4 
     5 spring:
     6   application:
     7     name: cloud-stream-consumer
     8   cloud:
     9     stream:
    10       binders:
    11         # 表示定义的名称,用于binding的服务信息
    12         defaultRabbit:
    13           # 消息组件类型
    14           type: rabbit
    15           # 设置rabbitmq的相关配置的环境配置
    16           environment:
    17             spring:
    18               rabbitmq:
    19                 host: 127.0.0.1
    20                 port: 5672
    21                 username: guest
    22                 password: guest
    23       # 服务的整合处理
    24       bindings:
    25         # 这个名字是一个通道的名称
    26         input:
    27           # 表示要使用的Exchange 名称定义
    28           destination: studyExchange
    29           # 设置消息类型,本次为json,文本则设置"text/plain"
    30           content-type: application/json
    31           # 设置要绑定的消息服务的具体设置
    32           binder: defaultRabbit
    33           # 消费分组
    34 #          group: testA
    35 
    36 eureka:
    37   client:
    38     service-url:
    39       defaultZone: http://localhost:8761/eureka

      4、编写主启动方法类

    1 @SpringBootApplication
    2 public class StreamMQMain8802 {
    3     public static void main(String[] args) {
    4         SpringApplication.run(StreamMQMain8802.class, args);
    5     }
    6 }

      5、编辑消息监听组件

     1 @Component
     2 @EnableBinding(Sink.class)
     3 public class ReceiveMessageListenerController {
     4 
     5     @Value("${server.port}")
     6     private String serverPort;
     7 
     8     @StreamListener(Sink.INPUT)
     9     public void input(Message<String> message){
    10         System.out.println("消费者" + serverPort + ",消费信息:" + message.getPayload());
    11     }
    12 }

      6、测试

        1)启动Eureka注册中心,启动RabbitMQ消息中间件,启动Stream生产者项目,以及启动Stream消费者项目

        2)查看RaibbitMQ的Web后台,发现Queue中多了队列,即Stream消费者项目监听的队列,且此队列绑定了studyExchange

          

        3)访问地址:http://localhost:8801/sendMessage,发送消息,查看RabbitMQ后台

          可以看到test.news此queue,已经收到消息,且Stream消费者项目也收到消息,并处理了

          

      

  • 相关阅读:
    AcWing356 次小生成树(lca)
    牛客 Rinne Loves Edges(dp)
    软件的生命周期和测试流程
    软件测试的学习经历回顾-第一天
    java List集合
    c#Socket通信
    c#线程2
    c#线程1
    c#Linq联合查询
    c#拓展方法
  • 原文地址:https://www.cnblogs.com/h--d/p/12840086.html
Copyright © 2011-2022 走看看