zoukankan      html  css  js  c++  java
  • SpringCloud-Stream消息通信

    SpringCloud微服务实战系列教程                                                                                                                             

      Spring Cloud Stream 消息驱动组件帮助我们更快速,更⽅便,更友好的去构建消息驱动微服务的。当时定时任务和消息驱动的⼀个对⽐。(消息驱动:基于消息机制做⼀些事情)MQ:消息队列/消息中间件/消息代理,产品有很多,ActiveMQ RabbitMQ RocketMQ Kafka

    一、 Stream解决的痛点问题

      MQ消息中间件⼴泛应⽤在应⽤解耦合、异步消息处理、流量削峰等场景中。不同的MQ消息中间件内部机制包括使⽤⽅式都会有所不同,⽐如RabbitMQ中有Exchange(交换机/交换器)这⼀概念,kafka有Topic、Partition分区这些概念,MQ消息中间件的差异性不利于我们上层的开发应⽤,当我们的系统希望从原有的RabbitMQ切换到Kafka时,我们会发现切换⽐较困难,很多要操作可能重来(因为⽤程序和具体的某⼀款MQ消息中间件耦合在⼀起了)。
      Spring Cloud Stream进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀
    来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。

    二、Stream重要概念

      Spring Cloud Stream 是⼀个构建消息驱动微服务的框架。应⽤程序通过inputs(相当于消息消费者consumer)或者outputs(相当于消息⽣产者producer)来与Spring Cloud Stream中的binder对象交互,⽽Binder对象是⽤来屏蔽底层MQ细节的,它负责与具体的消息中间件交互。
      Binder绑定器是Spring Cloud Stream 中⾮常核⼼的概念,就是通过它来屏蔽底层不同MQ消息中间件的细节差异,当需要更换为其他消息中间件时,我们需要做的就是更换对应的Binder绑定器⽽不需要修改任何应⽤逻辑(Binder绑定器的实现是框架内置的,Spring Cloud Stream⽬前⽀持Rabbit、Kafka两种消息队列)

    三、Stream消息通信⽅式

      Stream中的消息通信⽅式遵循了发布—订阅模式。在Spring Cloud Stream中的消息通信⽅式遵循了发布-订阅模式,当⼀条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进⾏⼴播,消息消费者在订阅的主题中收到它并触发⾃身的业务逻辑处理。这⾥所提到的 Topic 主题是Spring Cloud Stream中的⼀个抽象概念,⽤来代表发布共享消息给消费者的地⽅。在不同的消息中间件中, Topic 可能对应着不同的概念,⽐如:在RabbitMQ中的它对应了Exchange、在Kakfa中则对应了Kafka中的Topic。

     四、基于RabbitMQ应用

      第一步:构建消息生产者

        1、引入依赖pom.ml

            <!--eureka client 客户端依赖引⼊-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <!--spring cloud stream 依赖(rabbit)-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

        2、添加配置

        注意:标红的位置应设置相同的binder,蓝色位置必须为output,被默认框架定义好的

    server:
      port: 9090
    spring:
      application:
        name: stream-provider
      cloud:
        stream:
          binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
            cityRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
              type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
              environment: # MQ环境配置(⽤户名、密码等)
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 关联整合通道和binder对象
            output: # output是我们定义的通道名称,此处不能乱改
              destination: cityExchange # 要使⽤的Exchange名称(消息队列主题名称)
              content-type: text/plain # application/json # 消息类型设置,⽐如json
              binder: cityRabbitBinder # 关联MQ服务
    eureka:
      client:
        serviceUrl: # eureka server的路径
          defaultZone: http://localhost:8761/eureka/
        instance:
          prefer-ip-address: true #使⽤ip注册

      3、消息发送通过source对象

    package city.alber;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    
    /**
     * @author niunafei
     * @function
     * @email niunafei0315@163.com
     * @date 2020/9/24  2:26 PM
     * Source.class⾥⾯就是对输出通道的定义(这是Spring Cloud Stream内置的通道封装)
     */
    @EnableBinding(Source.class)
    public class ProviderService {
        /**
         * 将MessageChannel的封装对象Source注⼊到这⾥使⽤
         */
        @Autowired
        private Source source;
    
        public void sendMessage(String content) {
            // 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
            // 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)
            source.output().send(MessageBuilder.withPayload(content).build());
        }
    }

      4、启动类添加@EnableDiscoveryClient 注解,启动

     第二步:构建消息消费者

      下面的内容是和生产者不一致的地方

      1、配置,标蓝地方为不同点

    server:
      port: 9091
    spring:
      application:
        name: stream-consumer
      cloud:
        stream:
          binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
            cityRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
              type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
              environment: # MQ环境配置(⽤户名、密码等)
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 关联整合通道和binder对象
            input: # output是我们定义的通道名称,此处不能乱改
              destination: cityExchange # 要使⽤的Exchange名称(消息队列主题名称)
              content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
              binder: cityRabbitBinder # 关联MQ服务
    eureka:
      client:
        serviceUrl: # eureka server的路径
          defaultZone: http://localhost:8761/eureka/
        instance:
          prefer-ip-address: true #使⽤ip注册

        2、接收消费消息类

    package city.albert;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    
    /**
     * @author niunafei
     * @function
     * @email niunafei0315@163.com
     * @date 2020/9/24  2:43 PM
     */
    @EnableBinding(Sink.class)
    public class ConsumerMsg {
        @StreamListener(Sink.INPUT)
        public void recevieMessages(Message<String> message) {
            System.out.println("=========接收到的消息:" + message);
        }
    }

    五、定义输出

      定一使用与上面配置中蓝色字体的input/ouput类似,定义完成配置应该为 inputSysLog/outputSysLog

      1、定义通过接口,目的是给生产者消费者调用

    package city.albert;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * @author niunafei
     * @function
     * @email niunafei0315@163.com
     * @date 2020/9/24  3:11 PM
     */
    public interface CustomStreamConfig {
    
        String INPUT_SYS_LOG = "inputSysLog";
        String OUTPUT_SYS_LOG = "outputSysLog";
    
        @Input(INPUT_SYS_LOG)
        SubscribableChannel inputSysLog();
    
        @Output(OUTPUT_SYS_LOG)
        MessageChannel outputSysLog();
    
        /**
         * 。。。。。可以跟上面一样定义多个通道信息
         */
    }
    View Code

      2、生产者调用

    package city.albert;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    
    /**
     * @author niunafei
     * @function
     * @email niunafei0315@163.com
     * @date 2020/9/24  3:14 PM
     *
     */
    @EnableBinding(CustomStreamConfig.class)
    public class CustomStreamProvider {
    
        @Autowired
        CustomStreamConfig customStreamConfig;
    
        public void sendMessage(String content) {
            // 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
            // 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)
            customStreamConfig.outputSysLog().send(MessageBuilder.withPayload(content).build());
        }
    }
    View Code

      3、消费者调用

    package city.albert;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    
    /**
     * @author niunafei
     * @function
     * @email niunafei0315@163.com
     * @date 2020/9/24  3:14 PM
     *
     */
    @EnableBinding(CustomStreamConfig.class)
    public class CustomStreamConsumer {
    
        @StreamListener(CustomStreamConfig.INPUT_SYS_LOG)
        public void messages(Message<String> message) {
            System.out.println("=========接收到的消息:" + message);
        }
    }
    View Code

    六、分组

      同组内一条消息,只能一个消费者获取,添加配置即可

     
  • 相关阅读:
    Windows中启动和关闭Oracle服务的x.bat批处理文件
    Flex删除文件 SecurityError: fileWriteResource
    Eclipse中的@Override标注报错
    Flex删除文件 SecurityError: fileWriteResource
    commonsfileupload 上传文件乱码问题
    Eclipse中的@Override标注报错
    Windows中启动和关闭Oracle服务的x.bat批处理文件
    csdn论坛最近又有人灌水
    用JavaScript获取URL中的参数值
    用JavaScript获取URL中的参数值
  • 原文地址:https://www.cnblogs.com/niunafei/p/13603357.html
Copyright © 2011-2022 走看看