zoukankan      html  css  js  c++  java
  • Spark 实现自定义对象sequenceFile方式存储,读写示例(scala编写)

     1 package com.fuge.bigdata.datahub.analysis
     2 
     3 import java.io.{DataInput, DataOutput}
     4 
     5 import com.fuge.bigdata.tools.common.utils.SparkUtils
     6 import org.apache.hadoop.io.{NullWritable, WritableComparable}
     7 import org.apache.spark.SparkContext
     8 
     9 /**
    10   * Created by chen xiang on 18-6-13.
    11   * 一个使用SequenceFile进行存储读取的使用示例
    12   */
    13 object SequenceFileUsage {
    14   def main(args: Array[String]): Unit = {
    15 
    16     require(args.length == 1)
    17     // 构建SparkContext对象,封装过,单独运行,自行修改后定义
    18     val sc = new SparkContext(SparkUtils.getSparkConf("SequenceFileUsage"))
    19 
    20     // 获取路径参数
    21     val path = args(0).trim
    22 
    23     // 定义测试数据
    24     val studentList = List(Student("01", "abc"), Student("02", "baby"), Student("03", "xiang"))
    25 
    26     // 序列化测试数据到RDD,并写入到bos
    27     sc.parallelize(studentList)
    28       .repartition(1)
    29       // 以NullWritable 为key,构建kv结构.SequenceFile需要kv结构才能存储,NullWritable不占存储
    30       .map(NullWritable.get() -> _)
    31 // 压缩参数可选用
    32       .saveAsSequenceFile(s"$path", Option(classOf[GzipCodec]))
    33 
    34     // 读取刚才写入的数据
    35     val studentRdd = sc.sequenceFile(s"$path/part-*", classOf[NullWritable], classOf[Student])
    36       .map {
    37         // 读取数据,并且重新赋值对象
    38         case (_, y) => Student(y.id, y.name)
    39       }
    40       .persist()
    41 
    42     studentRdd
    43       .foreach(x => println("count: " + x.id + "	" + x.name))
    44   }
    45 }
    46 
    47 case class Student(var id: String, var name: String) extends WritableComparable[Student] {
    48   /**
    49     * 重写无参构造函数,用于反序列化时的反射操作
    50     */
    51   def this() {
    52     this("", "")
    53   }
    54 
    55   /**
    56     * 继承Comparable接口需要实现的方法,用于比较两个对象的大小
    57     */
    58   override def compareTo(o: Student): Int = {
    59     var cmp = id compareTo o.id
    60     if (cmp == 0) {
    61       cmp = name compareTo o.name
    62     }
    63     cmp
    64   }
    65 
    66   /**
    67     * 继承Writable接口需要实现的方法-反序列化读取结果,并且赋值到对象字段
    68     * 注意要和write的顺序一致
    69     */
    70   override def readFields(in: DataInput): Unit = {
    71     name = in.readUTF()
    72     id = in.readUTF()
    73     println("count: " + "	 id = " + id + "	 name = " + name)
    74   }
    75 
    76   /**
    77     * 继承Writable接口需要实现的方法-序列化写操作,将对象字段值写入序列化
    78     * 注意要和readFields的顺序一致
    79     */
    80   override def write(out: DataOutput): Unit = {
    81     out.writeUTF(id)
    82     out.writeUTF(name)
    83   }
    84 }
    补充:
    1. 自定义的类需要进行序列化,必须都要实现Writable,一般情况下采用实现WritableComparable的方式,并且实现comparaTo,readFields, write方法,并且提供一个无参构造函数
    2. readFields和write方法,里面字段的顺序要保持一致
    3. 遇到集合类型,序列化时需要先将集合长度写进去,然后再挨个写集合数据
    4. 遇到集合类型,反序列化时需要先读取集合的长度,然后接收数据,如果集合数据类型是自定义类型,还需要先实例化一个无参构造,然后赋值。
    5. SequenceFile需要使用KV结构才能调用存储,可以使用一个NullWritable来占位,上诉例子中的K值就是使用的NullWritable进行的
    6. sequenceFile序列化后占用的存储空间比较大,有需要的话,可以在存储的时候加上压缩算法,具体使用方式可以见上诉的例子

  • 相关阅读:
    3个简单易懂的项目技巧
    使用HTML和CSS实现3D文字效果
    星巴克咖啡杯svg特效
    Web移动端实现自适应缩放界面的方法汇总
    javscript 闭包应用介绍
    pytest命令行传参
    vue7:父组件向子组件传值
    vue6:子组件向父组件传值
    vue5: is规避错误
    vue4:refs介绍
  • 原文地址:https://www.cnblogs.com/husky/p/9178197.html
Copyright © 2011-2022 走看看