1.解决的痛点
那么有没有一种新技术,能让我们不在关注具体的MQ细节,我们只需要用一种适配绑定的方式,自动的给我们在MQ内进行切换。这个时候就是Springcloud Stream要大显身手的时候。
2.概述
一句话:屏蔽底层消息中间件的差异,降低切换成本,同一消息的编程模型。
官方定义:springcloud Stream 是一个构建消息驱动的微服务框架。
通过我们配置binding(绑定),而springcloud Stream 的binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与springcloud stream交互就可以方便的使用消息驱动方式。
通过spring integration 来连接消息代理中间件,以实现消息消息事件驱动。
springcloud stream为一些供应商的消息中间件产品提供了个性化自动配置实现,引用了发布-订阅、消费组、分区三个核心概念,目前仅支持RabbitMQ、 Kafka
https://spring.io/projects/spring-cloud-stream
4.stream凭什么可以统一差异?
通过定义绑定器作为中间层,完美的实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
input对应消费者;
output对应生产者;
6.Stream消息通信方式是什么?
它遵循了发布-订阅模式,通过Topic主题进行广播推送。
7.Stream遵循的标准流程
1.binder 很方便的连接中间件,屏蔽差异。
2.Channel 通道,是队列QUEUE的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel队列进行配置。
3.source和sink 简单的可以理解为参照对象是springcloud stream自身,从stream发布消息就是输出,接受消息就是输入。
8.API和常用的注解
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | binder是应用于消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收。 |
@EnableBinding |
9.代码
第一步:创建eureka服务模块cloud-eureka-server7001、cloud-eureka-server7002
1.POM文件:
<dependencies> <!--eureka-server--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--引入热部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.创建application.yml
server: port: 7001 # eureka配置 eureka: instance: hostname: eureka7001.com #eureka服务端的实例名称 client: # false表示不想注册中心注册自己 register-with-eureka: false # false表示自己端就是注册中心,我的职责就是维护服务实例,并不需要检索服务 fetch-registry: false service-url: # 设置Eureka Server交互的地址查询服务和注册服务都需要依赖这个地址 #defaultZone: http://eureka7002.com:7002/eureka/ # 单机就是7001自己守望自己 defaultZone: http://eureka7001.com:7001/eureka/ server: enable-self-preservation: false # 禁用自我保护模式 eviction-interval-timer-in-ms: 2000
3.创建主入口函数
package com.seegot.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-09 22:15 */ @SpringBootApplication @EnableEurekaServer // 表示自己就是注册中心 public class EurekaMain7001 { public static void main(String[] args) { SpringApplication.run(EurekaMain7001.class,args); } }
7002和上面配置基本一致,只是端口号不一致,直接clone就行。
第二步:新建消息生产者模块cloud-stream-rabbitmq-provider8801
1.POM
<dependencies> <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <version>3.0.6.RELEASE</version> </dependency> <!--springcloud stream--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- 客户端 config --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency> <!--注入eureka client 依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--引入热部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.创建application.yml
1 server: 2 port: 8801 3 spring: 4 application: 5 name: cloud-stream-provider 6 cloud: 7 stream: 8 binders: # 在此处配置需要绑定的rabbitmq的服务消息 9 defaultRabbit: # 表示定义的名称,用于binding整合 10 type: rabbit # 消息组件类型 11 environment: # 设置rabbitmq的相关环境配置 12 spring: 13 rabbitmq: 14 host: localhost 15 port: 5672 16 username: guest 17 password: guest 18 bindings: # 服务的整合处理 19 output: # 这个名字是一个通道的名称 20 destination: studyExchange # 标识要使用的Exchange名称定义 21 content-type: application/json #设置消息类型,本次为json,本文则设置为“text/plain” 22 binder: defaultRabbit # 设置要绑定的消息服务的具体设置 23 eureka: 24 client: 25 service-url: 26 defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka # 集群版 27 instance: 28 lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔,默认是30秒 29 lease-expiration-duration-in-seconds: 5 # 如果超过了5秒间隔,默认是90秒 30 instance-id: send-8801.com # 在信息列表时显示主机名称 31 prefer-ip-address: true #访问路径变为IP地址
3.创建主入口函数
package com.seegot.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 10:45 */ @SpringBootApplication @EnableEurekaClient public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class,args); } }
4.创建服务类 IMessageProvider
package com.seegot.springcloud.service; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 10:47 */ public interface IMessageProvider { public String send(); }
创建服务实现类
package com.seegot.springcloud.service.impl; import com.seegot.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import javax.annotation.Resource; import java.util.UUID; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 10:48 */ @EnableBinding(Source.class) // 定义消息的推送管道。类似output public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 属于消息发送管道。 // 绑定消息 @Override public String send() { String uuid = UUID.randomUUID().toString();// 发送的内容 output.send(MessageBuilder.withPayload(uuid).build()); System.out.println("@@@@@uuid: "+ uuid); return null; } }
5.创建业务类
1 package com.seegot.springcloud.controller; 2 3 import com.seegot.springcloud.service.IMessageProvider; 4 import lombok.extern.slf4j.Slf4j; 5 import org.springframework.web.bind.annotation.GetMapping; 6 import org.springframework.web.bind.annotation.RestController; 7 8 import javax.annotation.Resource; 9 10 /** 11 * @program: cloud2020 12 * @description: 13 * @author: PP Zhang 14 * @create: 2020-06-30 10:56 15 */ 16 @RestController 17 @Slf4j 18 public class SendMessageController { 19 @Resource 20 private IMessageProvider messageProvider; 21 @GetMapping(value = "/sendMessage") 22 public String sendMessage(){ 23 return messageProvider.send(); 24 } 25 }
6.测试。http://localhost:8801/sendMessage
2020-06-30 13:22:54.601 INFO 2980 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration @@@@@uuid: baef8eab-67c7-40ce-b6ad-f290ad5ffeba @@@@@uuid: c457a864-45a4-4af1-99f8-ed0cf430429c @@@@@uuid: 803cfd67-ab44-4d63-98c0-b08880142316 @@@@@uuid: 818e3d27-6853-4878-9d24-6c73f7349480 @@@@@uuid: 45a28a16-d074-43ce-afd9-74c74d79259f @@@@@uuid: bcaad91e-fa08-4f5f-98d0-2232843f052b @@@@@uuid: 89b6b301-ee96-41e8-a99a-5f2c0e579cfa @@@@@uuid: 55d8c5aa-2093-44f5-ac8c-58392ec45249 @@@@@uuid: 8d79cfe9-e0fa-4880-aad4-bc1342919ec7 @@@@@uuid: 73202336-05b0-4682-bcfc-22627a3141c3 @@@@@uuid: 40206a09-5e06-4f02-95bb-68f7c78e141e @@@@@uuid: c768c1d1-961c-4a3c-8250-cba4052b046f
第三步:新建消费者模块cloud-stream-rabbitmq-consumer8802、cloud-stream-rabbitmq-consumer8803
1.POM
<dependencies> <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <version>3.0.6.RELEASE</version> </dependency> <!--springcloud stream--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- 客户端 config --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency> <!--注入eureka client 依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--引入热部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.新建application.yml
在分布式环境下,一条消息只希望被消费一次,避免重复消费,此时需要注意对group的使用,要将消费者统一在同一个组group内,也就是group名称一致。另外,group还可以实现消息的持久化。可以通过停用消息消费者,然后通过消息生产者发送多条消息,然后重新启动消息消费者,会看到消息消费者接收到了相应的消息,这就是持久化;反之如果删除group,那么重新停用消息消费者,然后在通过消息生产者发送消息,再次启动消息消费者,此时就接收不到相应的消息。
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置需要绑定的rabbitmq的服务消息 defaultRabbit: # 表示定义的名称,用于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 标识要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,本文则设置为“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: seegotB #分组并且还可以实现消息持久化。 eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka # 集群版 instance: lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔,默认是30秒 lease-expiration-duration-in-seconds: 5 # 如果超过了5秒间隔,默认是90秒 instance-id: receive-8802.com # 在信息列表时显示主机名称 prefer-ip-address: true #访问路径变为IP地址
3.新建主入口函数
package com.seegot.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 11:19 */ @SpringBootApplication @EnableEurekaClient public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class,args); } }
4.新建业务类
package com.seegot.springcloud.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.web.bind.annotation.RestController; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 11:21 */ @RestController @EnableBinding(Sink.class)// 定义消息的接受管道。类似input public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) // 监听 public void input(Message<String> message){ System.out.println("消费者1号,---->接收的消息:"+message.getPayload()+" port:"+serverPort); } }