zoukankan      html  css  js  c++  java
  • Kafka问题排查(消费者自动关闭)

    问题描述:
               在消费端能够正常消费到Kafka数据并成功生产到producer topic 中,当将kafka的一台机器关机之后,正常情况下应该是 消费端是不受影响的。因为有还有两台的负载机器。问题就是一台机器停止运行之后,消费端酒 shutdown  而无法重新starting
    解决方式 : 
    在如下代码中。
     1      public void run(){
     2              try{
     3                   System. out.println( "Consumer....");
     4                   Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
     5                    topicCountMap.put( topic, partitionNum);
     6                   Map<String,List<KafkaStream< byte[], byte[]>>> consumerMap = consumer.createMessageStreams( topicCountMap);
     7                   List<KafkaStream< byte[], byte[]>> partitions = consumerMap.get( topic);
     8 
     9                    threadPool = Executors. newCachedThreadPool();
    10                    for(KafkaStream< byte[], byte[]> partition : partitions){
    11                          threadPool.execute( new MessageFetcher(partition,producer ));
    12                   }
    13             } catch(Exception ex){
    14                    logger.info( "KafkaConsumer-> Run -> ErrInfo : " +ex.getMessage());
    15                   close();
    16             }
    17       }

    有一个 partitionNum,在代码中的可配置值为 private int partitionNum = 3;  

    把partitionNum 改为 1 即可解决此问题。

    问题跟踪源码分析:

    partitionNUm 改为 1 , 此处的Num 为ThreadNum ,因为kafka内部实现中,都为多线程, partition为1时,此时有一个backingQueue1,三个fetch thread 线程,该topic分布在几个node上就有几个 fetch thread 每个fetch thread 会于kafka broker建立一个连接,3个fetch thread线程去拉去消息数据,最终防盗blockingQueue中,等到consumer thread来消费。

  • 相关阅读:
    c# 类成员的定义 定义方法、字段和属性
    Set Rowcount分页查询(转)
    Sql 学习笔记
    xen与kvm 天高地厚
    Linux_free(buffer与cache区别) 天高地厚
    快照技术 天高地厚
    磁盘阵列的状态与种类 天高地厚
    在linux中使用ramdisk文件系统 天高地厚
    oracle逻辑读取 天高地厚
    BeginInvoke和EndInvoke操作线程 天高地厚
  • 原文地址:https://www.cnblogs.com/DeepLearing/p/5641454.html
Copyright © 2011-2022 走看看