消息驱动概述
- 是什么
- 简单的说, 消息驱动屏蔽底层消息中间件的差异, 降低切换成本, 统一消息的编程模型.
- 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架.
- 应用程序inputs或outputs来与Spring Cloud Stream中binder对象交互, 通过我们配置来binding(绑定), 而Stream的binder对象负责与消息中间件交互. 所以我们只需要搞清楚如何与Stream交互就可以方便使用消息驱动的方式.
- 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动.
- Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现, 引用了发布-订阅, 消费组, 分区的三个核心概念, 但目前仅支持RabbitMQ和Kafka
- Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架, 该框架提供了一个灵活的编程模型.
- 中文指导手册: https://m.wang1314.com/doc/webapp/topic/20971999.html
- 设计思想
- 标准的MQ
- 生产者/消费者之间靠消息媒介传递信息内容 -> Message
- 消息必须走特定的通道 -> MessageChannel
- 消息通道里的消息如何被消费, 谁负责收发处理? -> 消息通道MessageChannel的子接口SubscribableChannel, 由MessageHandler消息处理器订阅.
- 为什么用Stream
- 中间件的差异问题
- 比如说RabbitMQ有交换机(exchange), 而Kafka有Topic和Partitions分区.
- 中间件的差异会造成一定困扰, 我们如果用了两个消息队列的其中一种, 如果后面需要网另一个消息队列迁移, 一大堆东西需要推倒重做, 因为它跟我们的系统耦合了, 而Stream给我们提供了解耦的方式.
- Stream如何统一底层差异
- 通过定义绑定器(binder)作为中间层, 完美地实现了应用程序与消息中间件细节之间的隔离.
- 通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现.
- Stream对消息中间件的进一步封装, 可以做到代码层面对中间件的无感知, 甚至于动态的切换中间件, 使得微服务开发的高度解耦, 服务可以关注更多自己的业务流程.
- Binder
- INPUT对应消费者
- OUTPUT对应生产者
- 中间件的差异问题
- Stream中的消息通信方式遵循了发布-订阅模式
- Topic主题进行广播
- Stream的标准流程
-
- Binder: 很方便的连接中间件, 屏蔽差异
- Channel: 通道, 是队列Queue的一种抽象, 在消息通讯系统中就是实现存储与转发的媒介, 通过Channel对队列进行配置.
- Source和Sink: 简单的可理解为参照对象是Stream自身, 从Stream发布消息就是输出, 接受消息就是输入.
- 编码API和常用注解
消息驱动之生产者
- 新建Module: cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块.
- pom中新加
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
- yml
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
- 主启动类
@SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
- 业务类
- 发送消息接口
public interface IMessageProvider { String send(); }
- 发送消息接口实现类
@EnableBinding(Source.class) //定义消息的推送管道. public class IMessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; //消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("***********serial: " + serial); return null; } }
- controller
@RestController public class SendMessageController { @Resource private IMessageProvider messageProvider; @GetMapping("/sendMessage") public String sendMessage() { return messageProvider.send(); } }
- 发送消息接口
- 测试
- 启动7001, 7002, rabbitmq
- 启动8001后: http://localhost:8801/sendMessage
消息驱动之消费者
- 新建Module: cloud-stream-rabbitmq-provider8801
- 新加pom
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
- yml
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/ instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: receive-8802.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
- 主启动类
@SpringBootApplication public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); } }
- 业务类
@Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消费者1号, ----> 接收的消息: " + message.getPayload() + " port" + serverPort); } }
- 测试: 8801发送消息, 8802接收消息
- http://localhost:8801/sendMessage
- 在8802的控制台会打出接收的消息.
分组消费与持久化
- 根据8802, clone出一份8803: cloud-stream-rabbitmq-consumer8803
- 启动
- RabbitMQ
- 7001, 7002, 8801, 8802, 8803
- 运行后的两个问题
- 有重复消费问题
- 消费持久化问题
- 消费
- 发送: http://localhost:8801/sendMessage
- 8802, 8803两个微服务都收到了消息, 存在重复消费问题.
- 要用分组和持久化属性group解决该问题
- 实际案例
- 如下场景, 订单系统我们做集群部署, 都会从RabbitMQ中获取订单信息, 如果一个订单同时被两个服务取到, 会造成数据错误, 我们要避免这种情况.
- 这时我们可以使用Stream中的消息分组来解决.
- 注意: Stream中处于同一个group中的多个消费者是竞争关系, 能够保证消息只会被其中一个应用消费一次, 而不同的组是可以重复消费的.
- 如下场景, 订单系统我们做集群部署, 都会从RabbitMQ中获取订单信息, 如果一个订单同时被两个服务取到, 会造成数据错误, 我们要避免这种情况.
- 分组
- 出现重复消费的原因
- 默认分组group是不同的, 组流水号不一样, 被认为是不同组
- 我们可以自定义配置分到同一个组, 来解决重复消费问题.
- 自定义分组操作:
- 在yml中添加: spring.cloud.stream.bindings.input.group
- 所以只需让8802, 8803设置到同一个组, 即可解决重复消费.
- 采取轮询的方式获取消息.
- 在yml中添加: spring.cloud.stream.bindings.input.group
- 持久化
- 上述解决了重复消费, 再看看持久化..
- 停止8802和8803微服务, 并去除掉8802的分组group: bbbbb
- 8801先发送4条消息给rabbitmq.
- 先启动8802, 无分组属性, 后台无任何消息打出来.
- 再启动8803, 有分组属性配置, 后台打出来了MQ上的消息.
- 配置了分组属性, 会把曾经未获得的消息重新获得并消费, 这就是消息的持久化.