zoukankan      html  css  js  c++  java
  • Spring Cloud Stream(十三)

    说明

    对Spring Boot 和 Spring Integration的整合,通过Spring Cloud Stream能够简化消息中间件使用的复杂难度!让业务人员更多的精力能够花在业务层面

    简单例子

    consumer

    1.创建一个一个项目名为spring-cloud-stream-consumer

    2.引入pom依赖

    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

    3.yml配置

    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
    server:
      port: 8081

    5.创建消息监听类

    //实现对定义了多个@Input和@Output的接口实现对通道的绑定 Sink定义了@Input 我们自己处理时是自己定义接口
    @EnableBinding(Sink.class)
    public class SkinReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
        //对input的消息监听处理
        @StreamListener(Sink.INPUT)
        public void receiver(Object message){
            logger.info(message.toString());
        }
    }

    6.启动rabbitmq

    7.启动项目

    8.通过mq管理页面发送消息

     控制台打印 表示成功接收到消息

    核心概念

    绑定器

    应用程序与消息中间件的抽象层。应用程序中间件的解耦。应用程序不需要考虑用的什么类型的消息中间件。当我们需要更换消息中间件 只需要替换绑定器

    发布订阅

    spring cloud stream 完全遵循发布订阅模式 当一条消息被发布到消息中间件后 将会以topic主题模式进行广播,消费者对订阅的topic主题进行相应的逻辑处理。topic是spring cloud stream的一个抽象概念,不同消息中间件topic概念可能不同 rabbitMq对应exchage

    如rabbitMQ的topic

     发布订阅模式能够有效避免点对点的耦合 当一种消息要增加一种处理方式时只需要增加一个消息订阅者

    消费组

    一般我们的消费组都会集群部署 但是我们再集群部署的情况下 会形成多个订阅者 导致消息被消费多次, 消费组则是解决一个消息只能被一个实例消费者消费

    消费分区

    指定统一特征的消息被指定服务实例消费 spring cloud stream消费分区提供通用的抽象实现 使不支持分区的中间件也能支持消费分区

    自定义输入和输出

    定义输入

    Sink 是spring cloud stream 的默认实现 我们可以通过查看源码

    public interface Sink {
        String INPUT = "input";
    
        @Input("input")
        SubscribableChannel input();
    }

    通过@Input注册参数为通道名字 同时需要返回SubscribableChannel

    我们通过参考Sink定义一个输入通道 比如处理订单保存的通道

    1.定义第一个通道

    public interface OrderMQInputChannel {
        String saveOrderChannelName="saveOrder";//定义通道的名字
        @Input(saveOrderChannelName)//定义为输入通道
        public SubscribableChannel saveOrder();
    }

    2.绑定通道并监听

    //通过绑定器 对OrderMQInputChannel通道进行绑定
    @EnableBinding(OrderMQInputChannel.class)
    public class OrderMQReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @StreamListener(OrderMQInputChannel.saveOrderChannelName)
        public void receiver(Object message){
            logger.info(message.toString());
        }
    }

    定义输出

    1.创建一个测试消费提供者的项目

     

    2.引入pom依赖

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
        </dependencies>

    2.yml配置文件配置

    spring:
      application:
        name: streamProvider
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
    server:
      port: 8082

    3.定义一个输出通道

    public interface OrderMQOutputChannel {
        String saveOrderChannelName="saveOrder";
        @Output(saveOrderChannelName)//定义输出管道的名字
        MessageChannel saveOrder();
    }

    4.绑定通道

    @EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
    public class OrderChannelBindConfig {
    }

    5.添加测试contorller

    @Controller
    public class TestContorller {
        @Autowired
        OrderMQOutputChannel orderMQOutputChannel;
        @RequestMapping("/saveOrder")
        @ResponseBody
        public boolean saveOrder(){
            //发送一条保存订单的命令
           return  orderMQOutputChannel.saveOrder().send(MessageBuilder.withPayload("fff").build());
        }
    }

    或者

    @Controller
    public class TestContorller {
    
        //直接注入对应通道是的实例
        @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
        MessageChannel messageChannel;
    
        @RequestMapping("/saveOrder")
        @ResponseBody
        public boolean saveOrder(){
            //发送一条保存订单的命令
            return  messageChannel.send(MessageBuilder.withPayload("fff").build());
        }
    
    }

    6.访问

    http://127.0.0.1:8082/saveOrder

    7.consumer打印 表示消息被消费

     spring intergration原生支持

    spring cloud stream 是通过spring boot和spring intergreation的整合 所以也可以使用原生的用法实现相同的功能

    provider

    @EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
    public class OriginalOrderMQOutPutChannelService {
        //定义2秒发送一次消息
        @Bean
        @InboundChannelAdapter(value = OrderMQOutputChannel.saveOrderChannelName, poller = @Poller(fixedDelay = "2000"))
        public MessageSource<Date> timerMessageSource() {
            return () -> new GenericMessage<>(new Date());
    
        }
    }

    consumer

    //通过绑定器 对OrderMQInputChannel通道进行绑定
    @EnableBinding(OrderMQInputChannel.class)
    public class OriginalOrderMQReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @ServiceActivator(inputChannel = OrderMQInputChannel.saveOrderChannelName)
        public void receiver(Object message) {
            logger.info(message.toString());
        }
    
        //定义消息转换器 转换saveOrderChannelName 通道的的消息
        @Transformer(inputChannel = OrderMQInputChannel.saveOrderChannelName, outputChannel = OrderMQInputChannel.saveOrderChannelName)
        public Object transform(Date message) {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);
    
        }
    }

    启动之后conusmer2秒则会受到一条消息 更多用法查看spring intergration文档

    消息转换 

    通过上面我们可以看到原生通过@Transformer实现消息转换 spring cloud stream 只需要定义消息通道的消息类型

    spring.cloud.stream.bindings.[inputname].content-type=application/json
    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              content-type: application/json
    server:
      port: 8081

    消费者

    //通过绑定器 对OrderMQInputChannel通道进行绑定
    @EnableBinding(OrderMQInputChannel.class)
    public class OrderMQReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @StreamListener(OrderMQInputChannel.saveOrderChannelName)
        public void receiver(Order order){
           
            logger.info(order.getId()+"-"+order.getOrderCode());
        }
    }

    消息提供者

    @Controller
    public class TestContorller {
    
        //直接注入对应通道是的实例
        @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
        MessageChannel messageChannel;
    
        @RequestMapping("/saveOrder")
        @ResponseBody
        public boolean saveOrder(){
           com.liqiang.entity.Order order=new  com.liqiang.entity.Order();
           order.setId(1L);
           order.setOrderCode("201901020001");
            //发送一条保存订单的命令
            return  messageChannel.send(MessageBuilder.withPayload(order).build());
        }
    
    }

    消息反馈

    用于将消息交给别的应用处理 处理后再回传  或者异步请求 接收处理结果

    provider

    public interface OrderMQOutputChannel {
        String saveOrderChannelName="saveOrder";
        String saveOrderCallbackChannelName="saveOrderCallback";//定义回调通道的名字
        @Output(saveOrderChannelName)//定义输出管道的名字
        MessageChannel saveOrder();
    
        @Input(saveOrderCallbackChannelName)//定义为输入通道
        public SubscribableChannel saveOrderCallback();
    }
    @EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
    public class OrderChannelBindConfig {
        private static Logger logger = LoggerFactory.getLogger(OrderMQOutputChannel.class);
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @StreamListener(OrderMQOutputChannel.saveOrderCallbackChannelName)
        public void receiver(boolean boo){
            logger.info(String.valueOf(boo));
        }
    }

    consumer

    public interface OrderMQInputChannel {
        String saveOrderChannelName="saveOrder";//定义通道的名字
        String saveOrderCallbackChannelName="saveOrderCallback";//定义回调通道的名字
        @Input(saveOrderChannelName)//定义为输入通道
        public SubscribableChannel saveOrder();
    }
    //通过绑定器 对OrderMQInputChannel通道进行绑定
    @EnableBinding(OrderMQInputChannel.class)
    public class OrderMQReceiverService {
        private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
        //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
        @StreamListener(OrderMQInputChannel.saveOrderChannelName)
        @SendTo(OrderMQInputChannel.saveOrderCallbackChannelName)//反馈的通道名字
        public boolean receiver(Order order){
    
            logger.info(order.getId()+"-"+order.getOrderCode());
            return true;
        }
    }

    消息分组

    多实例情况下 只需要指定spring.cloud.stream.bindings.[channelname].group=gorupname 当同一组实例对同一个主题的消息只能会有一个实例消费

    1.测试 创建2个配置文件 分别为

    application-peer1.yml

    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              group: groupA
              content-type: application/json
    server:
      port: 8081

    application-peer2.yml

    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              group: groupA
              content-type: application/json
    server:
      port: 8083

    通过启动2个消费者

    java -jar /Users/liqiang/Desktop/java开发环境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer1

    java -jar /Users/liqiang/Desktop/java开发环境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer2

    只要一个实例消费了 

    消费分区

    再某些场景 我需要指定某一类消息只能被哪些实例消费

    消费者

    application-peer1.yml

    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          instanceCount: 2 #跟分区一起使用 有多少实例
          instanceIndex: 0 #分区当前实例编号 从0开始
          bindings:
            saveOrder:
              group: streamConsumer
              content-type: application/json
              consumer:
                partitioned: true #开启消息分区的功能
    
    server:
      port: 8081

    application-peer2.yml

    spring:
      application:
        name: streamConsumer
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          instanceCount: 2 #跟分区一起使用 有多少实例
          instanceIndex: 1 #当前实例编号 从0开始
          bindings:
            saveOrder:
              group: streamConsumer
              content-type: application/json
              consumer:
                partitioned: true #开启消息分区的功能
    
    server:
      port: 8083

    生产者

    spring:
      application:
        name: streamProvider
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              producer:
                partitionKeyExpression: '0' #表示只有实例索引为0的才能收到消息 支持SpEL表达式
                partitionCount: 2
    server:
      port: 8082

    当启动2个消费者 和生产者 当前生产者 生产的消息只能被实例编号为0的消费

    这里限制死了当前实例生产的消息被某个实例消费。如果我们需要指定 当前生产者生产的某一类服务被指定实例消费呢可以通过SpEL表达式设置

    生产者yml

    spring:
      application:
        name: streamProvider
      rabbitmq:   #更多mq配置看书331页
        username: liqiang
        password: liqiang
        host: localhost
        port: 5672
      cloud:
        stream:
          bindings:
            saveOrder:
              producer:
                partitionKeyExpression: headers['partitionKey'] #SpEL表达式 通过读取消息hearder的partitionKey属性动态指定
                partitionCount: 2 #消息分区数量
    server:
      port: 8082

    消息生产通过header动态指定

    package com.liqiang.springcloudstreamprovider;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.core.annotation.Order;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    @Controller
    public class TestContorller {
    
        //直接注入对应通道是的实例
        @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
        MessageChannel messageChannel;
        private static  int index=0;
    
        @RequestMapping("/saveOrder")
        @ResponseBody
        public boolean saveOrder(){
           com.liqiang.entity.Order order=new  com.liqiang.entity.Order();
           order.setId(1L);
           order.setOrderCode("201901020001");
    
            //发送一条保存订单的命令
            return  messageChannel.send(MessageBuilder.withPayload(order).setHeader("partitionKey",(index++)%2==0?0:1).build());
        }
    
    }
  • 相关阅读:
    HDU 2098 分拆素数和 数论
    CodeForces The Endless River
    CodeForces Good Words
    CodeForces A or B Equals C
    HDU 1251 统计难题 字典树/STL
    CSUOJ 1555 Inversion Sequence 线段树/STL
    OpenJudge P4979 海贼王之伟大航路 DFS
    敌兵布阵 线段树
    HDU 4004 The Frog's Games 二分
    HDU 2578 Dating with girls(1) 二分
  • 原文地址:https://www.cnblogs.com/LQBlog/p/10342675.html
Copyright © 2011-2022 走看看