zoukankan      html  css  js  c++  java
  • Spring Cloud Stream

      在实际的企业开发中,消息中间件是至关重要的组件之一。消息中间件主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。不同的中间件其实现方式,内部结构是不一样的。如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。

      Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
        

     入门案例:通过rabbitMQ作为消息中间件

      消息生产者:

        (1)创建工程引入依赖

        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>

        (2)定义bingding

          发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 SpringCloud Stream 内置的接口:

    public interface Source {
        String OUTPUT = "output";
        @Output("output")
        MessageChannel output();
    }

          接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生产者。

        (3)配置application.yml

    server:
      port: 7001 #服务端口
    spring:
      application:
        name: stream_producer #指定服务名
      rabbitmq:
        addresses: 127.0.0.1
        username: guest
        password: guest
      cloud:
        stream:
          bindings:
            output:
              destination: fan-default #指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 fan-default 的所有消息队列中。
              contentType: text/plain #用于指定消息的类型。
          binders:
            defaultRabbit:
              type: rabbit

        (4)消息发送工具类

    // 负责向中间件发送数据
    @Component
    @EnableBinding(Source.class)
    public class MessageSender {
    
        @Autowired
        private MessageChannel output; // 接口来源于Source.class
    
        public void send(Object obj) {
            //发送MQ消息
            output.send(MessageBuilder.withPayload(obj).build());
        }
    }

        (5)启动类

    @SpringBootApplication
    public class ProducerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class);
        }
    }

        (6)测试类

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest
    public class ProducerTest {
    
        @Autowired
        private MessageSender messageSender;
    
        @Test
        public void testSend() {
            messageSender.send("hello word");
        }
    }

        (7)运行启动类,访问RabbitMQ地址 http://127.0.0.1:15672/ 即可看到发送的消息

          

       消息消费者:

        (1)创建工程引入依赖

        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
        </dependencies>

        (2)定义bingding

          同发送消息一致,在Spring Cloud Stream中接收消息,需要定义一个接口,如下是内置的一个接口。

    public interface Sink {
        String INPUT = "input";
        @Input("input")
        SubscribableChannel input();
    }

          接口声明了一个 binding 命名为 “input” 。注释 @Input 对应的方法,需要返回 SubscribableChannel

        (3)配置application.yml

    server:
      port: 7002 #服务端口
    spring:
      application:
        name: stream_consumer #指定服务名
      rabbitmq:
        addresses: 127.0.0.1
        username: guest
        password: guest
      cloud:
        stream:
          bindings:
            input:
              destination: fan-default
          binders:
            defaultRabbit:
              type: rabbit

        (4)消息监听类

    @Component
    @EnableBinding(Sink.class)
    public class MessageListener {
    
        // 监听 binding 为 Sink.INPUT 的消息
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message) {
            System.out.println("监听收到:" + message.getPayload());
        }
    }

        (5)启动类

    @SpringBootApplication
    public class ConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class);
        }
    }

        (6) 运行启动类后,再去运行消息生产者的测试类,即可在消息消费者控制台接收到消息

    自定义消息通道:

    public interface OrderProcessor {
    
        String INPUT_ORDER = "inputOrder";
        String OUTPUT_ORDER = "outputOrder";
    
        @Input(INPUT_ORDER)
        SubscribableChannel inputOrder();
    
        @Output(OUTPUT_ORDER)
        MessageChannel outputOrder();
    }

      Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流

        一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入和订单输出两个 binding。
        使用时,需要在 @EnableBinding 注解中,添加自定义的接口。
        使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER

        消息发送工具类要指定注入的通道:

          

        配置文件也要修改:

          

     消息分组:

      通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

      只需要在每个服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可

        

     消息分区:

      有一些场景需要满足, 同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一个实例统计计算分析, 否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例。

      消息生产者配置:

        1. spring.cloud.stream.bindings.output.producer.partitionKeyExpression :通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;

        2. spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了消息分区的数量。
        

       消息消费者配置:

        1. spring.cloud.stream.bindings.input.consumer.partitioned :通过该参数开启消费者分区功能;

        2. spring.cloud.stream.instanceCount :该参数指定了当前消费者的总实例数量;

        3. spring.cloud.stream.instanceIndex :该参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount 参数 - 1。

        

  • 相关阅读:
    leetcode701. Insert into a Binary Search Tree
    leetcode 958. Check Completeness of a Binary Tree 判断是否是完全二叉树 、222. Count Complete Tree Nodes
    leetcode 110. Balanced Binary Tree
    leetcode 104. Maximum Depth of Binary Tree 111. Minimum Depth of Binary Tree
    二叉树
    leetcode 124. Binary Tree Maximum Path Sum 、543. Diameter of Binary Tree(直径)
    5. Longest Palindromic Substring
    128. Longest Consecutive Sequence
    Mac OS下Android Studio的Java not found问题,androidfound
    安卓 AsyncHttpClient
  • 原文地址:https://www.cnblogs.com/roadlandscape/p/12937796.html
Copyright © 2011-2022 走看看