HBase Error: connection object not serializable

I wanted to connect to HBase from the Spark driver program and insert data into it, but when the job was submitted to the Spark cluster it failed with: connection object not serializable.

The full error:

    Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
    com.sae.model.HbaseHelper
    Serialization stack:
            - object not serializable (class: com.sae.model.HbaseHelper, value: com.sae.model.HbaseHelper@27a09971)
            - field (class: com.sae.demo.KafkaStreamingTest$$anonfun$main$1, name: hbHelper$1, type: class com.sae.model.HbaseHelper)
            - object (class com.sae.demo.KafkaStreamingTest$$anonfun$main$1, <function1>)
            - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
            - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
            - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
            - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@4bdf)
            - element of array (index: 0)
            - array (class [Ljava.lang.Object;, size 16)
            - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
            - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@4bdf, org.apache.spark.streaming.dstream.ForEachDStream@2b4d4327))
            - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
            - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
    0 checkpoint files
    
    ])
            - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
            - object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@163042ea)
            - element of array (index: 0)
            - array (class [Ljava.lang.Object;, size 16)
            - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
            - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.kafka.KafkaInputDStream@163042ea))
            - writeObject data (class: org.apache.spark.streaming.DStreamGraph)
            - object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@2577a95d)
            - field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
            - object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@2b4b96a4)
            at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:557)
            at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
            at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
            at com.sae.demo.KafkaStreamingTest$.main(StreamingDataFromKafka.scala:225)
            at com.sae.demo.KafkaStreamingTest.main(StreamingDataFromKafka.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
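
The serialization stack makes the root cause clear: an instance of com.sae.model.HbaseHelper is created on the driver and captured by the function passed into the DStream, so Spark must serialize it when DStream checkpointing is enabled, and it cannot. A minimal sketch of the anti-pattern (the insert method is hypothetical, standing in for whatever the helper does with a record):

    // Anti-pattern: the helper (and the HBase connection it wraps) is
    // created once, on the driver.
    val hbHelper = new HbaseHelper()
    dstream.foreachRDD { rdd =>
      // This closure runs on the executors, so Spark has to ship hbHelper
      // to them, which fails because it is not serializable.
      rdd.foreach(record => hbHelper.insert(record))
    }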

Solution:

Refer to the official Spark Streaming programming guide, section "Design Patterns for using foreachRDD" (linked in the original post).

The code that opens the database connection should be moved inside foreachPartition, for example:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // The connection is created on the executor, once per partition,
        // so it is never serialized on the driver.
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }
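
For HBase specifically, the same pattern with the standard HBase 1.x client API might look like the sketch below. The table name, the column family and qualifier, and the assumption that the stream carries (String, String) key/value pairs are all hypothetical; the essential point is that ConnectionFactory.createConnection runs on the executor, inside foreachPartition:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // Created on the executor, once per partition, so never serialized.
        val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
        val table = connection.getTable(TableName.valueOf("my_table")) // hypothetical table name
        try {
          partitionOfRecords.foreach { case (rowKey, value) =>
            val put = new Put(Bytes.toBytes(rowKey))
            // "cf" and "col" are placeholder column family / qualifier names.
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
            table.put(put)
          }
        } finally {
          table.close()
          connection.close()
        }
      }
    }

Opening one connection per partition is already far cheaper than one per record; the same guide goes further and suggests keeping a static, lazily initialized connection pool on the executors so connections are reused across batches.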
Original post: https://www.cnblogs.com/keitsi/p/5356111.html