zoukankan      html  css  js  c++  java
  • Kafka系列2-producer和consumer报错

    1. 使用127.0.0.1启动生产和消费进程:

    1)启动生产者进程:

    bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

    输入消息:

    this is msg

    生产者进程报错:

    [plain] view plain copy
     
    1. [2016-06-03 11:33:47,934] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)  
    2. [2016-06-03 11:33:49,554] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)  
    3. [2016-06-03 11:33:51,177] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)  
    4. [2016-06-03 11:33:53,398] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)  
     
     

    2)启动消费者进程:

    bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

    消费者进程报错:

    [plain] view plain copy
     
    1. [2016-06-03 11:34:53,574] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)  
    2. java.nio.channels.ClosedChannelException  
    3.     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)  
    4.     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)  
    5.     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)  
    6.     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)  
    7.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)  
    8.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)  
    9.     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)  
    10.     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)  
    11. [2016-06-03 11:34:53,651] WARN [console-consumer-72675_zzs-1464924871670-2192d80a-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)  
    12. kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed  
    13.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)  
    14.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)  
    15.     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)  
    16.     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)  
    17. Caused by: java.nio.channels.ClosedChannelException  
    18.     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)  
    19.     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)  
    20.     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)  
    21.     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)  
    22.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)  
    23.     ... 3 more  
    24. [2016-06-03 11:35:14,916] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)  
    25. java.nio.channels.ClosedChannelException  
    26.     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)  
    27.     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)  
    28.     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)  
    29.     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)  
    30.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)  
    31.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)  
    32.     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)  
    33.     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)  
    34. [2016-06-03 11:35:14,918] WARN [console-consumer-72675_zzs-1464924871670-2192d80a-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)  
    35. kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed  
    36.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)  
    37.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)  
    38.     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)  
    39.     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)  
    40. Caused by: java.nio.channels.ClosedChannelException  
    41.     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)  
    42.     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)  
    43.     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)  
    44.     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)  
    45.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)  
    46.     ... 3 more  



    2 使用localhost启动生产和消费进程:

    1)启动生产者进程:

    [plain] view plain copy
     
    1. [root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  
    2. this is msg  
    3. [2016-06-03 11:44:16,932] WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)  
    4. [2016-06-03 11:44:18,255] WARN Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)  
    5. [2016-06-03 11:44:18,648] WARN Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)  
    6. [2016-06-03 11:44:18,801] WARN Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)  
    7. [2016-06-03 11:44:18,928] WARN Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)  
    8. [2016-06-03 11:44:19,035] WARN Error while fetching metadata with correlation id 5 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)  
    9. [2016-06-03 11:44:19,180] WARN Error while fetching metadata with correlation id 6 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)  
    10. [2016-06-03 11:44:19,308] WARN Error while fetching metadata with correlation id 7 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)  

    2)启动消费者进程:

    [plain] view plain copy
     
    1. [root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  
    2. [2016-06-03 11:45:18,330] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)  
    3. java.nio.channels.ClosedChannelException  
    4.     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)  
    5.     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)  
    6.     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)  
    7.     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)  
    8.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)  
    9.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)  
    10.     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)  
    11.     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)  
    12. [2016-06-03 11:45:18,541] WARN [console-consumer-42554_zzs-1464925496436-ae2ee9c7-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)  
    13. kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed  
    14.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)  
    15.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)  
    16.     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)  
    17.     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)  
    18. Caused by: java.nio.channels.ClosedChannelException  
    19.     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)  
    20.     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)  
    21.     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)  
    22.     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)  
    23.     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)  
    24.     ... 3 more  



    3.解决问题

    1)查看Kafka的配置文件,cat config/server.properties

    [plain] view plain copy
     
    1. zookeeper.connect=localhost:2181  

    连接的zookeeper的为localhost,所以需要用localhost启动生产和消费进程

    2)查看kafka启动的日志,发现

    [plain] view plain copy
     
    1. Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(218.30.64.194,9092,PLAINTEXT) (kafka.utils.ZkUtils)  
    [plain] view plain copy
     
    1.   

    为什么启动的broker的ip是 218.30.64.194

    ==> 没有绑定Kafka启动监听的host信息

    vi  config/server.properties

    [plain] view plain copy
     
    1. listeners=PLAINTEXT://localhost:9092  



    3)重新启动zookeeper、kafka、consumer、producer

    ./bin/zookeeper-server-start.sh ./config/zookeeper.properties

    ./bin/kafka-server-start.sh ./config/server.properties

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    在producer中输入消息,可以在producer中消费:

    [plain] view plain copy
     
    1. [root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  
    2. this is msg  
    3. this is msg2  
    [plain] view plain copy
     
    1. [root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  
    2. this is msg  
    3. this is msg2  
    4. this is msg3  
    5. this is mgs4  



    问题解决!

  • 相关阅读:
    (4)ES6解构赋值-字符串篇
    (3)ES6解构赋值-对象篇
    (2)ES6解构赋值-数组篇
    (1)ES6中let,const,对象冻结,跨模块常量,新增的全局对象介绍
    MySQL中char与varchar区别,varchar最大长度是多少?
    集成学习实战——Boosting(GBDT,Adaboost,XGBoost)
    集成学习——Boosting(GBDT,Adaboost,XGBoost)
    集成学习(Random Forest)——实践
    集成学习——Bagging
    决策树实践
  • 原文地址:https://www.cnblogs.com/hackerer/p/6296871.html
Copyright © 2011-2022 走看看