zoukankan      html  css  js  c++  java
  • Spring Cloud Stream

      在实际的企业开发中,消息中间件是至关重要的组件之一。消息中间件主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。不同的中间件其实现方式,内部结构是不一样的。如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。

      Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
        

     入门案例:通过rabbitMQ作为消息中间件

      消息生产者:

        (1)创建工程引入依赖

        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>

        (2)定义bingding

          发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 SpringCloud Stream 内置的接口:

    public interface Source {
        String OUTPUT = "output";
        @Output("output")
        MessageChannel output();
    }

          接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生产者。

        (3)配置application.yml

    server:
      port: 7001 #服务端口
    spring:
      application:
        name: stream_producer #指定服务名
      rabbitmq:
        addresses: 127.0.0.1
        username: guest
        password: guest
      cloud:
        stream:
          bindings:
            output:
              destination: fan-default #指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 fan-default 的所有消息队列中。
              contentType: text/plain #用于指定消息的类型。
          binders:
            defaultRabbit:
              type: rabbit

        (4)消息发送工具类

    // 负责向中间件发送数据
    @Component
    @EnableBinding(Source.class)
    public class MessageSender {
    
        @Autowired
        private MessageChannel output; // 接口来源于Source.class
    
        public void send(Object obj) {
            //发送MQ消息
            output.send(MessageBuilder.withPayload(obj).build());
        }
    }

        (5)启动类

    @SpringBootApplication
    public class ProducerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class);
        }
    }

        (6)测试类

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest
    public class ProducerTest {
    
        @Autowired
        private MessageSender messageSender;
    
        @Test
        public void testSend() {
            messageSender.send("hello word");
        }
    }

        (7)运行启动类,访问RabbitMQ地址 http://127.0.0.1:15672/ 即可看到发送的消息

          

       消息消费者:

        (1)创建工程引入依赖

        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
        </dependencies>

        (2)定义bingding

          同发送消息一致,在Spring Cloud Stream中接收消息,需要定义一个接口,如下是内置的一个接口。

    public interface Sink {
        String INPUT = "input";
        @Input("input")
        SubscribableChannel input();
    }

          接口声明了一个 binding 命名为 “input” 。注释 @Input 对应的方法,需要返回 SubscribableChannel

        (3)配置application.yml

    server:
      port: 7002 #服务端口
    spring:
      application:
        name: stream_consumer #指定服务名
      rabbitmq:
        addresses: 127.0.0.1
        username: guest
        password: guest
      cloud:
        stream:
          bindings:
            input:
              destination: fan-default
          binders:
            defaultRabbit:
              type: rabbit

        (4)消息监听类

    @Component
    @EnableBinding(Sink.class)
    public class MessageListener {
    
        // 监听 binding 为 Sink.INPUT 的消息
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message) {
            System.out.println("监听收到:" + message.getPayload());
        }
    }

        (5)启动类

    @SpringBootApplication
    public class ConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class);
        }
    }

        (6) 运行启动类后,再去运行消息生产者的测试类,即可在消息消费者控制台接收到消息

    自定义消息通道:

    public interface OrderProcessor {
    
        String INPUT_ORDER = "inputOrder";
        String OUTPUT_ORDER = "outputOrder";
    
        @Input(INPUT_ORDER)
        SubscribableChannel inputOrder();
    
        @Output(OUTPUT_ORDER)
        MessageChannel outputOrder();
    }

      Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流

        一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入和订单输出两个 binding。
        使用时,需要在 @EnableBinding 注解中,添加自定义的接口。
        使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER

        消息发送工具类要指定注入的通道:

          

        配置文件也要修改:

          

     消息分组:

      通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

      只需要在每个服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可

        

     消息分区:

      有一些场景需要满足, 同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一个实例统计计算分析, 否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例。

      消息生产者配置:

        1. spring.cloud.stream.bindings.output.producer.partitionKeyExpression :通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;

        2. spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了消息分区的数量。
        

       消息消费者配置:

        1. spring.cloud.stream.bindings.input.consumer.partitioned :通过该参数开启消费者分区功能;

        2. spring.cloud.stream.instanceCount :该参数指定了当前消费者的总实例数量;

        3. spring.cloud.stream.instanceIndex :该参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount 参数 - 1。

        

  • 相关阅读:
    引用 AspNetCoreRateLimit => StatusCode cannot be set because the response has already started.
    Sublime Json 格式化
    gitlab 建立本地仓库
    R语言 启动报错 *** glibc detected *** /usr/lib64/R/bin/exec/R: free(): invalid next size (fast): 0x000000000263a420 *** 错误 解决方案
    范数
    SparkR-Install
    R语言扩展包dplyr——数据清洗和整理
    R语言与机器学习学习笔记
    sparkR原理
    data.frame类型数据如何将第一列值替换为行号
  • 原文地址:https://www.cnblogs.com/roadlandscape/p/12937796.html
Copyright © 2011-2022 走看看