  • Problems encountered while using a Kafka cluster

    1. Messages produced on the Kafka cluster with kafka-console-producer.sh never show up in kafka-console-consumer.sh; the consumer just hangs with no output.

    Send:

    ./kafka-console-producer.sh --broker-list zk1:9092,zk2:9092,zk3:9092 --topic xyz

    Receive:

    ./kafka-console-consumer.sh --bootstrap-server zk1:9092,zk2:9092,zk3:9092 --topic xyz

    Solution:

    Turn up Kafka's logging: change all the log levels in config/log4j.properties to DEBUG, roughly as sketched below.
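    The exact logger and appender names vary by Kafka version, so treat this as an illustrative sketch rather than the literal contents of the file:

    log4j.rootLogger=DEBUG, stdout, kafkaAppender
    log4j.logger.kafka=DEBUG
    log4j.logger.org.apache.kafka=DEBUG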

    While running the receive command above, the following error appeared in one broker's log:

    Error while fetching metadata for __consumer_offsets-26: listener ListenerName(PLAINTEXT) not found on leader -1

    Go into the zookeeper/bin directory, run zkCli.sh, and remove the stale metadata:

    rmr /brokers
    rmr /config/brokers
    rmr /config/topics

     Note: this is not a production environment, so I could afford to be heavy-handed; you may want to try running only the first command and then restarting the cluster.

    Then restarting the Kafka cluster is enough to fix it; a quick sanity check is sketched below.
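    To verify, you can inspect the re-registered brokers from zkCli.sh; broker id 0 and partition 26 below are just examples matching this setup, not commands from the original fix:

    ls /brokers/ids
    get /brokers/ids/0
    get /brokers/topics/__consumer_offsets/partitions/26/state

    The endpoints field of the broker znode should list PLAINTEXT://<host>:9092, and once the offsets topic has been recreated the partition state should no longer report leader -1.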

    The root cause is still unclear. I followed Liz Bennett's approach from https://stackoverflow.com/questions/34844209/consumer-not-receiving-messages-kafka-console-new-consumer-api-kafka-0-9 : since our error also suggests the broker metadata is wrong, clearing the broker-related znodes in ZooKeeper and restarting the cluster seemed worth a try.

    2. Spark Streaming + Kafka error:

    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

    Setting the consumer config max.poll.records=200 (the default is 1000 in the 1.2 client) fixed it on the next run; a sketch of where this setting goes is shown below.
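    For reference, a minimal Scala sketch of passing that setting through the Spark Streaming Kafka 0.10 direct API; the group id, topic, app name and batch interval are assumptions, not taken from the original job:

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    val conf = new SparkConf().setAppName("max-poll-records-sketch")  // app name is illustrative
    val ssc  = new StreamingContext(conf, Seconds(5))                 // batch interval is an assumption

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG        -> "zk1:9092,zk2:9092,zk3:9092",
      ConsumerConfig.GROUP_ID_CONFIG                 -> "xyz-group",   // hypothetical group id
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG         -> (200: Integer) // the fix described above
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("xyz"), kafkaParams)
    )

    Fewer records per poll() means less processing time between consecutive polls, which is exactly what the CommitFailedException message asks for; raising session.timeout.ms is the alternative it suggests.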
