zoukankan      html  css  js  c++  java
  • Spring Cloud Stream消息驱动@SendTo和消息降级

    参考程序员DD大佬的文章,自己新建demo学习学习,由于需要消息回执,看到了@SendTo这个注解能够实现,下面开始学习demo,新建两个项目cloud-stream-consumer消费端 和 cloud-stream-consumer 生产端

    public interface StreamReceive {
    
        @Input("MQRece")
        SubscribableChannel mqReceive();
    }
    

    添加一个StreamReceive接口,定义@input通道

    @Component
    @Slf4j
    public class ReceiveListener {
    
        @StreamListener("MQRece")
        public byte[] receive(byte[] bytes){
            log.info("接受消息:"+new String(bytes));
            return "ok".getBytes();
        }
    
    }
    

    添加消息监听,接受消息定义为byte[]

    添加application.properties配置文件信息

    spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.211.11:9876
    spring.cloud.stream.bindings.MQRece.destination=message-topic
    spring.cloud.stream.bindings.MQRece.group=rece-group
    server.port=19999
    

    为MQRece通道添加主题message-topic,组名rece-group

    到此Stream 客户端消费就完成了,本节需要把@SendTo注解用起来,需要新建一个MessageChannel进行产生消息

    public interface MsgBackPush {
        
        @Output("back-push")
        MessageChannel backPush();
    }
    

    然后在ReceiveListener添加@SendTo

    @Component
    @Slf4j
    public class ReceiveListener {
    
        @StreamListener("MQRece")
        @SendTo("back-push")
        public byte[] receive(byte[] bytes){
            log.info("接受消息:"+new String(bytes));
            return "ok".getBytes();
        }
    
    }
    
    

    新增通道配置application.properties

    spring.cloud.stream.bindings.back-push.destination=back-topic
    spring.cloud.stream.bindings.back-push.group=back-group
    

    SpringBoot启动类记得添加 @EnableBinding(value = {StreamReceive.class,MsgBackPush.class})

    @SpringBootApplication
    @EnableBinding(value = {StreamReceive.class,MsgBackPush.class})
    public class CloudStreamConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(CloudStreamConsumerApplication.class, args);
        }
    
    }
    
    

    到此,cloud-stream-consumer这个demo就完成了

    接下来看看 cloud-stream-producer

    public interface StreamPush {
    
        @Output("MQPush")
        MessageChannel mqPush();
    }
    
    

    定义一个通道名为MQPush,进行消息生产

    public interface ProducerReceive {
    
    
        @Input("producer-receive")
        SubscribableChannel producerReceive();
    }
    

    定义一个通道名为producer-receive,进行回执消息的消费

    @Component
    @Slf4j
    public class ProducerListener {
        @StreamListener("producer-receive")
        public void producerReceive(byte[] bytes){
            log.info("come back message:"+new String(bytes));
        }
    }
    

    具体回执消息处理逻辑,再来看看application.properties

    spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.214.191:9876
    spring.cloud.stream.bindings.MQPush.destination=message-topic
    spring.cloud.stream.bindings.MQPush.group=push-group
    
    
    spring.cloud.stream.bindings.producer-receive.destination=back-topic
    spring.cloud.stream.bindings.producer-receive.group=back-group
    
    
    server.port=20000
    

    为通道设置topic和group,新建一个Http接口测试一下成果

    @SpringBootApplication
    @EnableBinding(value = {StreamPush.class,ProducerReceive.class})
    @RestController
    public class CloudStreamProducerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(CloudStreamProducerApplication.class, args);
        }
    
        @Autowired
        private StreamPush streamPush;
    
        @GetMapping("/sendMessage")
        public String sendMessage(){
            streamPush.mqPush().send(MessageBuilder.withPayload("message body".getBytes()).build());
            return "ok";
        }
    }
    

    访问http://localhost:20000/sendMessage,结果图如下

    cloud-stream-consumer日志输出

    file

    cloud-stream-producer日志输出

    file

    学习@ServiceActivator这个注解,上面的项目cloud-stream-consumer ReceiveListener类中添加

    @Component
    @Slf4j
    public class ReceiveListener {
    
        @StreamListener("MQRece")
        @SendTo("back-push")
        public byte[] receive(byte[] bytes){
            log.info("接受消息:"+new String(bytes));
    				// 抛出异常
            if(1==1){
                throw new RuntimeException("Message consumer failed!");
            }
            return "ok".getBytes();
        }
    
        @Autowired
        private MsgBackPush msgBackPush;
    
        @ServiceActivator(inputChannel = "message-topic.rece-group.errors")
        public void error(Message<?> message){
            log.info("消费者消费消息失败:"+message);
            msgBackPush.backPush().send(MessageBuilder.withPayload("消息消费失败".getBytes()).build());
        }
    }
    

    通过使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下:

    • message-topic:消息通道对应的目标(destination,即:spring.cloud.stream.bindings.MQRece.destination的配置)
    • rece-group:消息通道对应的消费组(group,即:spring.cloud.stream.bindings.MQRece.group的配置)

    访问http://localhost:20000/sendMessage,结果图如下

    cloud-stream-consumer日志输出

    file

    cloud-stream-producer日志输出

    file

    个人联系方式QQ:944484545,欢迎大家的加入,分享学习是一件开心事
  • 相关阅读:
    企业应用的互联网化
    企业应用开发平台GAP平台
    技术有什么用?
    Samba服务器配置
    IT从业人员需要知道的10个小秘密
    在雪豹10.6.2(Mac OS X)上安装Oracle10g
    项目经理的职责
    通过堡垒机远程连接Windows Server
    MySQL:Data truncated for column 'last_apply_time' at row 1
    当有莫名其妙的错误里, 可以
  • 原文地址:https://www.cnblogs.com/hy-xiaobin/p/12175120.html
Copyright © 2011-2022 走看看