zoukankan      html  css  js  c++  java
  • Hadoop的Writerable在Spark无法序列化的问题

    Spark序列化这块网上讲的比较少,自己还没来得及看这块代码,今天编程的时候遇到一个Hadoop的Writerable实现在Spark无法序列化的问题。我的代码如下:

    object EntryApp extends App{
    
    
      val conf = new SparkConf().setAppName("cgbdata").setMaster("local")
    
      val sc = new SparkContext(conf)
      val hadoopConfig = new Configuration()
      hadoopConfig.set("sequoiadb.input.url","master:11810,slave1:11810,slave2:11810")
      hadoopConfig.set("sequoiadb.in.collectionspace","default")
      hadoopConfig.set("sequoiadb.in.collection","bar")
      val sdbRDD = sc.newAPIHadoopRDD[Object,BSONWritable,SequoiadbInputFormat](hadoopConfig,classOf[SequoiadbInputFormat],classOf[Object], classOf[BSONWritable])
      sdbRDD.map(_._2.getBson).collect.map(println)
    
      sc.stop()
    }

    这块代码执行报了如下错误。

    Serialization stack:
        - object not serializable (class: org.bson.BasicBSONObject, value: { "_id" : { "$oid" : "55fe4caa4bb0b32e0e000000"} , "name" : "gaoxing"})
        - element of array (index: 0)
        - array (class [Lorg.bson.BSONObject;, size 2)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    因为Spark默认使用Java的序列化,而Writeable没有实现序列化接口,导致整个问题的发生。通过google找到解决方式了。

    val conf = new SparkConf().setAppName("cgbdata").setMaster("local").registerKryoClasses(Array(classOf[BSONWritable]))

    查看相关代码:

      def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
        val allClassNames = new LinkedHashSet[String]()
        allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
        allClassNames ++= classes.map(_.getName)
    
        set("spark.kryo.classesToRegister", allClassNames.mkString(","))
        set("spark.serializer", classOf[KryoSerializer].getName)
        this
      }

    调用registerKryoClasses这个方法,spark的序列化框架换成Kryo, 这个时候不需要实现Serializer接口了。当然里面具体怎么搞得不是太清楚。

  • 相关阅读:
    数组添加元素到特定位置
    jquery $().each,$.each的区别
    JSON字符串 与 JSON对象 互转
    js 获取 url 参数
    js 获取随机数 Math.random()
    js中的|| 与 &&
    js立即执行函数
    css 蒙层
    css 多行文本的溢出显示省略号(移动端)
    移动端利用-webkit-box水平垂直居中(旧弹性盒)
  • 原文地址:https://www.cnblogs.com/gaoxing/p/4904447.html
Copyright © 2011-2022 走看看