I recently deployed a binlog2kafka Storm job that I had written to a new cluster environment (Storm 0.9.0.1, Kafka 0.8). At runtime the job threw the following exception:
java.lang.RuntimeException: java.nio.channels.UnresolvedAddressException
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:83)
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:45)
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118)
    at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562)
    at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:701)
Caused by: java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:89)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:514)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:129)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:80)
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:55)
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:45)
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:77)
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:78)
    ... 6 more
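I searched online, but most of the answers missed the point. I finally found the answer in the GitHub issue list of the KafkaSpout author's repository, and it resolved the problem: https://github.com/wurstmeister/storm-kafka-0.8-plus/issues/36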
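The cause: the Storm KafkaSpout looks up the Kafka broker addresses through ZooKeeper, but ZooKeeper stores the brokers by hostname rather than by IP address, and the new cluster had no hosts entries for those names. The fix is simply to add the corresponding Kafka hosts to /etc/hosts on the new cluster's supervisor machines.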
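To confirm the diagnosis before or after editing /etc/hosts, it can help to check on a supervisor machine whether the broker hostnames resolve at all. The following is a minimal sketch, not part of storm-kafka: the class name BrokerHostCheck is hypothetical, and the port 9092 is only assumed (it is the Kafka default and may differ in your setup). It relies on the fact that java.net.InetSocketAddress tries to resolve the name when constructed, and that connecting a SocketChannel to an unresolved address is what raises UnresolvedAddressException.

```java
import java.net.InetSocketAddress;

// Hypothetical helper (not part of storm-kafka): reports whether
// broker hostnames can be resolved on the machine where it runs.
class BrokerHostCheck {

    // InetSocketAddress attempts name resolution in its constructor;
    // isUnresolved() tells us whether that lookup failed.
    static boolean isResolvable(String host, int port) {
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    public static void main(String[] args) {
        // Pass the broker hostnames exactly as they are registered in ZooKeeper.
        // Port 9092 is an assumption (the Kafka default).
        for (String host : args) {
            String status = isResolvable(host, 9092) ? "resolves" : "UNRESOLVED";
            System.out.println(host + " -> " + status);
        }
    }
}
```

Run it on each supervisor with the broker hostnames as arguments. Any name reported as UNRESOLVED is one that still needs an entry in /etc/hosts (or in DNS) on that machine.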