zoukankan      html  css  js  c++  java
  • KAFKA-使用问题

    本篇文档使用kafka版本为:0.9.0.0

    问题1、在现场项目中,kafka连接正常一直无数据?

      1)通常是确认配置是否正确,包含任务配置,ip端口号;

    2)查看topic offset:是否有新数据进来,数据是否被消费掉了,

    3)然后检查kafka服务是否正常,查看服务是否有节点挂掉,topic配置是否做了副本,

    4)如果kafka是集群,而topic没有设置副本,那么挂掉一个节点就会导致无法拉取数据;

    5)其次是网络是否是通的,通过ping命令ping ip;

    之前遇到的棘手问题,上述这些都是正常的,程序讲过检测也是没问题的,后面发现一个现场网络很差,情况和上述类似;

    再经过抓包工具发现上述现场数据包发送成功很低,网咯很不稳定,发现网路问题导致无法接收数据。

     

    问题2、Occur runtime exception:org.apache.kafka.common.errors.RecordTooLargeException:

    There are some messages at [Partition=Offset]: {HIK_SMART_METADATA_TOPIC-1=8155364} whose size is larger than the fetch size 1048576 and hence cannot be ever returned.

    Increase the fetch size, or decrease the maximum message size the broker will allow.

    问题原因:消费下级Kafka数据,存在单条数据大小大于1048576 bytes (1M),造成工具无法消费该数据,该区县数据消费也会一直阻塞在这

    (上述单条数据大于1M仅个别数据)

    解决办法:

    配置项中消费端增加kafka配置项  max.partition.fetch.bytes=1572864  (1.5M)

    • max.partition.fetch.bytes:每次从单个分区中拉取的消息最大尺寸(byte),默认为1M

     

     

    问题3 、错误信息 :java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

             at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:706)

             at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:453)

             at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)

             at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.sendRecordToKafka(KafkaToKafkaStep.java:453)

             at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep$KafkaToKafkaGroupTask.run(KafkaToKafkaStep.java:204)

             at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)

             at java.util.concurrent.FutureTask.run(Unknown Source)

             at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

             at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

             at java.lang.Thread.run(Unknown Source)

    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

    问题原因: 定位原因是发送数据到上级Kafka时,producer.send(record)  执行逻辑是从服务端获取metadata信息,然后send数据 ,

    若数据量过大、或者发送过于频繁等原因,更新metadata信息有可能超时(60s),捕获异常,然后重新发送,异常捕获后会打印出发送失败的数据,并打印异常信息

     

    问题4

    区县kafka集群是8台服务器,而HIK_SMART_METADATA_TOPIC partion仅在两台服务器上,重启另外几台kafka,仍是如此,其他topic无此现象

     

    解决办法:

    1、重建topic,需要topic数据全部被消费

    2、均衡topic

    kafka副本相关.docx

    问题5

    日志报如下错误

    2018-06-01 18:52:39 ERROR [node_2_Worker-18] core.JobRunShell (run:211) - Job com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.node_2_(kafka_BoLe)_(kafka_sanhui) threw an unhandled Exception:
    org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
    at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.startStep(KafkaToKafkaStep.java:126)
    at com.hikvision.bigdata.hwf.workflow.node.steps.AbstractWorkflowNodeStep.execute(AbstractWorkflowNodeStep.java:143)
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
    Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Unable to establish loopback connection
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:98)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:122)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:272)
    ... 5 more
    Caused by: java.io.IOException: Unable to establish loopback connection
    at sun.nio.ch.PipeImpl$Initializer.run(Unknown Source)
    at sun.nio.ch.PipeImpl$Initializer.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.nio.ch.PipeImpl.<init>(Unknown Source)
    at sun.nio.ch.SelectorProviderImpl.openPipe(Unknown Source)
    at java.nio.channels.Pipe.open(Unknown Source)
    at sun.nio.ch.WindowsSelectorImpl.<init>(Unknown Source)
    at sun.nio.ch.WindowsSelectorProvider.openSelector(Unknown Source)
    at java.nio.channels.Selector.open(Unknown Source)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:96)
    ... 7 more
    Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): bind
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Unknown Source)
    at sun.nio.ch.Net.bind(Unknown Source)
    at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
    at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)
    at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)

     

    问题结论:  本机tcp连接被占用,导致无法建立tcp(socket)连接

    方法1:重启服务器释放连接解决

    方法2: 调整本机tcp连接数设置,增大buffer大小,找出tcp连接未被释放原因

    cmd

    netstat -an

     

    netstat -an中state含义
    LISTEN:侦听来自远方的TCP端口的连接请求
    SYN-SENT:再发送连接请求后等待匹配的连接请求
    SYN-RECEIVED:再收到和发送一个连接请求后等待对方对连接请求的确认
    ESTABLISHED:代表一个打开的连接
    FIN-WAIT-1:等待远程TCP连接中断请求,或先前的连接中断请求的确认
    FIN-WAIT-2:从远程TCP等待连接中断请求
    CLOSE-WAIT:等待从本地用户发来的连接中断请求
    CLOSING:等待远程TCP对连接中断的确认
    LAST-ACK:等待原来的发向远程TCP的连接中断请求的确认
    TIME-WAIT:等待足够的时间以确保远程TCP接收到连接中断请求的确认
    CLOSED:没有任何连接状态

    有关kafka offset的一些方法:

    查询topic的offset的范围
    用下面命令可以查询到topic:test broker:suna:9092的offset的最小值:

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -2

    输出

    test:0:1288

    查询offset的最大值:

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -1

    输出

    test:0:7885

    从上面的输出可以看出topic:test只有一个partition:0 offset范围为:[1288,7885]

    设置consumer group的offset
    启动zookeeper client

    /zookeeper/bin/zkCli.sh

    通过下面命令设置consumer group:testgroup topic:test partition:0的offset为1288:

    set /consumers/testgroup/offsets/test/0 1288

    注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:

    set /kafka/consumers/testgroup/offsets/test/0 1288

    重启相关的应用程序,就可以从设置的offset开始读数据了。

    手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改偏移量的类:kafka.tools.UpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体操作如下:

     


    [iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

    USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

     

     

    在不输入参数的情况下,我们可以得知kafka.tools.UpdateOffsetsInZK类需要输入的参数。我们的consumer.properties文件配置内容如下:

     


    zookeeper.connect=www.iteblog.com:2181

    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

    #consumer group id
    group.id=group

     


    这个工具只能把Zookeeper中偏移量设置成earliest或者latest,如下:

    [iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

        earliest config/consumer.properties iteblog
    updating partition 0 with new offset: 276022922
    updating partition 1 with new offset: 234360148
    updating partition 2 with new offset: 157237157
    updating partition 3 with new offset: 106968019
    updating partition 4 with new offset: 80696130
    updating partition 5 with new offset: 317144986
    updating partition 6 with new offset: 299182459
    updating partition 7 with new offset: 197012246
    updating partition 8 with new offset: 230433681
    updating partition 9 with new offset: 120971431
    updating partition 10 with new offset: 51200673
    updated the offset
    for 11 partitions

  • 相关阅读:
    UVA 11174 Stand in a Line,UVA 1436 Counting heaps —— (组合数的好题)
    UVA 1393 Highways,UVA 12075 Counting Triangles —— (组合数,dp)
    【Same Tree】cpp
    【Recover Binary Search Tree】cpp
    【Binary Tree Zigzag Level Order Traversal】cpp
    【Binary Tree Level Order Traversal II 】cpp
    【Binary Tree Level Order Traversal】cpp
    【Binary Tree Post order Traversal】cpp
    【Binary Tree Inorder Traversal】cpp
    【Binary Tree Preorder Traversal】cpp
  • 原文地址:https://www.cnblogs.com/yangh2016/p/14756047.html
Copyright © 2011-2022 走看看