zoukankan      html  css  js  c++  java
  • ZZ kafka性能问题调优

    https://blog.csdn.net/jiecxy/article/details/53369173

    Kafka errors NotLeaderForPartitionException, TimeoutException: Batch containing * record(s) expired

    jiecxy 2016-11-27 22:59:51 24223 收藏 6
    展开
    1. 错误描述

    kafka Producer在运行期间,出现大量以下错误:

    # 第一种
    org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
    1
    2

    # 第二种
    org.apache.kafka.common.errors.TimeoutException: Batch containing * record(s) expired due to timeout while requesting metadata from brokers for test-*

    注:在Kafka 0.10.0.1后,报错方式为:
    Expiring * record(s) for test-* due to *
    org.apache.kafka.common.errors.TimeoutException: Expiring * record(s) for test-0: 30941 ms has passed since last append
    1
    2
    3
    4
    5
    6
    但是系统在一定时间后系统会恢复正常
    导致错误的原因有很多,笔者只记录自己所遇到的问题及解决方法


    2. 导致错误运行的脚本及系统状态

    broker三节点:CPU:24Core 32G内存 1T硬盘
    Kafka 版本: 0.10.0.1
    Zookeeper 版本: 3.4.9 / 3.4.6
    每个节点挂载盘数:1
    Broker重要配置(均为默认值):
    num.network.threads=3
    num.io.threads=8

    Topic:
    bin/kafka-topics.sh --create --zookeeper hw168:2181 --replication-factor 3 --partitions 6 --topic test

    同时在两个节点上运行测试脚本(对应参数如下):
    bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic test --num-records 500000000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=yourhostname:9092 buffer.memory=67108864 batch.size=8192 acks=1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13


    3. 错误分析

    3.1 org.apache.kafka.common.errors.NotLeaderForPartitionException

    分析 server.log 日志:
    其中一台Broker会出现与Zookeeper的 session timed out 连接超时,接着导致 Controller挂掉重新选举 ,导致获取元数据不正确,挂掉的Broker所持有的partition就会出现NotLeaderForPartitionException,具体日志如下:
    注:在后续的代码Debug中,发现Broker在与Zookeeper客户端与服务器中,网络IO的 select 严重超时(可达二十秒),导致进程在此“假死“,进而造成 session 超时,心跳包超时,引发了后续(因此在最后的解决方案中,增大 Session timeout 时间本质上治标不治本)。

    [2016-12-01 15:35:47,946] INFO Client session timed out, have not heard from server in 7500ms for sessionid 0x58b937e13d0001, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
    [2016-12-01 15:35:48,046] DEBUG Received event: WatchedEvent state:Disconnected type:None path:null (org.I0Itec.zkclient.ZkClient)[2016-12-01 15:35:48,047] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
    [2016-12-01 15:35:48,047] DEBUG Leaving process event (org.I0Itec.zkclient.ZkClient)
    [2016-12-01 15:35:48,391] INFO Opening socket connection to server 172.18.11.169/172.18.11.169:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)[2016-12-01 15:35:48,391] INFO Socket connection established to 172.18.11.169/172.18.11.169:2181, initiating session (org.apache.zookeeper.ClientCnxn)
    1
    2
    3
    4

    [2016-11-27 22:20:39,802] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    [2016-11-27 22:20:39,806] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    [2016-11-27 22:20:39,806] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    1
    2
    3
    以及

    [2016-11-27 22:20:05,918] INFO re-registering broker info in ZK for broker 2 (kafka.server.KafkaHealthcheck$SessionExpireListen
    er)
    [2016-11-27 22:20:05,919] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    [2016-11-27 22:20:05,972] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    [2016-11-27 22:20:05,972] INFO Registered broker 2 at path /brokers/ids/2 with addresses: PLAINTEXT -> EndPoint(172.18.11.169,9
    092,PLAINTEXT) (kafka.utils.ZkUtils)
    [2016-11-27 22:20:05,973] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)
    [2016-11-27 22:20:05,973] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$Sessi
    onExpireListener)
    [2016-11-27 22:20:05,981] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10


    分析 Zookeeper 日志
    发现在报错前,即系统”假死“后,Zookeeper 服务端由于超时关闭了与kafka的连接,因此,该Broker超时挂掉,但很快就重新连接上了,因此总体上系统又看似恢复了
    2016-12-01 15:27:06,222 [myid:0] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@868] - Client attempting to establish new session at /172.18.11.169:39879
    2016-12-01 15:27:06,239 [myid:0] - INFO [CommitProcessor:0:ZooKeeperServer@617] - Established session 0x58b937e13d0001 with negotiated timeout 6000 for client /172.18.11.169:39879
    2016-12-01 15:35:48,586 [myid:0] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
    EndOfStreamException: Unable to read additional data from client sessionid 0x58b937e13d0001, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
    2016-12-01 15:35:48,591 [myid:0] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /172.18.11.169:39879 which had sessionid 0x58b937e13d0001
    1
    2
    3
    4
    5
    6
    7
    8


    分析 kafkaServer-gc.log 日志
    没有出现 Full GC 状况,GC均为正常,因此排除GC原因


    分析系统资源使用情况
    每台Broker的网卡出入流量大概在80MB/s,内存被耗尽剩200多MB。而磁盘上,iowait时间在35%左右,可以达到40%,await 远大于 svctm,说明磁盘的IO压力挺大


    3.2 org.apache.kafka.common.errors.TimeoutException

    分析 Producer源码
    可以从源码 Sender 的 run 方法 中看出,线程每次会查看RecordAccumulator中超时的Batch,具体报错地点在RecordBatch的maybeExpire方法中,其根本原因在于Sender线程不能在有效时间内将积存的消息发出去,导致超时。
    本实验中buffer.memory为64MB,远大于8KB的batch.size,根据topic,一个ProduceRequest有6个Batch,即48KB(不考虑Batch是否满),因此持续不断的数据请求BufferMemory可以积攒64*1024/48=1365个request,而若Broker处理不过来时(可能由于网络、磁盘瓶颈等原因),导致IO阻塞,缓存的数据不能及时发出,最终报错。


    4. 错误解决方案

    4.1 org.apache.kafka.common.errors.NotLeaderForPartitionException

    增大session的timeout时间
    可有效缓解(但系统依旧”卡顿“,影响性能),将初始值更改为30s,在 server.properties 中:
    zookeeper.session.timeout.ms=30000
    1
    增加磁盘IO处理线程数(具体数值请根据实际情况)
    在 server.properties 中:
    num.io.threads=12
    1
    增加 log 数据盘数(具体数值请根据实际情况)
    在每台机器上多挂载几个盘,更改 kafka 的 server.properties 参数:
    log.dirs=/mnt/datadir1/kafka-logs-0,/mnt/datadir2/kafka-logs-0,/mnt/datadir3/kafka-logs-0
    1


    4.2 org.apache.kafka.common.errors.TimeoutException

    增大 batch.size 或 减少 buffer.memory
    更改Producer参数(具体数值请根据实际情况)
    batch.size=65536
    1

    buffer.memory=524288
    1
    注:简单做了下实验,在我的机器上,buffer.memory为batch.size的64倍左右性能最好,但是倍数一定,随着batch.size增大,时延也会相应增大,因此需要自己先测试一下选择可接受的时延大小。值得一提的是,buffer.memory过大(相比batch.size)会导致非常高的时延,batch.size也要比record.size大,才能批处理。另外,注意设置linger.ms,否则默认linger.ms=0每次发送的Batch是不会积满才发出去的,也就不能提高吞吐率。

    增加broker处理网络请求的线程数(具体数值请根据实际情况)
    更改Broker的配置文件server.properties (具体数值请根据实际情况)
    # The number of threads handling network requests, default = 3
    num.network.threads=8
    1
    2

    ————————————————
    版权声明:本文为CSDN博主「jiecxy」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/jiecxy/article/details/53369173

  • 相关阅读:
    [Leetcode]Linked List Cycle
    [Leetcode]Excel Sheet Column Number
    [Leetcode]Unique Binary Search Trees
    [Leetcode]Same Tree
    同时访问内外网设置路由信息
    希腊字母表示及读音
    jni入门
    查看某个进程运行时间的几种方法
    企业级hbase HA配置
    存在单点故障的namenode宕机恢复测试
  • 原文地址:https://www.cnblogs.com/yeyong/p/12963986.html
Copyright © 2011-2022 走看看