zoukankan      html  css  js  c++  java
  • 关于kafka消费组模式,分区数和消费者数的一些疑惑

    关于kafka的消费组模式,差了点相关资料,其中有一点提到:

    一个主题下的分区不能小于消费者数量,即一个主题下消费者数量不能大于分区属,大了就浪费了

    那么,如果我的消费者进程数大于分区数的话,会有什么现象了,接下来就做个实验试试

    1、首先,创建一个3分区,主题名为test3

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test3

    2、编写生产者和消费者脚本,本人用的开发语言是php,demo例子可以参考文档,只要做一些小修改就可以了

    附上文档地址:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples-high-level-consumer.html

    生产者脚本

     1 <?php
     2 
     3 $objRdKafka = new RdKafkaProducer();
     4 $objRdKafka->setLogLevel(LOG_DEBUG);
     5 $objRdKafka->addBrokers("192.168.78.139:9092");
     6 
     7 $oObjTopic = $objRdKafka->newTopic("test3");
     8 
     9 // 从终端接收输入 
    10 $oInputHandler = fopen('php://stdin', 'r');
    11 
    12 while (true) {
    13     echo "
    Enter  messages:
    ";
    14     $sMsg = trim(fgets($oInputHandler));
    15 
    16    // 空消息意味着退出
    17     if (empty($sMsg)) {
    18         break;
    19     }
    20 
    21     // 发送消息
    22     $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
    23 }
    24 
    25 echo "done
    ";
    26 ?>

    消费者脚本

    <?php
    
    $conf = new RdKafkaConf();
    
    // Set a rebalance callback to log partition assignments (optional)
    // 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发
    $conf->setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;
    
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
    
            default:
                throw new Exception($err);
        }
    });
    
    // 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
    $conf->set('group.id', 'myConsumerGroup1');
    
    //添加 kafka集群服务器地址(ip地址和端口替换成自己本地测试环境)
    $conf->set('metadata.broker.list', '192.168.78.139:9092');
    
    $topicConf = new RdKafkaTopicConf();
    
    
    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    //当没有初始偏移量时,从哪里开始读取
    $topicConf->set('auto.offset.reset', 'smallest');
    
    
    // Set the configuration to use for subscribed/assigned topics
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new RdKafkaKafkaConsumer($conf);
    
    // 让消费者订阅log 主题(topic替换成自己生成的)
    $consumer->subscribe(['test3']);
    
    
    while (true) {
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more
    ";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out
    ";
                break;
            default:
                throw new Exception($message->errstr(), $message->err);
                break;
        }
    }
    ?>

    3、开始试验

    启4个消费者脚本,然后用生产者生成多个消息,进行观察

     

     如上图所示。消费者2,3,4都能进行消费,而消费者1就只是挂着啥事都不干

    接下来,试试kill掉其中一个消费者,比如消费者4,然后继续产生消息

     这是,消费者1顶替了消费者4的位置,进行消息的消费

    结论:当消费者总数大于分区数的话,多余的消费者进程会一直挂着,但是当某个消费者进程down掉的话,之前那些多余的消费者进程会顶替上来

  • 相关阅读:
    重构29-Remove Middle Man(去掉中间人)
    重构30-Return ASAP(尽快返回)
    重构26-Remove Double Negative(去掉双重否定)
    yaml语法学习3
    运行原理探究2
    SpringBoot简介 1
    SpringMVC项目所引用的一切依赖jar包和自定义设置
    2020/07/03 初始mybatis
    json数据格式字符串在java中的转移
    项目中遇到的一些异常
  • 原文地址:https://www.cnblogs.com/zhp-king/p/14584163.html
Copyright © 2011-2022 走看看