  • Spring Cloud: Spring Cloud Stream (Message-Driven)

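    Spring Cloud Stream is a framework for building message-driven microservices. Built on Spring Boot, it integrates Spring Integration to connect to message broker middleware (RabbitMQ, Kafka, etc.), provides opinionated auto-configuration, and introduces three core concepts: publish-subscribe, consumer groups, and partitions.
    An application interacts with a Spring Cloud Stream binder through input or output channels, which are bound via configuration. The binder in turn handles the interaction with the middleware.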

    Development tool: IntelliJ IDEA 2019.2.3

    I. Server

    1. Create the project

    In IDEA, create a new Spring Boot project named "spring-server". Select Spring Boot version 2.1.10 and, on the Dependencies screen, check Spring Cloud Discovery -> Eureka Server.
    The complete pom.xml is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.10.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>spring-server</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>spring-server</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    2. Modify application.yml

    Change the port to 8761, and disable both registering this instance with the Eureka server and fetching registry information from it.

    server:
      port: 8761
    eureka:
      client:
        register-with-eureka: false
        fetch-registry: false

    3. Modify the startup class

    Add the @EnableEurekaServer annotation.

    package com.example.springserver;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
    
    @SpringBootApplication
    @EnableEurekaServer
    public class SpringServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringServerApplication.class, args);
        }
    
    }

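    II. Message producer

    1. Create the project
    In IDEA, create a new Spring Boot project named "spring-producer". Select Spring Boot version 2.1.10 and, on the Dependencies screen, check Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
    Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency, which automatically pulls in spring-cloud-stream and spring-cloud-stream-binder-rabbit.
    The complete pom.xml is as follows: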

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.10.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>spring-producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>spring-producer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-test-support</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    2. Modify application.yml

    The pom.xml declares the RabbitMQ binder, which by default connects to port 5672 on localhost, so the rabbitmq section below can be omitted.

    server:
      port: 8081
    spring:
      application:
        name: spring-producer
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    eureka:
      instance:
        hostname: localhost
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/

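    3. Write the send service

    The sendOrder method is annotated with @Output("myInput"), which creates a message channel named myInput. Messages sent through the channel returned by this method are delivered to myInput.
    If the myInput argument is omitted, the method name is used as the channel name.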

    package com.example.springproducer;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.SubscribableChannel;
    
    public interface SendService {
        @Output("myInput")
        SubscribableChannel sendOrder();
    }
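
    The naming rule described above (explicit annotation value first, method name as the fallback) can be sketched in plain Java. This is only a simplified model: DemoOutput, DemoSendService, and ChannelNames are hypothetical names invented for this illustration and are not part of Spring Cloud Stream.

    ```java
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import java.lang.reflect.Method;

    // Hypothetical stand-in for @Output; NOT the Spring Cloud Stream annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoOutput {
        String value() default "";
    }

    // Hypothetical interface mirroring SendService, plus one method without an explicit name.
    interface DemoSendService {
        @DemoOutput("myInput")
        Object sendOrder();

        @DemoOutput
        Object sendInvoice();
    }

    final class ChannelNames {
        private ChannelNames() {}

        // Returns the annotation value when one is given, otherwise the method name.
        static String resolve(Class<?> type, String methodName) {
            try {
                Method method = type.getMethod(methodName);
                DemoOutput output = method.getAnnotation(DemoOutput.class);
                if (output == null) {
                    throw new IllegalArgumentException(methodName + " is not annotated");
                }
                return output.value().isEmpty() ? method.getName() : output.value();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException("No such method: " + methodName, e);
            }
        }
    }
    ```

    Under this model, ChannelNames.resolve(DemoSendService.class, "sendOrder") yields myInput, while the unnamed sendInvoice method falls back to sendInvoice.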

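    4. Modify the startup class

    Add the @EnableBinding annotation to enable binding in the Spring container, passing SendService.class as its argument. When the Spring container starts, it automatically binds the channels defined in the SendService interface.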

    package com.example.springproducer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    
    @SpringBootApplication
    @EnableEurekaClient
    @EnableBinding(SendService.class)
    public class SpringProducerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringProducerApplication.class, args);
        }
    
    }

    5. Add a controller class

    It calls the send method of SendService to send a message to the broker.

    package com.example.springproducer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class ProducerController {
    
        @Autowired
        SendService sendService;
    
        @RequestMapping(value="/send",method= RequestMethod.GET)
        public String sendRequest(){
            // Build the message
            Message<byte[]> msg = MessageBuilder.withPayload("hello world".getBytes()).build();
            // Send the message
            sendService.sendOrder().send(msg);
            return "SUCCESS";
        }
    }

    III. Message consumer

    1. Create the project

    In IDEA, create a new Spring Boot project named "spring-consumer". Select Spring Boot version 2.1.10 and, on the Dependencies screen, check Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
    Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.10.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>spring-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>spring-consumer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    2. Modify application.yml

    server:
      port: 8080
    spring:
      application:
        name: spring-consumer
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    eureka:
      instance:
        hostname: localhost
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/

    3. Write the channel interface for receiving messages

    package com.example.springconsumer;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    public interface ReceiveService {
        @Input("myInput")
        SubscribableChannel myInput();
    }

    4. Modify the startup class

    Bind the message channel here as well.

    package com.example.springconsumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    
    @SpringBootApplication
    @EnableBinding(ReceiveService.class)
    public class SpringConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringConsumerApplication.class, args);
        }
    
        // Subscribe to messages on the myInput channel
        @StreamListener("myInput")
        public void receive(byte[] msg){
            System.out.println("Received message: " + new String(msg));
        }
    }
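
    The producer encodes its payload with getBytes() and the listener decodes it with new String(msg), both of which rely on the platform default charset. If the two services might run on machines with different defaults, one option is to name the charset explicitly on both sides. The sketch below shows only that round trip in plain Java; PayloadCodec is a hypothetical helper invented here, not part of the tutorial's projects.

    ```java
    import java.nio.charset.StandardCharsets;

    // Hypothetical helper: keeps producer and consumer on the same charset.
    final class PayloadCodec {
        private PayloadCodec() {}

        // Producer side: turn the text payload into UTF-8 bytes.
        static byte[] encode(String text) {
            return text.getBytes(StandardCharsets.UTF_8);
        }

        // Consumer side: decode the bytes with the same charset.
        static String decode(byte[] payload) {
            return new String(payload, StandardCharsets.UTF_8);
        }
    }
    ```

    In the controller this would replace "hello world".getBytes(), and in the listener it would replace new String(msg).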

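    5. Test

    (1) Check in the system services that RabbitMQ is running (it starts automatically by default);

    (2) Start spring-server (port 8761);

    (3) Start spring-producer (port 8081);

    (4) Start spring-consumer (port 8080);

    (5) Open http://localhost:8081/send in a browser; the console of the spring-consumer project prints:

    Received message: hello world

    This shows that the consumer can now receive messages from the message broker.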

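    IV. Switching the binder

    The examples above use RabbitMQ as the message broker. To use Kafka instead, change the Maven dependency.
    In the pom.xml of both the producer and the consumer, replace spring-cloud-starter-stream-rabbit with spring-cloud-starter-stream-kafka.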

    V. The Sink, Source, and Processor interfaces

    To simplify development, Spring Cloud Stream ships with three built-in interfaces: Sink, Source, and Processor. They are defined as follows:

    public interface Source {
        String OUTPUT = "output";
    
        @Output("output")
        MessageChannel output();
    }
    public interface Sink {
        String INPUT = "input";
    
        @Input("input")
        SubscribableChannel input();
    }
    public interface Processor extends Source, Sink {
    }

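    With these built-in interfaces there is no need to write a service interface: the producer passes Source.class when binding channels and the consumer passes Sink.class. Because Processor extends both Sink and Source, either side can also simply use Processor.

    1. Rewrite the message producer as follows:

    (1) Startup class

    Pass Processor.class when binding channels.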

    package com.example.springproducer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Processor;
    
    @SpringBootApplication
    @EnableEurekaClient
    @EnableBinding(Processor.class)
    public class SpringProducerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringProducerApplication.class, args);
        }
    
    }

    (2) Modify application.yml

    Bind the output channel to the destination named input; otherwise its destination defaults to the channel name, output.

    server:
      port: 8081
    spring:
      application:
        name: spring-producer
      cloud:
        stream:
          bindings:
            output:
              destination: input
    eureka:
      instance:
        hostname: localhost
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/
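
    Why this setting lets the two services meet can be shown with a small plain-Java model of destination lookup: a channel uses its configured destination if there is one, and otherwise falls back to its own name. BindingDestinations is a hypothetical class written for this illustration; it only mimics the effect of spring.cloud.stream.bindings.<channel>.destination and is not how the framework is implemented.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical model of per-channel destination settings for one application.
    final class BindingDestinations {
        // channel name -> explicitly configured destination
        private final Map<String, String> configured = new HashMap<>();

        // Records a "bindings.<channel>.destination" style setting.
        BindingDestinations bind(String channel, String destination) {
            configured.put(channel, destination);
            return this;
        }

        // Configured destination if present, otherwise the channel name itself.
        String destinationFor(String channel) {
            return configured.getOrDefault(channel, channel);
        }
    }
    ```

    In this model the producer (output bound to input) and a consumer with no explicit setting (input defaults to input) resolve to the same destination, whereas an unconfigured producer would publish to output and the consumer would never see the message.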

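    (3) Delete the SendService interface. ProducerController can then no longer inject it; inject Processor (or Source) instead and send with output().send(msg).

    2. Rewrite the message consumer as follows:

    (1) Startup class

    Pass Processor.class when binding channels.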

    package com.example.springconsumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.messaging.handler.annotation.SendTo;
    
    @SpringBootApplication
    @EnableBinding(Processor.class)
    public class SpringConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringConsumerApplication.class, args);
        }
    
        @StreamListener(Processor.INPUT)
        public void receive(byte[] msg){
            System.out.println("Received message: " + new String(msg));
        }
    }

    (2) Delete the ReceiveService interface

    Restart the applications and open http://localhost:8081/send in a browser; the console of the spring-consumer project prints:

    Received message: hello world

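    VI. Consumer groups

    Consumers in different groups each receive a copy of every message.
    When consumers share the same group, each message is handled by only one consumer instance in that group.

    1. In the application.yml of the spring-consumer project, set the consumer group to groupA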

    server:
      port: 8080
    spring:
      application:
        name: spring-consumer
      cloud:
        stream:
          bindings:
            input:
              group: groupA
    eureka:
      instance:
        hostname: localhost
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/

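    2. Following the spring-consumer project above, create two more consumer projects, spring-second-consumer (port 8082) and spring-third-consumer (port 8083), and set the consumer group to groupB in the application.yml of each.

    Restart the applications and open http://localhost:8081/send in a browser several times. The original spring-consumer receives every message, while the two new consumers, spring-second-consumer (port 8082) and spring-third-consumer (port 8083), take turns handling them.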
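
    The behavior observed in this test can be mimicked with a small in-memory model: every group receives one copy of each message, and inside a group the instances take turns. ConsumerGroupModel is a hypothetical class written for this illustration. The strict round-robin order is an assumption of the sketch; a real broker balances competing consumers in its own way.

    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical in-memory model of consumer-group delivery; not a real broker.
    final class ConsumerGroupModel {
        // group name -> instance names, in registration order
        private final Map<String, List<String>> groups = new LinkedHashMap<>();
        // group name -> index of the instance that receives the next message
        private final Map<String, Integer> cursor = new LinkedHashMap<>();
        // instance name -> messages it has received
        private final Map<String, List<String>> received = new LinkedHashMap<>();

        void register(String group, String instance) {
            groups.computeIfAbsent(group, g -> new ArrayList<>()).add(instance);
            cursor.putIfAbsent(group, 0);
            received.putIfAbsent(instance, new ArrayList<>());
        }

        // Each group gets one copy; within a group, instances are chosen round-robin (assumed).
        void publish(String message) {
            for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
                List<String> instances = entry.getValue();
                int next = cursor.get(entry.getKey());
                received.get(instances.get(next)).add(message);
                cursor.put(entry.getKey(), (next + 1) % instances.size());
            }
        }

        List<String> receivedBy(String instance) {
            return received.getOrDefault(instance, new ArrayList<>());
        }
    }
    ```

    Registering spring-consumer under groupA and the two new projects under groupB, then publishing four messages, gives spring-consumer all four while each groupB instance handles two of them.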

  • Original article: https://www.cnblogs.com/gdjlc/p/11920463.html