Hadoop基于DataInput和DataOutput实现了简单、高效的序列化协议,而Writable接口定义了Hadoop序列化的方法,MapReduce框架中的不论什么键值类型都实现了该接口,比方IntWritable、LongWritable等,具体的类关系见下图:
通过上图可以发现,MapReduce中经常使用的键值类型都没有直接实现Writable接口,而是实现了该接口的子接口WritableComparable,该子接口还继承了Comparable接口,这意味着实现类除了可以序列化和发序列化外,还可以彼此进行比較,这是由于当这些类做为键在MapReduce中使用时,在排序阶段须要对其进行比較排序。但这并非说假设实现自己定义的序列化类时,必须实现WritableComparable接口,仅当自己定义的序列化类也用作键时才必须实现该接口,假设仅是做为值使用的话,仅实现Writable接口就可以。
当自己定义的序列化类用做键时,须要考虑到在依据键进行分区时经经常使用到hashCode()方法,因此须要确保该方法在不同的JVM实例中返回同样的结果,而Object对象中默认的hashCode()方法不可以满足该特性,所以在实现自己定义类时须要重写hashCode()方法,而假设两个对象依据equals()方法是相等的,那么二者的hashCode()返回值也必须同样,因此在重写hashCode()的时候,有必要重写equals(Object obj)方法。
除了上图中实现WritableComparable的类外,还有若干类直接实现了Writable接口,比方ObjectWritable,一个多态的Writable,该类不使用Writable封装就行处理数组、字符串和其他Java基本类型。还有Writable集合类:ArrayWritable、EnumSetWritable、MapWritable、TwoDArrayWritable、SortedMapWritable。当中ArrayWritable是对同样Writable类型的数组的封装,也就是该类中Writable的类型必须同样,是IntWritable都是IntWritable,不能既有IntWritable也有LongWritable。TwoDArrayWritable是对二维数组即矩阵的封装,同样该类中Writable的类型也必须同样。EnumSetWritable是对EnumSet封装的Writable,MapWritable实现了Map<Writable,Writable>接口,SortedMapWritable实现了SortedMap<WritableComparable,Writable>接口,二者当然也都实现了Writable接口,在二者的内部实现中,使用Byte类型指示指定的类型,因此在一个Map实例中最多仅仅能有127个不同的类:
/* Class to id mappings */ @VisibleForTesting Map<Class, Byte> classToIdMap = new ConcurrentHashMap<Class, Byte>(); /* Id to Class mappings */ @VisibleForTesting Map<Byte, Class> idToClassMap = new ConcurrentHashMap<Byte, Class>();
如今通过分析IntWritable和Text的源码来学习怎样编写Writable以及WritableComparable,首先是IntWritable的源码:
public class IntWritable implements WritableComparable<IntWritable> { private int value; public IntWritable() {} public IntWritable(int value) { set(value); } /** Set the value of this IntWritable. */ public void set(int value) { this.value = value; } /** Return the value of this IntWritable. */ public int get() { return value; } @Override //重写Writable中的readFields(DataInput in) public void readFields(DataInput in) throws IOException { value = in.readInt(); } @Override //重写Writable中的write(DataOutput out) public void write(DataOutput out) throws IOException { out.writeInt(value); } /** Returns true if <code>o</code> is a IntWritable with the same value. */ @Override public boolean equals(Object o) { if (!(o instanceof IntWritable)) return false; IntWritable other = (IntWritable)o; return this.value == other.value; } @Override public int hashCode() { return value; } /** Compares two IntWritables. */ @Override //重写Comparable接口中的compareTo方法 public int compareTo(IntWritable o) { int thisValue = this.value; int thatValue = o.value; return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); } @Override public String toString() { return Integer.toString(value); } //此处省略了继承自WritableComparator的内部类Comparator static { // register this comparator WritableComparator.define(IntWritable.class, new Comparator()); } }
IntWritable的源码相对来说还是比較简单的,除了实现接口中的方法外,还重写了hashCode、equals和toString方法,这也是要注意的一点。其次是Text类,Text将字符串存储为标准UTF8编码,提供了在字节层次序列化、反序列化和比較字符串的方法,比方decode(byte[]utf8)、encode(String string)、readFields(DataInput in)、write(DataOutput out)等。该类除了实现WritableComparable外,还继承自BinaryComparable抽象类,当中实现的方法例如以下:
private byte[] bytes; private int length; @Override public void readFields(DataInput in) throws IOException { //从输入流中读取整数值,很多其他工具方法可參考WritableUtils工具类 int newLength = WritableUtils.readVInt(in); setCapacity(newLength, false); //向bytes中读入长度为newLength的数据 in.readFully(bytes, 0, newLength); length = newLength; } @Override public void write(DataOutput out) throws IOException { WritableUtils.writeVInt(out, length); out.write(bytes, 0, length); } @Override public int compareTo(BinaryComparable other) { if (this == other) return 0; return WritableComparator.compareBytes(getBytes(), 0, getLength(), other.getBytes(), 0, other.getLength()); }
总结IntWritable和Text类的实现,可以据此实现自定的WritableComparable,以下就是一个简单的演示样例。在该演示样例中使用name和age做为联合键,仅仅有在二者都同样的情况下才觉得是一个对象。
public class CompositeWritable implements WritableComparable<CompositeWritable>{ private String name; private int age; public CompositeWritable(){} public CompositeWritable(String name, int age){ set(name, age); } @Override public void readFields(DataInput in) throws IOException { name = in.readUTF(); age = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(name); out.writeInt(age); } @Override public int compareTo(CompositeWritable o) { int cmp = name.compareTo(o.getName()); if(cmp != 0) return cmp; return age < o.getAge()? -1:(age == o.getAge()? 0 : 1); } @Override public boolean equals(Object o) { if(o instanceof CompositeWritable){ CompositeWritable other = (CompositeWritable)o; return this.name.equals(other.name) && this.age == other.age; } return false; } @Override public int hashCode() { return name.hashCode() + age; } @Override public String toString() { return name + " " + age; } public void set(String name, int age){ this.name = name; this.age = age; } public String getName(){ return this.name; } public int getAge(){ return this.age; } }