前言:不断学习就是程序员的宿命。
一、消息驱动概述
1、简介
一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
官网:https://spring.io/projects/spring-cloud-stream#overview
API:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/
中文参考文档:https://m.wang1314.com/doc/webapp/topic/20971999.html
2、设计思想
(1)标准MQ
①生产者/消费者之间靠消息媒介传递信息内容(Message)
②消息必须走特定的通道(消息通道MessageChannel)
③消息通道里的消息如何消费?谁负责收发处理(消息通道MessageChannel的子接口SubscribableChannel),由MessageHandler消息处理器所订阅)
(2)为什么使用Cloud Stream
这些中间件的差异性导致我们实际开发项目造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我们想往另外一种消息队列进行转移,这时候无疑就是一个灾难性的,一大堆东西都要重新推到重新做,因为它跟我们的系统耦合了,这时候SpringCloud Stream给我们提供了一种解耦合的方式。
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要考虑各种不同的消息中间件实现。
(3)处理架构
Input对应消费者,Output对应生产者
Stream中的消息通信方式遵循了发布-订阅模式:Topic主题进行广播:RabbitMQ中就是Exchange、在Kafka中就是Topic
3、Stream标准流程
Binder:很方便的连接中间件
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
Source和Sink:简单的可理解为参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
4、编码API及常用注解
二、消息驱动之生产者
1、application.yml修改配置
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitMQ的服务信息 defaultRabbit: # 表示定义的名称,用于binding的整合 type: rabbit # 消息中间件类型 environment: # 设置rabbitMQ的相关环境配置 spring: rabbitmq: host: 192.168.154.21 port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7001.com:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30 lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90 instance-id: send-8801.com # 主机名 prefer-ip-address: true # 显示ip
2、生产者代码
/** * EnableBinding 定义消息的推送管道 */ @EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { /** * 消息发送管道 */ @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("serial = " + serial); return serial; } }
3、测试
三、消息驱动之消费者
1、application.yml配置
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitMQ的服务信息 defaultRabbit: # 表示定义的名称,用于binding的整合 type: rabbit # 消息中间件类型 environment: # 设置rabbitMQ的相关环境配置 spring: rabbitmq: host: 192.168.154.21 port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain binder: defaultRabbit # 设置要绑定的消息服务的具体设置 # group: spectrumrpcA eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30 lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90 instance-id: receive-8802.com #主机名 prefer-ip-address: true # 显示ip
2、业务类
@Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message){ System.out.println("消费者1号,----->接收到的消息:"+ message.getPayload() +" port:" + serverPort); } }
3、测试
四、分组消费与持久化
解决重复消费:组内竞争、组间共享(默认分组是不同的)
1、分组消费
1.1、application.yml指定消费者组
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitMQ的服务信息 defaultRabbit: # 表示定义的名称,用于binding的整合 type: rabbit # 消息中间件类型 environment: # 设置rabbitMQ的相关环境配置 spring: rabbitmq: host: 192.168.154.21 port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: spectrumrpcA eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30 lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90 instance-id: receive-8802.com #主机名 prefer-ip-address: true # 显示ip
1.2、测试
解决了重复消费的问题。
2、持久化
生产者发送消息,8802消费者不指定消费者重新启动,8803消费者指定消费者组启动,发现8802消息丢失,消息全被指定消费者的8803消费到