zoukankan      html  css  js  c++  java
  • 在Spark中自定义Kryo序列化输入输出API(转)

    原文链接:在Spark中自定义Kryo序列化输入输出API

    Spark中内置支持两种系列化格式:(1)、Java serialization;(2)、Kryo serialization。在默认情况下,Spark使用的是Java的ObjectOutputStream系列化框架,它支持所有继承java.io.Serializable的类系列化,虽然Java系列化非常灵活,但是它的性能不佳。然而我们可以使用Kryo 库来系列化,它相比Java serialization系列化高效,速度很快(通常比Java快10x),但是它不支持所有的系列化对象,而且要求用户注册类。

      在Spark中,使用Kryo系列化比使用Java系列化更明智。在shuffling和caching大量数据的情况下,使用 Kryo系列化就变得非常重要。

      虽然Kryo支持对RDD的cache和shuffle,但是在Spark中不是内置就显示提供使用Kryo将数据系列化到磁盘中的输入输出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法仅仅支持使用Java系列化。所以如果我们可以使用Kryo系列化将会变得很棒!

      在这篇文章中,我将讨论如何自定义Kryo系列化输出输出相关API来将数据进行读写到磁盘中。

    写数据

      通常,我们使用rdd.saveAsObjectFile API将已经系列化的对象写入到磁盘中。下面的代码将展示如何使用我们自定义的saveAsObjectFile方法将已经使用kryo系列化的对象写入到磁盘中:

    1 def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)

    这个函数中参数rdd就是我们需要写的数据;path是数据保存的路径。

    1 val kryoSerializer = new KryoSerializer(rdd.context.getConf)

      KryoSerializer是Spark内部提供的用于提供操作Kryo的类。在上述代码中,我们创建了KryoSerializer对象,并从rdd.context.getConf中获取传进来的缓存大小。

    1 rdd.mapPartitions(iter => iter.grouped(10)
    2       .map(_.toArray))
    3       .map(splitArray => {}

    所有的objectFile 将会在HDFS上保存,我们对RDD中的每个分片进行遍历,然后将他们转换成Byte数组。

    1 val kryo = kryoSerializer.newKryo()

      对每个splitArray,我们首先创建了kryo实例,kryo是线程不安全的,所以我们在每个map操作中单独创建。当我们调用 kryoSerializer.newKryo()来创建新的kryo实例,他也会调用我们自定义的KryoRegistrator。

    1 //create output stream and plug it to the kryo output
    2 val bao = new ByteArrayOutputStream()
    3 val output = kryoSerializer.newKryoOutput()
    4 output.setOutputStream(bao)
    5 kryo.writeClassAndObject(output, splitArray)
    6 output.close()

    一旦我们拥有kryo实例,我们就可以创建kryo输出对象,然后我们将类信息和对象写入到那个输出对象中。

    1 val byteWritable = new BytesWritable(bao.toByteArray)
    2       (NullWritable.get(), byteWritable)
    3     }).saveAsSequenceFile(path)

      我们在创建byteWritable的时候,包装了bytearray,然后保存成Sequence文件。使用那些代码我们就可以将Kryo对象写入到磁盘中。完整代码如下:

    01 /**
    02  * User: 过往记忆
    03  * Date: 15-04-24
    04  * Time: 上午07:24
    05  * bolg: http://www.iteblog.com
    06  * 本文地址:http://www.iteblog.com/archives/1328
    07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    08  * 过往记忆博客微信公共帐号:iteblog_hadoop
    09  */
    10  
    11   def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
    12     val kryoSerializer = new KryoSerializer(rdd.context.getConf)
    13  
    14     rdd.mapPartitions(iter => iter.grouped(10)
    15       .map(_.toArray))
    16       .map(splitArray => {
    17       //initializes kyro and calls your registrator class
    18       val kryo = kryoSerializer.newKryo()
    19  
    20       //convert data to bytes
    21       val bao = new ByteArrayOutputStream()
    22       val output = kryoSerializer.newKryoOutput()
    23       output.setOutputStream(bao)
    24       kryo.writeClassAndObject(output, splitArray)
    25       output.close()
    26  
    27       // We are ignoring key field of sequence file
    28       val byteWritable = new BytesWritable(bao.toByteArray)
    29       (NullWritable.get(), byteWritable)
    30     }).saveAsSequenceFile(path)
    31   }

    读数据

      光有写没有读对我们来说仍然不完美。通常我们使用sparkContext中的objectFile API从磁盘中读取数据,这里我们使用自定义的objectFile API来读取Kryo对象文件。

    01 def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)
    02     (implicit ct: ClassTag[T]) = {
    03     val kryoSerializer = new KryoSerializer(sc.getConf)
    04     sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
    05        minPartitions)
    06        .flatMap(x => {
    07        val kryo = kryoSerializer.newKryo()
    08        val input = new Input()
    09        input.setBuffer(x._2.getBytes)
    10        val data = kryo.readClassAndObject(input)
    11        val dataObject = data.asInstanceOf[Array[T]]
    12        dataObject
    13     })
    14   }

    上面的步骤和写的步骤很类似,只不过这里我们使用的是input,而不是output。我们从BytesWritable中读取bytes数据,然后使用readClassAndObject API反序列化数据。

    如何使用

      下面例子使用上面介绍的两个方法来系列化和反序列化Person对象:

    01 /**
    02  * User: 过往记忆
    03  * Date: 15-04-24
    04  * Time: 上午07:24
    05  * bolg: http://www.iteblog.com
    06  * 本文地址:http://www.iteblog.com/archives/1328
    07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    08  * 过往记忆博客微信公共帐号:iteblog_hadoop
    09  */
    10  
    11 // user defined class that need to serialized
    12   class Person(val name: String)
    13  
    14  def main(args: Array[String]) {
    15  
    16     if (args.length < 1) {
    17       println("Please provide output path")
    18       return
    19     }
    20     val outputPath = args(0)
    21  
    22     val conf = new SparkConf().setMaster("local").setAppName("kryoexample")
    23     conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")
    24     val sc = new SparkContext(conf)
    25  
    26     //create some dummy data
    27     val personList = 1 to 10000 map (value =new Person(value + ""))
    28     val personRDD = sc.makeRDD(personList)
    29  
    30     saveAsObjectFile(personRDD, outputPath)
    31     val rdd = objectFile[Person](sc, outputPath)
    32     println(rdd.map(person => person.name).collect().toList)
    33   }

  • 相关阅读:
    oracle表解锁
    pl/sql 如何配置连接远程一个或多个数据库
    Hibernate通过自编写Sql修改
    Hibernate通过自编写sql查询
    java生成临时令牌和访问令牌
    java生成字母首位8位随机码
    [C] 创建目录_mkdir()函数
    [C] 判断目录 / 文件是否存在access()函数
    [C] getopt使用说明
    [C] Segmentation fault (core dumped)
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/4962030.html
Copyright © 2011-2022 走看看