Spring boot对MQ类如RabbitMQ、kafka支持都很好,但是仍然要写一些模板代码。Spring cloud stream进一步掩盖了这个差异,仅仅使用配置就可以完成。
Spring cloud Stream 用了基于topic-subsriber的模式,虽然不支持全部MQ的特性,但绝大多数应用来说,这样就足够用了,毕竟方便很多。具体用法如下:
包含的包
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
MQ接口
public interface MyMQInterface{
@Input("通道名")
SubscribableChannel inputChannel();
@Output("通道名")
MessageChannel outputChanel();
}
- 通道名发送方和接受方必须是一致的
- 一个应用不能既是同一个通道的发送方和接收方,否则会警告。这实际上也没有意义。
- 声明后,启用绑定
@EnableBinding(MyMQInterface.class)
- 可以绑定多个接口
发送MQ消息
@Autowired
MyMQInterface myInterface;
.....
myInterface.inputChannel().send(MessageBuilder....);
接受消息
@SystemListener("通道名")
public void onReceive(Message content)
应用集群问题
- 如果某个应用起来多个实例,如上面的配置,会导致每条消息每个实例都会收到,如果你不想这么做,请在配置里面加上:
spring.cloud.stream.bindings.testOrders.group=分组名
- 每个应用定义一个唯一的分组名,不好和其他应用重复。
消息处理异常
- 如果收到消息处理有问题,比如写入数据库失败,请抛出RuntimeException异常,MQ会重试,不过重试几次后会失败,这个要注意。