zoukankan      html  css  js  c++  java
  • spring-cloud-stream-rabbit的一个topic对应多组消费者实例

    一、概述

       “Spring Cloud Stream is a framework for building message-driven microservice applications.”这是来自官方文档对spring cloud sream的介绍,大致可以理解为Spring Cloud Stream 是一个构建消息驱动微服务的框架,该项目用于代理消息队列的集成过程。避免业务与具体的mq产品有深刻的绑定关系,易于后期的服务切换。

    二、课题

      如何通过spring-cloud-starter-stream-rabbit实现一条消息,可以被多个群组同时消费?

    三、过程

    1、创建消息生产者项目

    •  pom引用
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
        </dependencies>
    • spring-cloud-stream的生产队列配置
    spring:
      rabbitmq:
        addresses: 192.168.0.114:5672
        username: simm
        password: test
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            # 生产-消费者 指向同一个topic
            bizunit-install-producter:
              destination: yysc-bizunit-install
    #          group: bizunit-queue
              consumer:
                concurrency: 5
    • yysc-bizunit-install消息的生产,即rabbit的topic-exchange
     1 /**
     2  * 生产者通道
     3  * @author miscr
     4  */
     5 public interface InstallCallbackOutputChannel {
     6     /**
     7      * 定义通道的名称
     8      */
     9     String OUTPUT = "bizunit-install-producter";
    10 
    11     /**
    12      * 定义为输入通道
    13      * @return
    14      */
    15     @Output(OUTPUT)
    16     MessageChannel output();
    17 }
    18 
    19 
    20 /**
    21  * 生产消息
    22  * @author miscr
    23  */
    24 @EnableBinding(InstallCallbackOutputChannel.class)
    25 public class InstallCallbackSender {
    26     @Bean
    27     @InboundChannelAdapter(value = InstallCallbackOutputChannel.OUTPUT,poller = @Poller(fixedDelay = "2000"))
    28     public MessageSource<Date> timerMessagaSource(){
    29         return ()->new GenericMessage<>(new Date());
    30     }
    31 }

    2、创建两个消费者群组,群组分别命名为 main 和 template,各设置5个消费线程

    •  application.yml中配置消费者相关信息
    spring:
      rabbitmq:
        addresses: 192.168.0.114:5672
        username: simm
        password: test
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            install-consumer:
              destination: yysc-bizunit-install
              group: template
              consumer:
                concurrency: 5
            bizunit-install-consumer:
              destination: yysc-bizunit-install
              group: main
              consumer:
                concurrency: 5
    • 消费者 install-consumer 的绑定源码示例
    /**
     * 安装状态回调接收通道
     * @author miscr
     */
    public interface InstallCallbackInputChannel {
        /**
         * 定义通道的名称
         */
        String INPUT = "install-consumer";
    
        /**
         * 定义为输入通道
         * @return
         */
        @Input(INPUT)
        SubscribableChannel input();
    }
    
    /**
     * 消费者服务
     *
     * @author miscr
     */
    @EnableBinding(InstallCallbackInputChannel.class)
    public class InstallCallbackReceiver {
        /**
         * 消息监听
         *
         * @param message
         */
        @StreamListener(InstallCallbackInputChannel.INPUT)
        private void receiver(Object message) {
            System.out.println("template" + message.toString());
        }
    }
    • 消费者 bizunit-install-consumer 的绑定源码示例
    /**
     * 安装状态回调接收通道
     * @author miscr
     */
    public interface BizUnitInstallCallbackInputChannel {
        /**
         * 定义通道的名称
         */
        String INPUT = "bizunit-install-consumer";
    
        /**
         * 定义为输入通道
         * @return
         */
        @Input(INPUT)
        SubscribableChannel input();
    }
    
    /**
     * 消费者服务
     *
     * @author miscr
     */
    @EnableBinding(BizUnitInstallCallbackInputChannel.class)
    public class BizUnitInstallCallbackReceiver {
        /**
         * 消息监听
         *
         * @param message
         */
        @StreamListener(BizUnitInstallCallbackInputChannel.INPUT)
        private void receiver(Object message) {
            System.out.println("bizunit-install" + message.toString());
        }
    }

    四、启动后监看mq的队列生产与消费情况

    • 生产者项目与消费者项目启动后,查看mq的控制台,结果如下

     

    • 查看java控制台的消费日志,两个群组确实都在消费同一条消息

  • 相关阅读:
    hdu1686 最大匹配次数 KMP
    洛谷 P5057 [CQOI2006]简单题(树状数组)
    洛谷 P5020 货币系统
    洛谷 P5019 铺设道路(差分)
    洛谷 P1119 灾后重建(Floyd)
    洛谷 P1082 同余方程(同余&&exgcd)
    洛谷 P2384 最短路
    洛谷 P3371 【模板】单源最短路径(弱化版) && dijkstra模板
    洛谷 P1387 最大正方形
    洛谷 P2866 [USACO06NOV]糟糕的一天Bad Hair Day
  • 原文地址:https://www.cnblogs.com/MrSi/p/14214141.html
Copyright © 2011-2022 走看看