zoukankan      html  css  js  c++  java
  • Spring Cloud Stream

    stream这个项目让我们不必通过繁琐的自定义ampq来建立exchange,通道名称,以及队列名称和路由方式。
    只需要简单几步我们就轻松使用stream完成推送到rabbitmq和kafafa,并完成监听工作。

    spring.cloud.stream.rabbit.bindings.demo-input.consumer.binding-routing-key=rk.demo
    spring.cloud.stream.bindings.demo-input.binder=rabbit
    spring.cloud.stream.bindings.demo-input.destination=exchange.demo
    spring.cloud.stream.bindings.demo-input.content-type=application/json
    spring.cloud.stream.bindings.demo-input.group=queues.dem

    对于RabbitMQ,destination 对应的是exchange,group对应的是queue(带有前缀)
    如果没有配置routing-key,那么会使用默认值 #

    1.stream 配置中的含义

    当我们使用默认group,和destination时默认创建名为input 的持久化的topic类型的exchange。

    队列是一个destination前缀然后一个随你名称的自动删除队列。

    现在我们自定义group,destination。如下:

    spring.cloud.stream.bindings.input.group=queueName
    spring.cloud.stream.bindings.input.destination=exchangeName 

    根据destination,帮我们创建了一个持久化的topic类型的exchange。




    根据group,帮我们创建了一个持久化的queue。

    总结:对于RabbitMQ,destination 对应的是exchange,group对应的是queue(带有前缀)。
    对于kafka,destination 对应的是Topic,group就是对应的消费group。
    对于一个应用集群,如果不需要重复消费消息,必须定义group,否则不必定义group(比如刷新配置消息)。

    2.发送消息

    1.定义通道,通道名为mqScoreOutput
    spring.cloud.stream.bindings.mqScoreOutput.destination=exchangeName2
    2.定义接口
    public interface SendChannel {
    
        String SCORE_OUPUT = "mqScoreOutput";
    
        @Output(SendChannel.SCORE_OUPUT)
        MessageChannel scoreOutput();
    
    }
    3.进行绑定
    @EnableBinding(SendChannel.class)
    public class SendServerConfig {
    
    }
    4.注入可以发送消息的bean
    @Service
    public class SendServer {
    
        @Autowired
        private SendChannel sendChannel;
    
        @Autowired
        private MessageChannel mqScoreOutput;
    
        public void send1() {
            Message<String> fffff = MessageBuilder.withPayload("fffffsend1").build();
            sendChannel.scoreOutput().send(fffff);
            System.out.println("发送消息send1");
        }
    
        public void send2() {
            Message<String> fffff = MessageBuilder.withPayload("fffffsend2").build();
            mqScoreOutput.send(fffff);
            System.out.println("发送消息send2");
        }
    }

    3.接收消息

    1.定义通道
    spring.cloud.stream.bindings.mqScoreInput.group=queueName2
    spring.cloud.stream.bindings.mqScoreInput.destination=exchangeName2
    2.定义接口
    public interface ReceiverChannel {
    
        String SCORE_INPUT = "mqScoreInput";
    
        @Input(ReceiverChannel.SCORE_INPUT)
        SubscribableChannel scoreInput();
    
    
    }
    3.进行绑定
    @EnableBinding({ReceiverChannel.class})
    public class ReceiverServerConfig {
    
        @StreamListener(ReceiverChannel.SCORE_INPUT)
        public void receive(Object object) {
            System.out.println(object);
        }
    
    }

    4.注意事项

    接收消息通道和发送消息通道名不可以重复。即使destination一样。例如:
    spring.cloud.stream.bindings.mqScoreInput.group=queueName2
    spring.cloud.stream.bindings.mqScoreInput.destination=exchangeName2
    
    spring.cloud.stream.bindings.mqScoreOutput.destination=exchangeName2
    
    我们不用mqScoreOutput 这个通道名,发送消息的时候也用mqScoreInput.
    
    public interface SendChannel {
    
        String SCORE_OUPUT = "mqScoreInput";
    
        @Output(SendChannel.SCORE_OUPUT)
        @Qualifier("mqScoreOutput")
        MessageChannel scoreOutput();
    
    
    }
    接收消息的时候报如下错误。
    Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
     

    5.Spring Integration 原生支持接收消息

    1.定义通道
    spring.cloud.stream.bindings.mq1.group=queueName
    spring.cloud.stream.bindings.mq1.destination=exchangeName
    
    2.定义接口
    
    public interface ReceiverChannel2 {
        String INPUT = "mq1";
    
        @Input(INPUT)
        SubscribableChannel input();
    
    }
    
    3.进行绑定
    
    @EnableBinding({ReceiverChannel2.class})
    public class ReceiverServerConfig2 {
    
    
        @ServiceActivator(inputChannel = ReceiverChannel2.INPUT)
        public void receive(Object object) {
            System.out.println(object);
        }
    
    
    }
    
    

    6.消息转换

    1.配置contentType类型
    spring.cloud.stream.bindings.mqScoreInput.group=queueName2
    spring.cloud.stream.bindings.mqScoreInput.destination=exchangeName2
    spring.cloud.stream.bindings.mqScoreInput.contentType=application/json
    
    spring.cloud.stream.bindings.mqScoreOutput.destination=exchangeName2
    spring.cloud.stream.bindings.mqScoreOutput.contentType=application/json
    
    2.发送对象
    
    @Service
    public class SendServer {
    
        @Autowired
        private SendChannel sendChannel;
    
        @Autowired
        @Qualifier("mqScoreOutput")
        private MessageChannel mqScoreOutput;
    
        public void send1() {
            User u = new User();
            u.setId(1L);
            u.setName("fffffsend1");
            Message<User> fffff = MessageBuilder.withPayload(u).build();
            sendChannel.scoreOutput().send(fffff);
            System.out.println("发送消息send1");
        }
    
        public void send2() {
            User u = new User();
            u.setId(2L);
            u.setName("fffffsend2");
            Message<User> fffff = MessageBuilder.withPayload(u).build();
            mqScoreOutput.send(fffff);
            System.out.println("发送消息send2");
        }
    }
    
    3.接收对象
    
    @EnableBinding({ReceiverChannel.class})
    public class ReceiverServerConfig {
    
        @StreamListener(ReceiverChannel.SCORE_INPUT)
        public void receive(User user) {
            System.out.println(user);
        }
    
    
    }
    
    
    7.消息反馈
    @EnableBinding({ReceiverChannel.class})
    public class ReceiverServerConfig {
    
        @StreamListener(ReceiverChannel.SCORE_INPUT)
        @SendTo(SendServer2.SendChannel2.OUTPUT)
        public Object receive(User user) {
            user.setId(5555L);
            return user;
        }
    
    
    }
    使用SendTo 指定通道即可。

     https://blog.csdn.net/supper10090/article/details/78295682

    坑1.Dispatcher has no subscribers for channel
    需要注意的是,最好不要自定义输入输出在同一个类里面。
    这样,如果我们只调用生产者发送消息。会导致提示Dispatcher has no subscribers for channel。
    并且会让我们发送消息的次数莫名减少几次。详细情况可以查看gihub官方issue,也给出的这种解决方法 官方解决方式

    坑2.stream生成的exchang默认是topic模式。就是按照前缀匹配,发送消息给对应的队列。

    *(星号):可以(只能)匹配一个单词。 
    说明:"*" 操作符将 “.”视为分隔符

    $channel->queue_bind('msg-inbox-logs','logs-exchange','*.msg-inbox')
    
    单个”.“把路由键分为了几个部分,
    ”*“匹配特定位置的做任意文本
    
    msg-inbox-logs队列将会接收从routingKey为error.msg-inbox,warn.msg-inbox,info.msg-inbox ,即routingKey为 ”任意不带.的字符.msg-inbox“ 的消息,都可以收到


    #(井号):可以匹配多个单词(或者零个)
    说明:“#”操作符没有分块的概念,它将任意“.”字符均视为关键字的匹配部分
    为了实现匹配所有规则,你可以使用“#”字符:

    $channel-->queue_bind('all-logs','logs-exchange','#')
    通过这种的绑定方式,all-logs队列将会接收所有从Web应用程序发布的日志消息。


    fanout:广播模式,发送到所有的队列
    direct:直传。完全匹配routingKey的队列可以收到消息。

    坑3.默认消息异常之后,都会往死消息队列里面写,然而异常是放到一个header里面去的。默认消息队列支持的最大frame_max 是128kb,超过这个大小,服务器就主动给你关闭连接,然后把你的消息会不断的重试。

    坑4.看到国内好多博客,使用@Input和@output都是用MessageChannel,这是不对的。@Output对MessageChannel,@Input对应SubscribableChannel 。切记!

    坑5.我使用的stream版本是1.2.1,springboot版本时1.5.6。没有办法使用routingkey属性,即在spring.cloud.stream.rabbit这个属性无法显示。
    应该是我的stream版本偏低吧。遇到这种情况,大家果断换新版本,或者使用自带的ampq来实现吧。

    坑6.stream的destination对应生成rabbitmq的exchange。加上了group后,例如destination:wx-consumer,group:queue。那么经过stream后队列名称会变成wx-consumer.queue。
    如果使用group对应的是持久化队列,不会被rabbitmq删除。

    https://www.jianshu.com/p/bf992c23c381

  • 相关阅读:
    POJ 2260
    安防监控 —— 数据上行刷新与命令下发过程
    安防监控 —— 主框架搭建
    安防监控 —— 软硬件环境分析与通信协议制定
    安防监控项目 —— 需求分析
    安防监控项目 —— 环境搭建
    Linux下I2C总线驱动框架
    IIC总线硬件工作原理(待完善)
    linux驱动面试(转)
    驱动开发 —— 输入子系统(工作逻辑分析)
  • 原文地址:https://www.cnblogs.com/softidea/p/13832714.html
Copyright © 2011-2022 走看看