一、什么是消息驱动?
消息驱动是一种统一消息编程模型,它的出现是为了屏蔽底层消息中间件的差异,降低切换消息中间件的成本。常见的消息中间件有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,在这些技术之间切换需要花费大量的学习时间,我们可以通过消息驱动来适配绑定,在不同的消息中间件中切换,将不了解的中间件转换为自己拿手的消息中间件。同时也可以将微服务中多种消息中间件统一为一个中间件。
SpringCloudStream是一种构建消息驱动微服务的框架,应用程序通过inputs 或者outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互,所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动。通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。但目前仅支持RabbitMQ、Kafka。以下是其架构图:
理解了架构图之后,再来看看Spring Cloud Stream的标准执行流程图:
二、Spring Cloud Stream的使用
1、创建消息生产者(发送)模块:
a.引入spring-cloud-starter-stream-rabbit包
<dependency> <groupid>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
b.配置application.yml文件
server: port: 8801 spring: application: name: cloud-stream-provider
cloud: stream: binders: #在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: #表环定义的名称,用于于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关的环境配置
spring: rabbitmq: host: localhost
port: 5672 username: guest
password: guest bindings: #服务的整合处理 output: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,文本则设置"text/plain"
binder: defaultRabbit #设置要绑定的消息服务的具体设置
c.编写消息发送代码:新建一个ImessageProvider接口,在新建一个接口实现类MessageProviderImpl,编写如下代码,调用业务层调用sendmessage()方法即可发消息。
/*消息发送者*/ public interface IMessageProvider { public string sendmessage(); } /*消息发送者实现类*/ @EnableBinding(source.class) //定义消息的推送管道 public class MessageProviderImpl implements IMessageProvider { @Resource private Messagechannel output; //消息发送管道 @override public string sendmessage() { string serial = UUID.randomuuID().tostring(); output.send(MessageBuilder.withPayLoad(serial).build()); //消息发送 return null; } }
2、创建消息消费者(接收)模块:
a.消息接收模块也需要引入spring-cloud-starter-stream-rabbit包
b.配置application.yml文件:配置方式与消息生产模块唯一的区别是bindings节点下,一个是output、一个是input。
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: #在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: #表环定义的名称,用于于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 input: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,文本则设置"text/plain" binder: defaultRabbit #设置要绑定的消息服务的具体设置
c.编写消息接收代码:新建一个消息接收业务类ReceiveMessageListenerController,编写如下代码。
@Component @EnableBinding(sink.class) public class ReceiveMessageListenerController { @value("${server.port}") private string serverPort; @streamListener(sink.INPUT) public void input(Message<string> message) //消息参数类型要与发送端参数类型一致 { system.out.println("接受消息:"+message.getPayload()); } }
完成以上步骤,则消息生产者和消费者都创建完成,启动微服务,调用生产者sendmessage()方法,消费者input()方法将会执行获取到消息。同时可以访问RabbitMQ可视化页面http://localhost:5672,登录进去将在Exchanges栏目查看到消息波峰图表如下。
三、分组消费与持久化
在以上消息消费的过程中,需要考虑重复消费、消息丢失和持久化的问题。
重复消费:在多个集群的微服务架构中,每个模块都会从RabbitMQ中获取消息,如果一个消息同时被多个相同的微服务模块获取到,就会造成重复消费(比如同一订单同时被两个支付模块获取到后会造成重复扣款),我们有时候需要避免产生这种情况,于是Stream中的消息分组派上用场了。在stream中同一个group中的多个消费者存在竞争关系,一条消息只能被其中的一个消费者获取到,因此我们需要将不能同时消费的这些服务模块放到同一group中。默认情况下,每一个微服务都属于一个独立的组(不同的组的模块是可以重复消费同一消息的),因此他们会重复消费。配置多个消费者模块在同一个组的方式是修改这些模块的yml配置文件,配置 " bindings.input.group=相同的组名 "即可。
持久化和消息丢失:
a.如果一个消费者去掉分组后重启,则重启后不会再次获取未被消费的消息,发生消息丢失现象。
b.如果一个消费者不移除分组直接重启,则重启后会重新获取未被消费的消息,这是就是消息的持久化。