zoukankan      html  css  js  c++  java
  • SpringCloud之Spring Cloud Stream:消息驱动

    Spring Cloud Stream 是一个构建消息驱动微服务的框架,该框架在Spring Boot的基础上整合了Spring Integrationg来连接消息代理中间件(RabbitMQ, Kafka等),提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。
    应用程序通过input通道或者output通道来与Spring Cloud Stream中binder(绑定器)交互,通过配置来binding. 而Spring Cloud Stream的binder负责与中间件交互。

    开发工具:IntelliJ IDEA 2019.2.3

    一、服务器端

    1、创建项目

    IDEA中创建一个新的SpringBoot项目,名称为“spring-server”,SpringBoot版本选择2.1.10,在选择Dependencies(依赖)的界面勾选Spring Cloud Discovery -> Eureka Server。
    pom.xml完整内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.10.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>spring-server</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>spring-server</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    View Code

    2、修改配置application.yml

    修改端口号为8761;取消将自己信息注册到Eureka服务器,不从Eureka服务器抓取注册信息。

    server:
      port: 8761
    eureka:
      client:
        register-with-eureka: false
        fetch-registry: false

    3、修改启动类代码

    增加注解@EnableEurekaServer

    package com.example.springserver;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
    
    @SpringBootApplication
    @EnableEurekaServer
    public class SpringServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringServerApplication.class, args);
        }
    
    }
    View Code

    二、消息生产者

    1、创建项目
    IDEA中创建一个新的SpringBoot项目,名称为“spring-producer”,SpringBoot版本选择2.1.10,在选择Dependencies(依赖)的界面勾选Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
    打开pom.xml,添加依赖spring-cloud-starter-stream-rabbit,会自动引入spring-cloud-stream和spring-cloud-stream-binder。
    pom.xml完整内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.10.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>spring-producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>spring-producer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-test-support</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    View Code

    2、修改配置application.yml

    pom.xml使用RabbitMQ,默认情况下,连接本地的5672端口。下面这段rabbitmq也可省略。

    server:
      port: 8081
    spring:
      application:
        name: spring-producer
    eureka:
      instance:
        hostname: localhost
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/
    rabbitmq:
      host: localhost
      post: 5672
      username: guest
      password: guest

    3、编写发送服务

    方法sendOrder使用@Output("myInput")注解表示创建myInput的消息通道。调用该方法后,会向myInput通道投递消息。
    如果不使用参数myInput,则使用方法名作为通道名称。

    package com.example.springproducer;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.SubscribableChannel;
    
    public interface SendService {
        @Output("myInput")
        SubscribableChannel sendOrder();
    }

    4、修改启动类代码

    加入注解@EnableBinding以开启Spring容器的绑定功能,以SendService.class为参数,Spring容器启动时,会自动绑定SendService接口中定义的通道。

    package com.example.springproducer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    
    @SpringBootApplication
    @EnableEurekaClient
    @EnableBinding(SendService.class)
    public class SpringProducerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringProducerApplication.class, args);
        }
    
    }

    5、添加一个控制器类

    调用SendService的发送方法,往服务器发送消息。

    package com.example.springproducer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class ProducerController {
    
        @Autowired
        SendService sendService;
    
        @RequestMapping(value="/send",method= RequestMethod.GET)
        public String sendRequest(){
            //创建消息
            Message msg = MessageBuilder.withPayload("hello world".getBytes()).build();
            //发送消息
            sendService.sendOrder().send(msg);
            return "SUCCESS";
        }
    }

    三、消息消费者

    1、创建项目

    IDEA中创建一个新的SpringBoot项目,名称为“spring-consumer”,SpringBoot版本选择2.1.10,在选择Dependencies(依赖)的界面勾选Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
    打开pom.xml,添加依赖spring-cloud-starter-stream-rabbit

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.10.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>spring-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>spring-consumer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    View Code

    2、修改配置application.yml

    server:
      port: 8080
    spring:
      application:
        name: spring-consumer
    eureka:
      instance:
        hostname: localhost
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/
    rabbitmq:
      host: localhost
      post: 5672
      username: guest
      password: guest

    3、缩写接受消息的通道接口

    package com.example.springconsumer;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    public interface ReceiveService {
        @Input("myInput")
        SubscribableChannel myInput();
    }

    4、修改启动类代码

    同样绑定消息通道

    package com.example.springconsumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    
    @SpringBootApplication
    @EnableBinding(ReceiveService.class)
    public class SpringConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringConsumerApplication.class, args);
        }
    
        //订阅myInput通道的消息
        @StreamListener("myInput")
        public void receive(byte[] msg){
            System.out.println("接收到的消息:" + new String(msg));
        }
    }

    5、测试

    (1)检查服务里面的RabbitMQ是否有启动(默认启动);

    (2)启动spring-server(8761端口);

    (3)启动spring-producer(8081端口);

    (4)启动spring-consumer(8080端口);

    (5)浏览器访问http://localhost:8081/send,spring-consumer项目的控制台输出:

    接收到的消息:hello world

    说明消费者已经可以从消息代理中获取到消息。

    四、更换绑定器

    上面使用了RabbitMQ作为消息代理,如果使用Kafka,可以更换Maven依赖实现。
    在生产者和消费者的pom.xml中,将spring-cloud-starter-stream-rabbit修改为spring-cloud-starter-stream-kafka。

    五、Sink、Source与Processor接口

    为了简化开发,Spring Cloud Stream内置了3个接口:Sink、Source与Processor。接口定义如下:

    public interface Source {
        String OUTPUT = "output";
    
        @Output("output")
        MessageChannel output();
    }
    public interface Sink {
        String INPUT = "input";
    
        @Input("input")
        SubscribableChannel input();
    }
    public interface Processor extends Source, Sink {
    }

    通过上面内置接口,不必编写服务接口,生产者绑定通道时加入Source.class,消费者绑定通道时加入Sink.class,因为Processor继承于Sink与Source,也可以只使用Processor。

    1、消息生产者代码改写如下:

    (1)启动类代码

    绑定通道时加入Processor.class

    package com.example.springproducer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Processor;
    
    @SpringBootApplication
    @EnableEurekaClient
    @EnableBinding(Processor.class)
    public class SpringProducerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringProducerApplication.class, args);
        }
    
    }

    (2)修改配置application.yml

    绑定名为input的消息队列,否则消息队列默认为output。

    server:
      port: 8081
    spring:
      application:
        name: spring-producer
      cloud:
        stream:
          bindings:
            output:
              destination: input
    eureka:
      instance:
        hostname: localhost
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/

    (3)删除服务接口SendService

    2、消息消费者代码改写如下:

    (1)启动类代码

    绑定通道时加入Processor.class

    package com.example.springconsumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.messaging.handler.annotation.SendTo;
    
    @SpringBootApplication
    @EnableBinding(Processor.class)
    public class SpringConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringConsumerApplication.class, args);
        }
    
        @StreamListener(Processor.INPUT)
        public void receive(byte[] msg){
            System.out.println("接收到的消息:" + new String(msg));
        }
    }

    (2)删除服务接口ReceiveService 

    重新启动应用,浏览器访问http://localhost:8081/send,spring-consumer项目的控制台输出:

    接收到的消息:hello world

    、消费者组

    消费者组不同时,会发送给所有的消费者实例。
    消费者组相同时,对于发送过来的消息,仅由其中一个消费者实例处理。

    1、修改消费者spring-consumer项目的配置application.yml,配置消费者组为groupA

    server:
      port: 8080
    spring:
      application:
        name: spring-consumer
      cloud:
        stream:
          bindings:
            input:
              group:
                groupA
    eureka:
      instance:
        hostname: localhost
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/

    2、仿照上面消费者spring-consumer项目,新建两个消费者项目spring-second-consumer(端口8082)和spring-third-consumer(端口8083),配置application.yml的消费者组都为groupB。

    重新启动应用,浏览器多次访问http://localhost:8081/send,可以看到原来消费者spring-consumer都能收到,新加的两个消费者spring-second-consumer(端口8082)和spring-third-consumer(端口8083)会轮询处理。

  • 相关阅读:
    Mix and Build(简单DP)
    Is It A Tree?(并查集)
    Paths on a Grid(简单组合数学)
    Code(组合数学)
    Round Numbers(组合数学)
    Inviting Friends(二分+背包)
    Communication System(dp)
    Human Gene Functions
    Pearls
    敌兵布阵(线段树HDU 1166)
  • 原文地址:https://www.cnblogs.com/gdjlc/p/11920463.html
Copyright © 2011-2022 走看看