什么是消息驱动?
SpringCloud Stream消息驱动可以简化开发人员对消息中间件的使用复杂度,让系统开发人员更多尽力专注与核心业务逻辑的开发。SpringCloud Stream基于SpringBoot实现,自动配置化的功能可以帮助我们快速上手学习,类似与我们之前学习的orm框架,可以平滑的切换多种不同的数据库。
目前SpringCloud Stream 目前只支持 rabbitMQ和kafka
消息驱动原理
绑定器
通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通过,是的应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。
在该模型图上有如下几个核心概念:
- Source: 当需要发送消息时,我们就需要通过Source,Source将会把我们所要发送的消息(POJO对象)进行序列化(默认转换成JSON格式字符串),然后将这些数据发送到Channel中;
- Sink: 当我们需要监听消息时就需要通过Sink来,Sink负责从消息通道中获取消息,并将消息反序列化成消息对象(POJO对象),然后交给具体的消息监听处理进行业务处理;
- Channel: 消息通道是Stream的抽象之一。通常我们向消息中间件发送消息或者监听消息时需要指定主题(Topic)/消息队列名称,但这样一旦我们需要变更主题名称的时候需要修改消息发送或者消息监听的代码,但是通过Channel抽象,我们的业务代码只需要对Channel就可以了,具体这个Channel对应的是那个主题,就可以在配置文件中来指定,这样当主题变更的时候我们就不用对代码做任何修改,从而实现了与具体消息中间件的解耦;
- Binder: Stream中另外一个抽象层。通过不同的Binder可以实现与不同消息中间件的整合,比如上面的示例我们所使用的就是针对Kafka的Binder,通过Binder提供统一的消息收发接口,从而使得我们可以根据实际需要部署不同的消息中间件,或者根据实际生产中所部署的消息中间件来调整我们的配置。
消息驱动环境搭建
生产者环境
Maven依赖信息
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <dependencies> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> </dependencies>
application.yml信息
server: port: 9000 spring: application: name: spingcloud-stream-producer # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest
创建管道
// 创建管道接口 public interface SendMessageInterface { // 创建一个输出管道,用于发送消息 @Output("my_msg") SubscribableChannel sendMsg(); }
发送消息
@RestController public class SendMsgController { @Autowired private SendMessageInterface sendMessageInterface; @RequestMapping("/sendMsg") public String sendMsg() { String msg = UUID.randomUUID().toString(); System.out.println("生产者发送内容msg:" + msg); Message build = MessageBuilder.withPayload(msg.getBytes()).build(); sendMessageInterface.sendMsg().send(build); return "success"; } }
启动服务
@SpringBootApplication @EnableBinding(SendMessageInterface.class) // 开启绑定 public class AppProducer { public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); } }
消费者环境
Maven
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <dependencies> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> </dependencies>
application.yml
server: port: 9000 spring: application: name: spingcloud-stream-consumer # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest
管道中绑定消息
public interface RedMsgInterface { // 从管道中获取消息 @Input("my_msg") SubscribableChannel redMsg(); }
消费者获取消息
@Component public class Consumer { @StreamListener("my_msg") public void listener(String msg) { System.out.println("消费者获取生产消息:" + msg); } }
启动消费者
@SpringBootApplication @EnableBinding(RedMsgInterface.class) public class AppConsumer { public static void main(String[] args) { SpringApplication.run(AppConsumer.class, args); } }
消费组
在现实的业务场景中,每一个微服务应用为了实现高可用和负载均衡,都会集群部署,按照上面我们启动了两个应用的实例,消息被重复消费了两次。为解决这个问题,Spring Cloud Stream 中提供了消费组,通过配置 spring.cloud.stream.bindings.myInput.group 属性为应用指定一个组名,下面修改下配置文件,
server: port: 8001 spring: application: name: spring-cloud-stream # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest cloud: stream: bindings: mymsg: ###指定 管道名称 #指定该应用实例属于 stream 消费组 group: stream
修改消费者
@Component public class Consumer { @Value("${server.port}") private String serverPort; @StreamListener("my_msg") public void listener(String msg) { System.out.println("消费者获取生产消息:" + msg + ",端口号:" + serverPort); } }
更改环境为kafka
Maven依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <version>2.0.1.RELEASE</version> </dependency>
生产者配置
server: port: 9000 spring: cloud: stream: # 设置成使用kafka kafka: binder: # Kafka的服务端列表,默认localhost brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092 # Kafka服务端连接的ZooKeeper节点列表,默认localhost zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181 minPartitionCount: 1 autoCreateTopics: true autoAddPartitions: true
消费者配置
server: port: 8000 spring: application: name: springcloud_kafka_consumer cloud: instance-count: 1 instance-index: 0 stream: kafka: binder: brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092 zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181 auto-add-partitions: true auto-create-topics: true min-partition-count: 1 bindings: input: destination: my_msg group: s1 consumer: autoCommitOffset: false concurrency: 1 partitioned: false