前言
MQ消息中间件⼴泛应⽤在应⽤解耦合、异步消息处理、流量削峰等场景中。
不同的MQ消息中间件内部机制包括使⽤⽅式都会有所不同,⽐如RabbitMQ中有Exchange(交换机/交换器)这⼀概念, kafka有Topic、 Partition分区这些概念, MQ消息中间件的差异性不利于我们上层的开发应⽤,当我们的系统希望从原有的RabbitMQ切换到Kafka时,我们会发现⽐较困难,很多要操作可能重来(因为应⽤程序和具体的某⼀款MQ消息中间件耦合在⼀起了)。
Spring Cloud Stream进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。
本质: 屏蔽掉了底层不同MQ消息中间件之间的差异,统⼀了MQ的编程模型,降低了学习、开发、维护MQ的成本
Stream简介
Spring Cloud Stream 是⼀个构建消息驱动微服务的框架。应⽤程序通过inputs(相当于消息消费者consumer)或者outputs(相当于消息⽣产者producer)来与Spring Cloud Stream中的binder对象交互,⽽Binder对象是⽤来屏蔽底层MQ细节的,它负责与具体的消息中间件交互。
说⽩了:对于我们来说,只需要知道如何使⽤Spring Cloud Stream与Binder对象交互即可
Stream的几个常用注解
注解 | 描述 |
---|---|
@Input(在消费者⼯程中使⽤) | 注解标识输⼊通道,通过该输⼊通道接收到的消息进⼊应⽤程序 |
@Output(在⽣产者⼯程中使⽤) | 注解标识输出通道,发布的消息将通过该通道离开应⽤程序 |
@StreamListener(在消费者⼯程中使⽤,监听message的到来) | 监听队列,⽤于消费者的队列的消息的接收(有消息监听.....) |
@EnableBinding | 把Channel和Exchange(对于RabbitMQ)绑定在⼀起 |
Stream开发实战
生产者端
在父工程下新建子模块lagou-cloud-stream-producer-9090,引入jar:
<!--spring cloud stream 依赖(rabbit) -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
添加rabbit相关配置:
spring:
application:
name: lagou-cloud-stream-producer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(⽤户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 关联整合通道和binder对象
output: # output是我们定义的通道名称,此处不能乱改
destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json
binder: lagouRabbitBinder # 关联MQ服务
定义一个发送消息的接口及实现类:
public interface IMessageProduder {
void sendMessage(String message);
}
//绑定的通道,output。springcloud stream内对输出通道的定义
@EnableBinding(Source.class)
public class MessageProducerImpl implements IMessageProduder {
@Autowired
private Source source;
@Override
public void sendMessage(String message) {
//向mq发送消息(并不直接操作mq,而是操作springcloud stream
//使用source指定的output通道向外发送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
}
消费者端
父工程下新建消费者子模块lagou-cloud-stream-consumer-9091,引入jar坐标和服务端一样,这里不再赘述。
application.yml里面配置与rabbit相关参数,唯一与服务者端不同的是input和output参数:
其他都保持一致。
在消费端定义service类来接受消息:
@EnableBinding(Sink.class)
public class MessageConsumerService {
@StreamListener(Sink.INPUT)
public void receiveMessage(Message<String> message){
System.out.println("----接受到的消息---->"+message);
}
}
测试
我们在服务提供者端写一个测试类来发送消息:
@SpringBootTest(classes = {StreamProducerApplication9090.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageProducerTest
{
@Autowired
private IMessageProduder iMessageProduder;
@Test
public void testSendMessage(){
iMessageProduder.sendMessage("hello world----!!");
}
}
我们先启动服务消费者,然后运行服务提供者端的测试类,看服务消费者端的控制台输出了接收到的信息:
Stream之消息分组
我们将服务消费者复制一份,新消费者的端口是9092,前一个消费者端口是9091。
这样我们继续测试,会发现同一个服务提供者发送的消息,被两个消费者都接收到并进行处理了。
这明显是有问题的,比如电商网站的订单,肯定只需要处理一次就行。
为了解决这个问题,rabbitmq有个消息分组的概念,只要两个消费者实例处在一个组里,那么这个组里只有一个消费者会处理这个消息。
我们仅仅需要在服务消费者端设置 spring.cloud.stream.bindings.input.group 属性,多个消费者实例配置为同⼀个group名称(在同⼀个group中的多个消费者只有⼀个可以获取到消息并消费)。如下:
扩展:前面我们都是先启动服务消费者,然后再启动服务提供者发送消息,也就是消息是临时性的,并没有持久化存储。当我们设置了分组之后,消息就会持久化存储。我们先发送消息,然后再启动服务消费者客户端,也能够接收到消息。
案例源码
欢迎访问我的博客:https://www.liuyj.top