zoukankan      html  css  js  c++  java
  • Spark:将RDD[List[String,List[Person]]]中的List[Person]通过spark api保存为hdfs文件时一直出现not serializable task,没办法找到"spark自定义Kryo序列化输入输出API"

    声明:本文转自《在Spark中自定义Kryo序列化输入输出API

        在Spark中内置支持两种系列化格式:(1)、Java serialization;(2)、Kryo serialization。在默认情况下,Spark使用的是Java的ObjectOutputStream系列化框架,它支持所有继承java.io.Serializable的类系列化,虽然Java系列化非常灵活,但是它的性能不佳。然而我们可以使用Kryo 库来系列化,它相比Java serialization系列化高效,速度很快(通常比Java快10x),但是它不支持所有的系列化对象,而且要求用户注册类。

      在Spark中,使用Kryo系列化比使用Java系列化更明智。在shuffling和caching大量数据的情况下,使用 Kryo系列化就变得非常重要。

      虽然Kryo支持对RDD的cache和shuffle,但是在Spark中不是内置就显示提供使用Kryo将数据系列化到磁盘中的输入输出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法仅仅支持使用Java系列化。所以如果我们可以使用Kryo系列化将会变得很棒!

    实现代码:

    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    import java.util.Calendar
    
    import org.apache.spark.api.java.JavaPairRDD
    import org.apache.spark.api.java.function.PairFunction
    import org.apache.spark.sql.functions._
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    import java.io._
    
    import com.esotericsoftware.kryo.io.Input
    import org.apache.hadoop.conf._
    import org.apache.hadoop.fs._
    import org.apache.hadoop.fs.Path._
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.permission.FsAction
    import org.apache.hadoop.fs.permission.FsPermission
    import org.apache.hadoop.fs.FSDataOutputStream
    import org.apache.hadoop.io.{BytesWritable, NullWritable}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.serializer.KryoSerializer
    
    import scala.reflect.ClassTag
    
    // user defined class that need to serialized
    class Person(val name: String)
    
    /**
      * Created by Administrator on 11/10/2017.
      */
    object TestSaveClasToHdfs{
    
      def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
        val kryoSerializer = new KryoSerializer(rdd.context.getConf)
        rdd.mapPartitions(iter => iter.grouped(10)
          .map(_.toArray))
          .map(splitArray => {
            //initializes kyro and calls your registrator class
            val kryo = kryoSerializer.newKryo()
    
            //convert data to bytes
            val bao = new ByteArrayOutputStream()
            val output = kryoSerializer.newKryoOutput()
            output.setOutputStream(bao)
            kryo.writeClassAndObject(output, splitArray)
            output.close()
    
            // We are ignoring key field of sequence file
            val byteWritable = new BytesWritable(bao.toByteArray)
            (NullWritable.get(), byteWritable)
          }).saveAsSequenceFile(path)
      }
    
      def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)(implicit ct: ClassTag[T]) = {
        val kryoSerializer = new KryoSerializer(sc.getConf)
        sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
          minPartitions)
          .flatMap(x => {
            val kryo = kryoSerializer.newKryo()
            val input = new Input()
            input.setBuffer(x._2.getBytes)
            val data = kryo.readClassAndObject(input)
            val dataObject = data.asInstanceOf[Array[T]]
            dataObject
          })
      }
    
      def main(args: Array[String]) {
        if (args.length < 1) {
          println("Please provide output path")
          return
        }
        val conf = new SparkConf().setMaster("local").setAppName("kryoexample")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)
    
        val outputPath = args(0)
    
        //create some dummy data
        val personList = 1 to 10000 map (value => new Person(value + ""))
        val personRDD = sc.makeRDD(personList)
    
        saveAsObjectFile(personRDD, outputPath)
        val rdd = objectFile[Person](sc, outputPath)
        println(rdd.map(person => person.name).collect().toList)
      }
    }

    在spark-shell中执行时,一直出现错误,但是当我把它编译为jar包使用spark-submit命令提交时,错误就没有了。

  • 相关阅读:
    Bookshop(一)数据库连接
    JSTL标签库(一)核心标签库
    JavaScript基础(一)之语法、变量、数据类型
    JSP页面元素构成
    微信开发(三)消息的自动回复和关注后自动回复
    微信开发(二)开发者模式接口接入
    微信开发(一)内网映射之natapp的使用
    JavaWeb监听器的使用(一)监听上下文和会话信息
    记录一下这次web实训的两个网站
    原生JS实现简易轮播图
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/7807508.html
Copyright © 2011-2022 走看看