zoukankan      html  css  js  c++  java
  • Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

    应用场景

    有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

    @StreamListener(value = TestTopic.INPUT)
    public void receiveV1(String payload, @Header("version") String version) {
        if("1.0".equals(version)) {
            // Version 1.0
        }
        if("2.0".equals(version)) {
            // Version 2.0
        }
    }
    

    那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

    动手试试

    下面通过编写一个简单的例子来具体体会一下这个属性的用法:

    @EnableBinding(TestApplication.TestTopic.class)
    @SpringBootApplication
    public class TestApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(TestApplication.class, args);
        }
    
        @RestController
        static class TestController {
    
            @Autowired
            private TestTopic testTopic;
    
            /**
             * 消息生产接口
             *
             * @param message
             * @return
             */
            @GetMapping("/sendMessage")
            public String messageWithMQ(@RequestParam String message) {
                testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
                testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
                return "ok";
            }
    
        }
    
        /**
         * 消息消费逻辑
         */
        @Slf4j
        @Component
        static class TestListener {
    
            @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
            public void receiveV1(String payload, @Header("version") String version) {
                log.info("Received v1 : " + payload + ", " + version);
            }
    
            @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
            public void receiveV2(String payload, @Header("version") String version) {
                log.info("Received v2 : " + payload + ", " + version);
            }
    
        }
    
        interface TestTopic {
    
            String OUTPUT = "example-topic-output";
            String INPUT = "example-topic-input";
    
            @Output(OUTPUT)
            MessageChannel output();
    
            @Input(INPUT)
            SubscribableChannel input();
    
        }
    
    }
    

    内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。

    在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

    spring.cloud.stream.bindings.example-topic-input.destination=test-topic
    spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
    spring.cloud.stream.bindings.example-topic-output.destination=test-topic
    

    完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

    2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0
    2018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0
    

    从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

    本文首发:http://blog.didispace.com/spring-cloud-starter-finchley-7-6/

  • 相关阅读:
    爬取校园新闻首页的新闻的详情,使用正则表达式,函数抽离
    网络爬虫基础练习
    Mysql 使用 select into outfile
    Mysql 使用CMD 登陆
    使用Clean() 去掉由函数自动生成的字符串中的双引号
    Get Resultset from Oracle Stored procedure
    获取引用某个主键的所有外键的表
    Entity Framework 丢失数据链接的绑定,在已绑好的EDMX中提示“Choose Your Data Connection”
    添加MySql Metat Database 信息
    at System.Data.EntityClient.EntityConnection.GetFactory(String providerString)
  • 原文地址:https://www.cnblogs.com/didispace/p/10187798.html
Copyright © 2011-2022 走看看