stream这个项目让我们不必通过繁琐的自定义ampq来建立exchange,通道名称,以及队列名称和路由方式。
只需要简单几步我们就轻松使用stream完成推送到rabbitmq和kafafa,并完成监听工作。
spring.cloud.stream.rabbit.bindings.demo-input.consumer.binding-routing-key=rk.demo spring.cloud.stream.bindings.demo-input.binder=rabbit spring.cloud.stream.bindings.demo-input.destination=exchange.demo spring.cloud.stream.bindings.demo-input.content-type=application/json spring.cloud.stream.bindings.demo-input.group=queues.dem
对于RabbitMQ,destination 对应的是exchange,group对应的是queue(带有前缀)
如果没有配置routing-key,那么会使用默认值 #
1.stream 配置中的含义
当我们使用默认group,和destination时默认创建名为input 的持久化的topic类型的exchange。
队列是一个destination前缀然后一个随你名称的自动删除队列。
现在我们自定义group,destination。如下:
spring.cloud.stream.bindings.input.group=queueName
spring.cloud.stream.bindings.input.destination=exchangeName
根据destination,帮我们创建了一个持久化的topic类型的exchange。
根据group,帮我们创建了一个持久化的queue。
总结:对于RabbitMQ,destination 对应的是exchange,group对应的是queue(带有前缀)。
对于kafka,destination 对应的是Topic,group就是对应的消费group。
对于一个应用集群,如果不需要重复消费消息,必须定义group,否则不必定义group(比如刷新配置消息)。
2.发送消息
spring.cloud.stream.bindings.mqScoreOutput.destination=exchangeName2
public interface SendChannel { String SCORE_OUPUT = "mqScoreOutput"; @Output(SendChannel.SCORE_OUPUT) MessageChannel scoreOutput(); }
@EnableBinding(SendChannel.class) public class SendServerConfig { }
@Service public class SendServer { @Autowired private SendChannel sendChannel; @Autowired private MessageChannel mqScoreOutput; public void send1() { Message<String> fffff = MessageBuilder.withPayload("fffffsend1").build(); sendChannel.scoreOutput().send(fffff); System.out.println("发送消息send1"); } public void send2() { Message<String> fffff = MessageBuilder.withPayload("fffffsend2").build(); mqScoreOutput.send(fffff); System.out.println("发送消息send2"); } }
3.接收消息
spring.cloud.stream.bindings.mqScoreInput.group=queueName2
spring.cloud.stream.bindings.mqScoreInput.destination=exchangeName2
public interface ReceiverChannel { String SCORE_INPUT = "mqScoreInput"; @Input(ReceiverChannel.SCORE_INPUT) SubscribableChannel scoreInput(); }
@EnableBinding({ReceiverChannel.class}) public class ReceiverServerConfig { @StreamListener(ReceiverChannel.SCORE_INPUT) public void receive(Object object) { System.out.println(object); } }
4.注意事项
spring.cloud.stream.bindings.mqScoreInput.group=queueName2 spring.cloud.stream.bindings.mqScoreInput.destination=exchangeName2 spring.cloud.stream.bindings.mqScoreOutput.destination=exchangeName2
public interface SendChannel { String SCORE_OUPUT = "mqScoreInput"; @Output(SendChannel.SCORE_OUPUT) @Qualifier("mqScoreOutput") MessageChannel scoreOutput(); }
5.Spring Integration 原生支持接收消息
spring.cloud.stream.bindings.mq1.group=queueName
spring.cloud.stream.bindings.mq1.destination=exchangeName
public interface ReceiverChannel2 { String INPUT = "mq1"; @Input(INPUT) SubscribableChannel input(); }
@EnableBinding({ReceiverChannel2.class}) public class ReceiverServerConfig2 { @ServiceActivator(inputChannel = ReceiverChannel2.INPUT) public void receive(Object object) { System.out.println(object); } }
6.消息转换
spring.cloud.stream.bindings.mqScoreInput.group=queueName2 spring.cloud.stream.bindings.mqScoreInput.destination=exchangeName2 spring.cloud.stream.bindings.mqScoreInput.contentType=application/json spring.cloud.stream.bindings.mqScoreOutput.destination=exchangeName2 spring.cloud.stream.bindings.mqScoreOutput.contentType=application/json
@Service public class SendServer { @Autowired private SendChannel sendChannel; @Autowired @Qualifier("mqScoreOutput") private MessageChannel mqScoreOutput; public void send1() { User u = new User(); u.setId(1L); u.setName("fffffsend1"); Message<User> fffff = MessageBuilder.withPayload(u).build(); sendChannel.scoreOutput().send(fffff); System.out.println("发送消息send1"); } public void send2() { User u = new User(); u.setId(2L); u.setName("fffffsend2"); Message<User> fffff = MessageBuilder.withPayload(u).build(); mqScoreOutput.send(fffff); System.out.println("发送消息send2"); } }
@EnableBinding({ReceiverChannel.class}) public class ReceiverServerConfig { @StreamListener(ReceiverChannel.SCORE_INPUT) public void receive(User user) { System.out.println(user); } }
@EnableBinding({ReceiverChannel.class}) public class ReceiverServerConfig { @StreamListener(ReceiverChannel.SCORE_INPUT) @SendTo(SendServer2.SendChannel2.OUTPUT) public Object receive(User user) { user.setId(5555L); return user; } }
https://blog.csdn.net/supper10090/article/details/78295682
坑1.Dispatcher has no subscribers for channel
需要注意的是,最好不要自定义输入输出在同一个类里面。
这样,如果我们只调用生产者发送消息。会导致提示Dispatcher has no subscribers for channel。
并且会让我们发送消息的次数莫名减少几次。详细情况可以查看gihub官方issue,也给出的这种解决方法 官方解决方式
坑2.stream生成的exchang默认是topic模式。就是按照前缀匹配,发送消息给对应的队列。
*(星号):可以(只能)匹配一个单词。
说明:"*" 操作符将 “.”视为分隔符
$channel->queue_bind('msg-inbox-logs','logs-exchange','*.msg-inbox') 单个”.“把路由键分为了几个部分, ”*“匹配特定位置的做任意文本 msg-inbox-logs队列将会接收从routingKey为error.msg-inbox,warn.msg-inbox,info.msg-inbox ,即routingKey为 ”任意不带.的字符.msg-inbox“ 的消息,都可以收到
#(井号):可以匹配多个单词(或者零个)
说明:“#”操作符没有分块的概念,它将任意“.”字符均视为关键字的匹配部分
为了实现匹配所有规则,你可以使用“#”字符:
$channel-->queue_bind('all-logs','logs-exchange','#')
通过这种的绑定方式,all-logs队列将会接收所有从Web应用程序发布的日志消息。
fanout:广播模式,发送到所有的队列
direct:直传。完全匹配routingKey的队列可以收到消息。
坑3.默认消息异常之后,都会往死消息队列里面写,然而异常是放到一个header里面去的。默认消息队列支持的最大frame_max 是128kb,超过这个大小,服务器就主动给你关闭连接,然后把你的消息会不断的重试。
坑4.看到国内好多博客,使用@Input和@output都是用MessageChannel,这是不对的。@Output对MessageChannel,@Input对应SubscribableChannel 。切记!
坑5.我使用的stream版本是1.2.1,springboot版本时1.5.6。没有办法使用routingkey属性,即在spring.cloud.stream.rabbit这个属性无法显示。
应该是我的stream版本偏低吧。遇到这种情况,大家果断换新版本,或者使用自带的ampq来实现吧。
坑6.stream的destination对应生成rabbitmq的exchange。加上了group后,例如destination:wx-consumer,group:queue。那么经过stream后队列名称会变成wx-consumer.queue。
如果使用group对应的是持久化队列,不会被rabbitmq删除。
https://www.jianshu.com/p/bf992c23c381