  • storm-sql-kafka: issues encountered

      First, the official documentation: http://storm.apache.org/releases/1.2.2/storm-sql.html

    Problems solved

      1. Wrong Kafka version

      The first test used Storm 1.2.2 together with Kafka 2.0.1, following the official documentation exactly. Kafka failed to resolve its Scala dependency: storm-kafka is built against Scala 2.10, while Kafka 2.0.1 is only published as Scala 2.11 and 2.12 builds, so the two are incompatible. The only option was to fall back to Kafka 0.8.2.2 for testing.
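      The Scala version is encoded directly in the Kafka artifact id, so the mismatch is visible in the Maven coordinates themselves (the broker-side jars; versions as tested above):

```text
org.apache.kafka:kafka_2.11:2.0.1    # Scala 2.11 build - incompatible with storm-kafka (built against Scala 2.10)
org.apache.kafka:kafka_2.12:2.0.1    # Scala 2.12 build - likewise incompatible
org.apache.kafka:kafka_2.10:0.8.2.2  # Scala 2.10 build - the combination that works with storm-kafka 1.2.2
```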

      2. Storm cannot find its dependencies

      The submit command as written in the official documentation cannot find storm-sql-runtime, so the storm-sql dependencies have to be added explicitly. The working submit command:

      

    ./bin/storm sql /softwares/storm-1.2.2/storm-sql.sql stormsql1 --artifacts "org.apache.storm:storm-sql-kafka:1.2.2,org.apache.storm:storm-kafka:1.2.2,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2,org.apache.storm:storm-sql-runtime:1.2.2,org.apache.storm:storm-sql-core:1.2.2"
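    For reference, the storm-sql.sql file followed the CREATE EXTERNAL TABLE pattern from the official Storm SQL documentation. A sketch of what such a file looks like; the column definitions, ZooKeeper/broker addresses, and the exact query are assumptions, while the topic names testin/testout are the ones that appear in the worker logs further down:

```sql
-- Input table over the Kafka topic "testin" (addresses hypothetical)
CREATE EXTERNAL TABLE TESTIN (ID INT PRIMARY KEY) LOCATION 'kafka://cloud22:2181/brokers?topic=testin'

-- Output table over "testout"; the producer serializers here match the
-- IntSerializer seen later in the NullPointerException stack trace
CREATE EXTERNAL TABLE TESTOUT (ID INT PRIMARY KEY) LOCATION 'kafka://cloud22:2181/brokers?topic=testout' TBLPROPERTIES '{"producer":{"bootstrap.servers":"cloud22:19092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'

-- The topology itself: copy rows from input to output
INSERT INTO TESTOUT SELECT ID FROM TESTIN
```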

    Problems encountered

      The topology submits successfully, but the bolt always fails during execution.

      This problem has no direct fix, and the logs cannot be found through the Storm UI.

      The troubleshooting went as follows. First, look at the worker log:

    less logs/workers-artifacts/stormsql1-31-1542269873/6700/worker.log

      which shows:

    2018-11-15 16:18:03.992 o.a.s.t.t.s.TransactionalState Thread-4-$spoutcoord-spout-TRIDENTSTREAMSCANREL_21_0-executor[2 2] [WARN] Failed to deserialize zookeeper data for path /meta/1
    org.apache.storm.shade.org.json.simple.parser.ParseException: null
            at org.apache.storm.shade.org.json.simple.parser.Yylex.yylex(Unknown Source) ~[storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.shade.org.json.simple.parser.JSONParser.nextToken(Unknown Source) ~[storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.shade.org.json.simple.parser.JSONParser.parse(Unknown Source) ~[storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.shade.org.json.simple.parser.JSONParser.parse(Unknown Source) ~[storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.shade.org.json.simple.parser.JSONParser.parse(Unknown Source) ~[storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.shade.org.json.simple.JSONValue.parseWithException(Unknown Source) ~[storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.trident.topology.state.TransactionalState.getData(TransactionalState.java:176) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.trident.topology.state.RotatingTransactionalState.sync(RotatingTransactionalState.java:165) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.trident.topology.state.RotatingTransactionalState.<init>(RotatingTransactionalState.java:46) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.trident.spout.TridentSpoutCoordinator.prepare(TridentSpoutCoordinator.java:57) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:43) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.daemon.executor$fn__10795$fn__10808.invoke(executor.clj:803) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:482) [storm-core-1.2.2.jar:1.2.2]
            at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
    2018-11-15 16:18:03.997 o.a.s.d.executor Thread-4-$spoutcoord-spout-TRIDENTSTREAMSCANREL_21_0-executor[2 2] [INFO] Prepared bolt $spoutcoord-spout-TRIDENTSTREAMSCANREL_21_0:(2)
    2018-11-15 16:18:04.550 o.a.s.k.t.TridentKafkaState Thread-14-b-0-executor[4 4] [WARN] Could not send messages [[null, null], [null, null], [null, null]] to topic = testout
    java.lang.NullPointerException: null
            at org.apache.storm.kafka.IntSerializer.serialize(IntSerializer.java:35) ~[dep-org.apache.storm-storm-kafka-jar-1.2.2.jar.1542245912000:1.2.2]
            at org.apache.storm.kafka.IntSerializer.serialize(IntSerializer.java:26) ~[dep-org.apache.storm-storm-kafka-jar-1.2.2.jar.1542245912000:1.2.2]
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:318) ~[dep-org.apache.kafka-kafka-clients-jar-0.8.2.2.jar.1542267398000:?]
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248) ~[dep-org.apache.kafka-kafka-clients-jar-0.8.2.2.jar.1542267398000:?]
            at org.apache.storm.kafka.trident.TridentKafkaState.updateState(TridentKafkaState.java:87) [dep-org.apache.storm-storm-kafka-jar-1.2.2.jar.1542245912000:1.2.2]
            at org.apache.storm.kafka.trident.TridentKafkaUpdater.updateState(TridentKafkaUpdater.java:29) [dep-org.apache.storm-storm-kafka-jar-1.2.2.jar.1542245912000:1.2.2]
            at org.apache.storm.kafka.trident.TridentKafkaUpdater.updateState(TridentKafkaUpdater.java:26) [dep-org.apache.storm-storm-kafka-jar-1.2.2.jar.1542245912000:1.2.2]
            at org.apache.storm.trident.planner.processor.PartitionPersistProcessor.finishBatch(PartitionPersistProcessor.java:98) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:151) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:266) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:299) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:373) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) [storm-core-1.2.2.jar:1.2.2]
            at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
            at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]

      Two things to track down next: where /meta/1 lives in ZooKeeper, and where in the source the error is thrown. From the Storm source:

      

        public Object getData(String path) {
            path = "/" + path;
            try {
                Object data;
                if (_curator.checkExists().forPath(path) != null) {
                    // Use parseWithException instead of parse so we can capture deserialization errors in the log.
                    // They are likely to be bugs in the spout code.
                    try {
                        data = JSONValue.parseWithException(new String(_curator.getData().forPath(path), "UTF-8"));
                    } catch (ParseException e) {
                        LOG.warn("Failed to deserialize zookeeper data for path {}", path, e);
                        data = null;
                    }
                } else {
                    data = null;
                }
                LOG.debug("Get. [path = {}] => [data = {}]", path, data);
                return data;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

      The failure is in JSONValue.parseWithException. getData parses the node's contents as JSON and stores the result in data; the checkExists branch tells us the path does exist and has content, it just cannot be parsed.
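      The mismatch is easy to reproduce outside Storm. A minimal sketch, using Python's json module in place of json-simple; the stored string is abridged from the ZooKeeper dump shown further down:

```python
import json

# What actually ended up in ZooKeeper: a Java toString(), not JSON
# (abridged from the `get` output on the meta node).
stored = "[GlobalPartitionInformation{topic=testin, partitionMap={0=cloud22:19092}}]"

def get_data(raw):
    """Mirror of TransactionalState.getData: parse as JSON, return None on failure."""
    try:
        return json.loads(raw)
    except ValueError:
        return None

print(get_data(stored))       # None -> the spout coordinator gets no usable metadata
print(get_data('["ok", 1]'))  # ['ok', 1] -> what a real JSON payload would yield
```

The first character `[` looks like the start of a JSON array, but `GlobalPartitionInformation{...}` is not a valid JSON value, so the parse fails exactly as in the worker log.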

      Searching the source shows the full path of /meta/1 is /transactional/....../meta/1; under that prefix, the actual node turns out to be /transactional/TRIDENTSTREAMSCANREL_21_0/coordinator/meta/1.

      Fetch the node's contents: get /transactional/TRIDENTSTREAMSCANREL_21_0/coordinator/meta/1

      

    [GlobalPartitionInformation{topic=testin, partitionMap={0=cloud22:19092, 1=cloud22:19092, 2=cloud22:19092, 3=cloud22:19092, 4=cloud22:19092}}]
    cZxid = 0xf02f
    ctime = Thu Nov 15 10:08:13 CST 2018
    mZxid = 0x45a9b
    mtime = Thu Nov 15 17:02:31 CST 2018
    pZxid = 0xf02f
    cversion = 0
    dataVersion = 11444
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 142
    numChildren = 0

      And there it is: judging by the format, this value was written out via toString(), yet it is read back as JSON. A bold piece of coding; I don't think it works.

    Conclusion: storm-sql-kafka is essentially unusable, or at least no workaround was found. Judging from the errors, the code that writes the Kafka partition metadata to ZooKeeper and the code that reads it back simply do not match, and the problem cannot be solved without modifying the source.

      

  • Original post: https://www.cnblogs.com/gaoze/p/9964993.html