zoukankan      html  css  js  c++  java
  • springcloud学习(七)之Stream

    前言

    MQ消息中间件⼴泛应⽤在应⽤解耦合、异步消息处理、流量削峰等场景中。

    不同的MQ消息中间件内部机制包括使⽤⽅式都会有所不同,⽐如RabbitMQ中有Exchange(交换机/交换器)这⼀概念, kafka有Topic、 Partition分区这些概念, MQ消息中间件的差异性不利于我们上层的开发应⽤,当我们的系统希望从原有的RabbitMQ切换到Kafka时,我们会发现⽐较困难,很多要操作可能重来(因为应⽤程序和具体的某⼀款MQ消息中间件耦合在⼀起了)。

    Spring Cloud Stream进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。

    本质: 屏蔽掉了底层不同MQ消息中间件之间的差异,统⼀了MQ的编程模型,降低了学习、开发、维护MQ的成本

    Stream简介

    Spring Cloud Stream 是⼀个构建消息驱动微服务的框架。应⽤程序通过inputs(相当于消息消费者consumer)或者outputs(相当于消息⽣产者producer)来与Spring Cloud Stream中的binder对象交互,⽽Binder对象是⽤来屏蔽底层MQ细节的,它负责与具体的消息中间件交互。

    说⽩了:对于我们来说,只需要知道如何使⽤Spring Cloud Stream与Binder对象交互即可

    Stream的几个常用注解

    注解 描述
    @Input(在消费者⼯程中使⽤) 注解标识输⼊通道,通过该输⼊通道接收到的消息进⼊应⽤程序
    @Output(在⽣产者⼯程中使⽤) 注解标识输出通道,发布的消息将通过该通道离开应⽤程序
    @StreamListener(在消费者⼯程中使⽤,监听message的到来) 监听队列,⽤于消费者的队列的消息的接收(有消息监听.....)
    @EnableBinding 把Channel和Exchange(对于RabbitMQ)绑定在⼀起

    Stream开发实战

    生产者端

    在父工程下新建子模块lagou-cloud-stream-producer-9090,引入jar:

    <!--spring cloud stream 依赖(rabbit) -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    

    添加rabbit相关配置:

    spring:
      application:
        name: lagou-cloud-stream-producer
      cloud:
        stream:
          binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
            lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
              type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
              environment: # MQ环境配置(⽤户名、密码等)
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 关联整合通道和binder对象
            output: # output是我们定义的通道名称,此处不能乱改
              destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
              content-type: text/plain # application/json # 消息类型设置,⽐如json
              binder: lagouRabbitBinder # 关联MQ服务
    

    定义一个发送消息的接口及实现类:

    public interface IMessageProduder {
        void sendMessage(String message);
    }
    
    //绑定的通道,output。springcloud stream内对输出通道的定义
    @EnableBinding(Source.class)
    public class MessageProducerImpl implements IMessageProduder {
    
        @Autowired
        private Source source;
    
        @Override
        public void sendMessage(String message) {
            //向mq发送消息(并不直接操作mq,而是操作springcloud stream
            //使用source指定的output通道向外发送消息
            source.output().send(MessageBuilder.withPayload(message).build());
        }
    }
    

    消费者端

    父工程下新建消费者子模块lagou-cloud-stream-consumer-9091,引入jar坐标和服务端一样,这里不再赘述。
    application.yml里面配置与rabbit相关参数,唯一与服务者端不同的是input和output参数:

    其他都保持一致。
    在消费端定义service类来接受消息:

    @EnableBinding(Sink.class)
    public class MessageConsumerService {
    
        @StreamListener(Sink.INPUT)
        public void receiveMessage(Message<String> message){
            System.out.println("----接受到的消息---->"+message);
        }
    }
    

    测试

    我们在服务提供者端写一个测试类来发送消息:

    @SpringBootTest(classes = {StreamProducerApplication9090.class})
    @RunWith(SpringJUnit4ClassRunner.class)
    public class MessageProducerTest
    {
        @Autowired
        private IMessageProduder iMessageProduder;
    
        @Test
        public void testSendMessage(){
            iMessageProduder.sendMessage("hello world----!!");
        }
    
    }
    

    我们先启动服务消费者,然后运行服务提供者端的测试类,看服务消费者端的控制台输出了接收到的信息:

    Stream之消息分组

    我们将服务消费者复制一份,新消费者的端口是9092,前一个消费者端口是9091。
    这样我们继续测试,会发现同一个服务提供者发送的消息,被两个消费者都接收到并进行处理了。
    这明显是有问题的,比如电商网站的订单,肯定只需要处理一次就行。

    为了解决这个问题,rabbitmq有个消息分组的概念,只要两个消费者实例处在一个组里,那么这个组里只有一个消费者会处理这个消息。

    我们仅仅需要在服务消费者端设置 spring.cloud.stream.bindings.input.group 属性,多个消费者实例配置为同⼀个group名称(在同⼀个group中的多个消费者只有⼀个可以获取到消息并消费)。如下:

    扩展:前面我们都是先启动服务消费者,然后再启动服务提供者发送消息,也就是消息是临时性的,并没有持久化存储。当我们设置了分组之后,消息就会持久化存储。我们先发送消息,然后再启动服务消费者客户端,也能够接收到消息。

    案例源码

    stream案例的源码地址

    欢迎访问我的博客:https://www.liuyj.top

  • 相关阅读:
    [原创]测试计划与测试方案区别
    [原创]什么是构建验证测试(BVT)
    [原创]什么是测试驱动开发?
    [原创]用TestDirector的测试管理的流程
    [原创]测试用例设计之"正面测试与和负面测试"
    [原创]测试用例设计之“因果图”法
    [原创]测试用例设计之“状态迁移图”法
    [转贴]测试工具自动化的最佳实践
    [原创]软件测试管理之“测试角色和职能”概述
    软件测试试题
  • 原文地址:https://www.cnblogs.com/liuyj-top/p/14233124.html
Copyright © 2011-2022 走看看