zoukankan      html  css  js  c++  java
  • Spring Cloud Stream

    说明

    对Spring Boot 和 Spring Integration的整合,通过Spring Cloud Stream能够简化消息中间件使用的复杂难度!让业务人员更多的精力能够花在业务层面

    简单例子

    consumer

    1.创建一个一个项目名为spring-cloud-stream-consumer

    2.引入pom依赖

    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

    3.yml配置

    复制代码
    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
    server:
      port: 8081
    复制代码

    5.创建消息监听类

    复制代码
    //实现对定义了多个@Input和@Output的接口实现对通道的绑定 Sink定义了@Input 我们自己处理时是自己定义接口
    @EnableBinding(Sink.class)
    public class SkinReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
        //对input的消息监听处理
        @StreamListener(Sink.INPUT)
        public void receiver(Object message){
            logger.info(message.toString());
        }
    }
    复制代码

    6.启动rabbitmq

    7.启动项目

    8.通过mq管理页面发送消息

     控制台打印 表示成功接收到消息

    核心概念

    绑定器

    应用程序与消息中间件的抽象层。应用程序中间件的解耦。应用程序不需要考虑用的什么类型的消息中间件。当我们需要更换消息中间件 只需要替换绑定器

    发布订阅

    spring cloud stream 完全遵循发布订阅模式 当一条消息被发布到消息中间件后 将会以topic主题模式进行广播,消费者对订阅的topic主题进行相应的逻辑处理。topic是spring cloud stream的一个抽象概念,不同消息中间件topic概念可能不同 rabbitMq对应exchage

    如rabbitMQ的topic

     发布订阅模式能够有效避免点对点的耦合 当一种消息要增加一种处理方式时只需要增加一个消息订阅者

    消费组

    一般我们的消费组都会集群部署 但是我们再集群部署的情况下 会形成多个订阅者 导致消息被消费多次, 消费组则是解决一个消息只能被一个实例消费者消费

    消费分区

    指定统一特征的消息被指定服务实例消费 spring cloud stream消费分区提供通用的抽象实现 使不支持分区的中间件也能支持消费分区

    自定义输入和输出

    定义输入

    Sink 是spring cloud stream 的默认实现 我们可以通过查看源码

    复制代码
    public interface Sink {
        String INPUT = "input";
    
        @Input("input")
        SubscribableChannel input();
    }
    复制代码

    通过@Input注册参数为通道名字 同时需要返回SubscribableChannel

    我们通过参考Sink定义一个输入通道 比如处理订单保存的通道

    1.定义第一个通道

    public interface OrderMQInputChannel {
        String saveOrderChannelName="saveOrder";//定义通道的名字
        @Input(saveOrderChannelName)//定义为输入通道
        public SubscribableChannel saveOrder();
    }

    2.绑定通道并监听

    复制代码
    //通过绑定器 对OrderMQInputChannel通道进行绑定
    @EnableBinding(OrderMQInputChannel.class)
    public class OrderMQReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @StreamListener(OrderMQInputChannel.saveOrderChannelName)
        public void receiver(Object message){
            logger.info(message.toString());
        }
    }
    复制代码

    定义输出

    1.创建一个测试消费提供者的项目

     

    2.引入pom依赖

    复制代码
    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
        </dependencies>
    复制代码

    2.yml配置文件配置

    复制代码
    spring:
      application:
        name: streamProvider
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
    server:
      port: 8082
    复制代码

    3.定义一个输出通道

    public interface OrderMQOutputChannel {
        String saveOrderChannelName="saveOrder";
        @Output(saveOrderChannelName)//定义输出管道的名字
        MessageChannel saveOrder();
    }

    4.绑定通道

    @EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
    public class OrderChannelBindConfig {
    }

    5.添加测试contorller

    复制代码
    @Controller
    public class TestContorller {
        @Autowired
        OrderMQOutputChannel orderMQOutputChannel;
        @RequestMapping("/saveOrder")
        @ResponseBody
        public boolean saveOrder(){
            //发送一条保存订单的命令
           return  orderMQOutputChannel.saveOrder().send(MessageBuilder.withPayload("fff").build());
        }
    }
    复制代码

    或者

    复制代码
    @Controller
    public class TestContorller {
    
        //直接注入对应通道是的实例
        @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
        MessageChannel messageChannel;
    
        @RequestMapping("/saveOrder")
        @ResponseBody
        public boolean saveOrder(){
            //发送一条保存订单的命令
            return  messageChannel.send(MessageBuilder.withPayload("fff").build());
        }
    
    }
    复制代码

    6.访问

    http://127.0.0.1:8082/saveOrder

    7.consumer打印 表示消息被消费

     spring intergration原生支持

    spring cloud stream 是通过spring boot和spring intergreation的整合 所以也可以使用原生的用法实现相同的功能

    provider

    复制代码
    @EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
    public class OriginalOrderMQOutPutChannelService {
        //定义2秒发送一次消息
        @Bean
        @InboundChannelAdapter(value = OrderMQOutputChannel.saveOrderChannelName, poller = @Poller(fixedDelay = "2000"))
        public MessageSource<Date> timerMessageSource() {
            return () -> new GenericMessage<>(new Date());
    
        }
    }
    复制代码

    consumer

    复制代码
    //通过绑定器 对OrderMQInputChannel通道进行绑定
    @EnableBinding(OrderMQInputChannel.class)
    public class OriginalOrderMQReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @ServiceActivator(inputChannel = OrderMQInputChannel.saveOrderChannelName)
        public void receiver(Object message) {
            logger.info(message.toString());
        }
    
        //定义消息转换器 转换saveOrderChannelName 通道的的消息
        @Transformer(inputChannel = OrderMQInputChannel.saveOrderChannelName, outputChannel = OrderMQInputChannel.saveOrderChannelName)
        public Object transform(Date message) {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);
    
        }
    }
    复制代码

    启动之后conusmer2秒则会受到一条消息 更多用法查看spring intergration文档

    消息转换 

    通过上面我们可以看到原生通过@Transformer实现消息转换 spring cloud stream 只需要定义消息通道的消息类型

    spring.cloud.stream.bindings.[inputname].content-type=application/json
    复制代码
    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              content-type: application/json
    server:
      port: 8081
    复制代码

    消费者

    复制代码
    //通过绑定器 对OrderMQInputChannel通道进行绑定
    @EnableBinding(OrderMQInputChannel.class)
    public class OrderMQReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @StreamListener(OrderMQInputChannel.saveOrderChannelName)
        public void receiver(Order order){
           
            logger.info(order.getId()+"-"+order.getOrderCode());
        }
    }
    复制代码

    消息提供者

    复制代码
    @Controller
    public class TestContorller {
    
        //直接注入对应通道是的实例
        @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
        MessageChannel messageChannel;
    
        @RequestMapping("/saveOrder")
        @ResponseBody
        public boolean saveOrder(){
           com.liqiang.entity.Order order=new  com.liqiang.entity.Order();
           order.setId(1L);
           order.setOrderCode("201901020001");
            //发送一条保存订单的命令
            return  messageChannel.send(MessageBuilder.withPayload(order).build());
        }
    
    }
    复制代码

    消息反馈

    用于将消息交给别的应用处理 处理后再回传  或者异步请求 接收处理结果

    provider

    复制代码
    public interface OrderMQOutputChannel {
        String saveOrderChannelName="saveOrder";
        String saveOrderCallbackChannelName="saveOrderCallback";//定义回调通道的名字
        @Output(saveOrderChannelName)//定义输出管道的名字
        MessageChannel saveOrder();
    
        @Input(saveOrderCallbackChannelName)//定义为输入通道
        public SubscribableChannel saveOrderCallback();
    }
    复制代码
    复制代码
    @EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
    public class OrderChannelBindConfig {
        private static Logger logger = LoggerFactory.getLogger(OrderMQOutputChannel.class);
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @StreamListener(OrderMQOutputChannel.saveOrderCallbackChannelName)
        public void receiver(boolean boo){
            logger.info(String.valueOf(boo));
        }
    }
    复制代码

    consumer

    复制代码
    public interface OrderMQInputChannel {
        String saveOrderChannelName="saveOrder";//定义通道的名字
        String saveOrderCallbackChannelName="saveOrderCallback";//定义回调通道的名字
        @Input(saveOrderChannelName)//定义为输入通道
        public SubscribableChannel saveOrder();
    }
    复制代码
    复制代码
    //通过绑定器 对OrderMQInputChannel通道进行绑定
    @EnableBinding(OrderMQInputChannel.class)
    public class OrderMQReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @StreamListener(OrderMQInputChannel.saveOrderChannelName)
        @SendTo(OrderMQInputChannel.saveOrderCallbackChannelName)//反馈的通道名字
        public boolean receiver(Order order){
    
            logger.info(order.getId()+"-"+order.getOrderCode());
            return true;
        }
    }
    复制代码

    消息分组

    多实例情况下 只需要指定spring.cloud.stream.bindings.[channelname].group=gorupname 当同一组实例对同一个主题的消息只能会有一个实例消费

    1.测试 创建2个配置文件 分别为

    application-peer1.yml

    复制代码
    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              group: groupA
              content-type: application/json
    server:
      port: 8081
    复制代码

    application-peer2.yml

    复制代码
    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              group: groupA
              content-type: application/json
    server:
      port: 8083
    复制代码

    通过启动2个消费者

    java -jar /Users/liqiang/Desktop/java开发环境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer1

    java -jar /Users/liqiang/Desktop/java开发环境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer2

    只要一个实例消费了 

    消费分区

    再某些场景 我需要指定某一类消息只能被哪些实例消费

    消费者

    application-peer1.yml

    复制代码
    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          instanceCount: 2 #跟分区一起使用 有多少实例
          instanceIndex: 0 #分区当前实例编号 从0开始
          bindings:
            saveOrder:
              group: streamConsumer
              content-type: application/json
              consumer:
                partitioned: true #开启消息分区的功能
    
    server:
      port: 8081
    复制代码

    application-peer2.yml

    复制代码
    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          instanceCount: 2 #跟分区一起使用 有多少实例
          instanceIndex: 1 #当前实例编号 从0开始
          bindings:
            saveOrder:
              group: streamConsumer
              content-type: application/json
              consumer:
                partitioned: true #开启消息分区的功能
    
    server:
      port: 8083
    复制代码

    生产者

    复制代码
    spring:
      application:
        name: streamProvider
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              producer:
                partitionKeyExpression: '0' #表示只有实例索引为0的才能收到消息 支持SpEL表达式
                partitionCount: 2
    server:
      port: 8082
    复制代码

    当启动2个消费者 和生产者 当前生产者 生产的消息只能被实例编号为0的消费

    这里限制死了当前实例生产的消息被某个实例消费。如果我们需要指定 当前生产者生产的某一类服务被指定实例消费呢可以通过SpEL表达式设置

    生产者yml

    复制代码
    spring:
      application:
        name: streamProvider
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              producer:
                partitionKeyExpression: headers['partitionKey'] #SpEL表达式 通过读取消息hearder的partitionKey属性动态指定
                partitionCount: 2 #消息分区数量
    server:
      port: 8082
    复制代码

    消息生产通过header动态指定

    复制代码
    package com.liqiang.springcloudstreamprovider;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.core.annotation.Order;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    @Controller
    public class TestContorller {
    
        //直接注入对应通道是的实例
        @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
        MessageChannel messageChannel;
        private static  int index=0;
    
        @RequestMapping("/saveOrder")
        @ResponseBody
        public boolean saveOrder(){
           com.liqiang.entity.Order order=new  com.liqiang.entity.Order();
           order.setId(1L);
           order.setOrderCode("201901020001");
    
            //发送一条保存订单的命令
            return  messageChannel.send(MessageBuilder.withPayload(order).setHeader("partitionKey",(index++)%2==0?0:1).build());
        }
    
    }
  • 相关阅读:
    学习笔记180—回归系数与相关系数的关系和区别
    学习笔记178—精品书籍推荐榜
    学习笔记177—PPT生成的图片设置成特定像素级的图片【四种方法】
    学习笔记176—PS 获得一个椭圆的某个部分
    shell:利用sed删除文件中的匹配行
    常用壁纸
    Linux下编译C文件:Hello World
    属性访问、特性和修饰符
    介绍Python基本特殊方法
    kafka配置
  • 原文地址:https://www.cnblogs.com/smallfa/p/13571205.html
Copyright © 2011-2022 走看看