zoukankan      html  css  js  c++  java
  • Spring cloud stream【入门介绍】

    案例代码:https://github.com/q279583842q/springcloud-e-book

      在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了哪个中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性。

    一、什么是SpringCloudStream

      官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
      应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
      通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQKafka

    二、Stream 解决了什么问题?

      Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

    官网结构图

    在这里插入图片描述

    组成 说明
    Middleware 中间件,目前只支持RabbitMQ和Kafka
    Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
    @Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
    @Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
    @StreamListener 监听队列,用于消费者的队列的消息接收
    @EnableBinding 指信道channel和exchange绑定在一起

    三、消息驱动入门案例

      我们通过一个入门案例来演示下通过stream来整合RabbitMQ来实现消息的异步通信的效果,所以首先要开启RabbitMQ服务,RabbitMQ不清楚的请参考此文:https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404

    1.创建消息发送者服务

    1.1 创建项目

      创建一个SpringCloud项目

    在这里插入图片描述

    1.2 pom文件

      pom文件中重点是要添加spring-cloud-starter-stream-rabbit这个依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.13.RELEASE</version>
    	</parent>
    	<groupId>com.bobo</groupId>
    	<artifactId>stream-sender</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>Dalston.SR5</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    

    1.3 配置文件

      配置文件中除了必要的服务名称端口Eureka的信息外我们还要添加RabbitMQ的注册信息

    spring.application.name=stream-sender
    server.port=9060
    #设置服务注册中心地址,指向另一个注册中心
    eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
    
    #rebbitmq 链接信息
    spring.rabbitmq.host=192.168.88.150
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=dpb
    spring.rabbitmq.password=123
    spring.rabbitmq.virtualHost=/
    

    1.4 创建消费发送者接口

      创建一个发送消息的接口。具体如下:方法名称自定义,返回类型必须是SubscribableChannel,在Output注解中指定交换器名称。

    /**
     * 发送消息的接口
     * @author dengp
     *
     */
    public interface ISendeService {
    
    	/**
    	 * 指定输出的交换器名称
    	 * @return
    	 */
    	@Output("dpb-exchange")
    	SubscribableChannel send();
    }
    

    1.5 启动类

      在启动类中通过@EnableBinding注解绑定我们创建的接口类。

    @SpringBootApplication
    @EnableEurekaClient
    // 绑定我们刚刚创建的发送消息的接口类型
    @EnableBinding(value={ISendeService.class})
    public class StreamSenderStart {
    
    	public static void main(String[] args) {
    		SpringApplication.run(StreamSenderStart.class, args);
    	}
    }
    

    2.创建消息消费者服务

    2.1 创建项目

    在这里插入图片描述

    2.2 pom文件

      添加的依赖和发送消息的服务是一致的

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.13.RELEASE</version>
    	</parent>
    	<groupId>com.bobo</groupId>
    	<artifactId>stream-receiver</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>Dalston.SR5</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    		</dependency>
    	</dependencies>
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    

    2.3 配置文件

      注意修改服务名称和端口

    spring.application.name=stream-receiver
    server.port=9061
    #设置服务注册中心地址,指向另一个注册中心
    eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
    
    #rebbitmq 链接信息
    spring.rabbitmq.host=192.168.88.150
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=dpb
    spring.rabbitmq.password=123
    spring.rabbitmq.virtualHost=/
    

    2.4 创建接收消息的接口

      此接口和发送消息的接口相似,注意使用的是@Input注解。

    /**
     * 接收消息的接口
     * @author dengp
     *
     */
    public interface IReceiverService {
    
    	/**
    	 * 指定接收的交换器名称
    	 * @return
    	 */
    	@Input("dpb-exchange")
    	SubscribableChannel receiver();
    }
    
    

    2.5 创建处理消息的处理类

      注意此类并不是实现上面创建的接口。而是通过@EnableBinding来绑定我们创建的接口,同时通过@StreamListener注解来监听dpb-exchange对应的消息服务

    /**
     * 具体接收消息的处理类
     * @author dengp
     *
     */
    @Service
    @EnableBinding(IReceiverService.class)
    public class ReceiverService {
    
    	@StreamListener("dpb-exchange")
    	public void onReceiver(byte[] msg){
    		System.out.println("消费者:"+new String(msg));
    	}
    }
    

    2.6 启动类

      同样要添加@EnableBinding注解

    @SpringBootApplication
    @EnableEurekaClient
    @EnableBinding(value={IReceiverService.class})
    public class StreamReceiverStart {
    
    	public static void main(String[] args) {
    		SpringApplication.run(StreamReceiverStart.class, args);
    	}
    }
    

    3.编写测试代码

      通过单元测试来测试服务。

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import com.bobo.stream.StreamSenderStart;
    import com.bobo.stream.sender.ISendeService;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes=StreamSenderStart.class)
    public class StreamTest {
    	
    	@Autowired
    	private ISendeService sendService;
    
    	@Test
    	public void testStream(){
    		String msg = "hello stream ...";
    		// 将需要发送的消息封装为Message对象
    		Message message = MessageBuilder
    								.withPayload(msg.getBytes())
    								.build();
    		sendService.send().send(message );
    	}
    }
    

    启动消息消费者后,执行测试代码。结果如下:

    在这里插入图片描述

    消息接收者获取到了发送者发送的消息,同时我们在RabbitMQ的web界面也可以看到相关的信息

    在这里插入图片描述

    总结

      我们同stream实现了消息中间件的使用,我们发现只有在两处地址和RabbitMQ有耦合,第一处是pom文件中的依赖,第二处是application.properties中的RabbitMQ的配置信息,而在具体的业务处理中并没有出现任何RabbitMQ相关的代码,这时如果我们要替换为Kafka的话我们只需要将这两处换掉即可,即实现了中间件和服务的高度解耦

  • 相关阅读:
    超赞!不容错过的5款实用网页开发和设计工具
    如何从平面设计转行到UI设计?
    线段树
    RMQ
    Splay
    Treap
    *模板--矩阵
    最小生成树
    hash
    ac自动机
  • 原文地址:https://www.cnblogs.com/dengpengbo/p/11103943.html
Copyright © 2011-2022 走看看