spark的序列化主要使用了hadoop的writable和java的Serializable。
说到底就是使用hadoop的writable替换了java的默认序列化实现方式。
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value: T = t
override def toString: String = t.toString
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
new ObjectWritable(t).write(out)
}
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
ow.setConf(new Configuration())
ow.readFields(in)
t = ow.get().asInstanceOf[T]
}
}
这个有个让人疑惑的地方是使用@transient 表示该值不会被序列化,我做个一个测试是可以的,为什么呢,因为spark这里定制了java的序列化,使用hadoop的序列化方案,同时t是Writable类型没有实现Serializable接口不能被序列化。
object SerializableWritableTest extends App { println("start") val name:Text=new Text("gaoxing"); val s=new SerializableWritable[Text](name); val fout=new FileOutputStream("name.dat"); val out=new ObjectOutputStream(fout); out.writeObject(s) val fin=new FileInputStream("name.dat"); val in=new ObjectInputStream(fin); val n=in.readObject() println(n.asInstanceOf[SerializableWritable[Text]].value.toString) println("end") }