• WritableComparable and Comparator in Hadoop

1. WritableComparable

A look at the Hadoop API documentation shows the following.

WritableComparable extends both the Writable and java.lang.Comparable interfaces. A WritableComparable is therefore a Writable and a Comparable at the same time: it can be serialized, and it can be compared.
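This relationship can be sketched without Hadoop on the classpath. The two interfaces below are simplified stubs that mirror the shape of the real ones in org.apache.hadoop.io; they are illustrative stand-ins, not Hadoop's actual source:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Simplified stand-in for org.apache.hadoop.io.Writable
interface Writable {
    void write(DataOutput out) throws IOException;      // serialize
    void readFields(DataInput in) throws IOException;   // deserialize
}

// Simplified stand-in for org.apache.hadoop.io.WritableComparable:
// it is both a Writable and a Comparable
interface WritableComparable<T> extends Writable, Comparable<T> {}

public class InterfaceShape {
    public static void main(String[] args) {
        // A WritableComparable is assignable both ways:
        System.out.println(Writable.class.isAssignableFrom(WritableComparable.class));   // true
        System.out.println(Comparable.class.isAssignableFrom(WritableComparable.class)); // true
    }
}
```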

Looking at its implementing classes, we find that BooleanWritable, BytesWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, MD5Hash, NullWritable, Record, RecordTypeInfo, Text, VIntWritable, and VLongWritable all implement WritableComparable.

Instances of a WritableComparable implementation can be compared with one another. In MapReduce, any class used as a key should implement the WritableComparable interface.

     

    Example:

package cn.roboson.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * 1. Define a class that implements WritableComparable.
 * 2. Three methods must be implemented: two from Writable (serialization)
 *    and one from Comparable (comparison).
 * 3. Here the comparison is defined on the counter field.
 * @author roboson
 */
public class MyWritableComparable implements WritableComparable<MyWritableComparable> {

    private int counter;
    private long timestamp;

    public MyWritableComparable() {
    }

    public MyWritableComparable(int counter, long timestamp) {
        this.counter = counter;
        this.timestamp = timestamp;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialize: turn the bytes from the input stream back into structured fields
        counter = in.readInt();
        timestamp = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize: write the structured fields to the output stream
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    @Override
    public int compareTo(MyWritableComparable other) {
        int thisValue = this.counter;
        int otherValue = other.counter;
        return (thisValue < otherValue ? -1 : (thisValue == otherValue ? 0 : 1));
    }

    public int getCounter() {
        return counter;
    }

    public void setCounter(int counter) {
        this.counter = counter;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public static void main(String[] args) {
        MyWritableComparable comparable = new MyWritableComparable(3, 4);
        MyWritableComparable otherComparable = new MyWritableComparable(4, 5);
        int value = comparable.compareTo(otherComparable);
        if (value == -1) {
            System.out.println("comparable<otherComparable");
        } else if (value == 0) {
            System.out.println("comparable=otherComparable");
        } else {
            System.out.println("comparable>otherComparable");
        }
    }
}

Output: since 3 < 4, compareTo returns -1 and the program prints comparable<otherComparable.

     

2. RawComparator

For MapReduce, type comparison matters a great deal, because there is a key-based sort phase in the middle of the job. Hadoop provides a raw comparison interface, RawComparator, which extends Java's Comparator interface. RawComparator allows its implementations to compare records directly in the data stream, without first deserializing them into objects, thereby avoiding the overhead of creating new objects.

package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {

    // Declared by RawComparator itself: compare serialized records in place
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

    // Inherited from Comparator
    @Override
    public int compare(T o1, T o2);

    @Override
    public boolean equals(Object obj);
}
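To see why raw comparison saves work, here is a minimal, Hadoop-free sketch of what a raw comparator for ints does: it reads the big-endian bytes that DataOutput.writeInt produces and compares the values without constructing any objects. The class and method names here are illustrative, not Hadoop's, but the logic mirrors what IntWritable's nested comparator does:

```java
// Compare two serialized ints directly from their byte arrays,
// without deserializing them into objects first.
public class RawIntCompare {

    // Read a big-endian int from b[off..off+3], the layout DataOutput.writeInt uses.
    static int readInt(byte[] b, int off) {
        return ((b[off] & 0xff) << 24) | ((b[off + 1] & 0xff) << 16)
             | ((b[off + 2] & 0xff) << 8) | (b[off + 3] & 0xff);
    }

    // Mirrors the shape of compare(byte[] b1, int s1, ..., byte[] b2, int s2, ...)
    static int compare(byte[] b1, int s1, byte[] b2, int s2) {
        int v1 = readInt(b1, s1);
        int v2 = readInt(b2, s2);
        return Integer.compare(v1, v2);  // no objects allocated on the compare path
    }

    public static void main(String[] args) {
        byte[] a = {0, 0, 0, (byte) 163};  // 163 serialized big-endian
        byte[] b = {0, 0, 0, (byte) 165};  // 165 serialized big-endian
        System.out.println(compare(a, 0, b, 0) < 0 ? "a<b" : "a>=b"); // prints a<b
    }
}
```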

     

Looking at the Hadoop API:

This interface is not implemented directly by most Writable classes; its concrete subclass is WritableComparator. In most cases, a raw comparator is provided as a nested class of a class that implements Writable, to compare serialized bytes. BooleanWritable, BytesWritable, ByteWritable, org.apache.hadoop.io.serializer.DeserializerComparator, DoubleWritable, FloatWritable, IntWritable, JavaSerializationComparator, LongWritable, MD5Hash, NullWritable, RecordComparator, Text, and UTF8 all carry a RawComparator implementation as a nested class.

WritableComparator is its concrete subclass.

3. WritableComparator

Hadoop: The Definitive Guide is rather terse on this point: it says that WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. What does that sentence actually mean?

First, as shown in section 2, WritableComparator implements the RawComparator interface; that is, WritableComparator is an implementation of RawComparator.

Second, it is a general-purpose implementation for WritableComparable classes. Which classes are those? They are the WritableComparable implementations listed in section 1, together with their nested raw comparators listed in section 2. In other words, WritableComparator is a general-purpose implementation behind BooleanWritable.Comparator, BytesWritable.Comparator, ByteWritable.Comparator, DoubleWritable.Comparator, FloatWritable.Comparator, IntWritable.Comparator, LongWritable.Comparator, MD5Hash.Comparator, NullWritable.Comparator, RecordComparator, Text.Comparator, and UTF8.Comparator. This brings out the two functions of WritableComparator. First, it provides a default implementation of the raw compare() method, which deserializes the objects to be compared from the stream and then invokes their compareTo() method. Second, it acts as a factory for RawComparator instances (for registered Writable implementations). For example, to obtain the comparator for IntWritable, we simply call:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
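The first function, the default raw compare, can be sketched without Hadoop. The Key class below is a hypothetical stand-in for a WritableComparable; the point is that the default path deserializes both byte buffers into objects and then delegates to their compareTo() (subclasses such as IntWritable.Comparator override this to compare bytes directly and skip the deserialization):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;

public class DefaultRawCompare {

    // Toy key with one int field, serialized the way DataOutput.writeInt would.
    static class Key implements Comparable<Key> {
        int v;
        void readFields(DataInput in) throws IOException { v = in.readInt(); }
        @Override
        public int compareTo(Key o) { return Integer.compare(v, o.v); }
    }

    // Default-style raw compare: deserialize both buffers, then compare the objects.
    static int compare(byte[] b1, byte[] b2) throws IOException {
        Key k1 = new Key();
        Key k2 = new Key();
        k1.readFields(new DataInputStream(new ByteArrayInputStream(b1)));
        k2.readFields(new DataInputStream(new ByteArrayInputStream(b2)));
        return k1.compareTo(k2);  // object-level comparison after deserializing
    }

    public static void main(String[] args) throws IOException {
        byte[] a = {0, 0, 0, 3};  // 3 serialized big-endian
        byte[] b = {0, 0, 0, 4};  // 4 serialized big-endian
        System.out.println(compare(a, b) < 0 ? "a<b" : "a>=b"); // prints a<b
    }
}
```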

Now look at how the WritableComparator class is defined:

WritableComparator acts like a registry recording a collection of comparator classes. Its comparators member is a hash table whose keys are Class objects and whose values are the registered WritableComparator instances. This is exactly why it can serve as a factory for RawComparator instances: its implementation holds a HashMap<Class, WritableComparator>, so given a Class it can return the corresponding WritableComparator.
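The registry mechanism can be imitated in a few lines of plain Java. Everything below is an illustrative toy, not Hadoop's implementation, but it mirrors how get() works as a factory backed by a static map, in the spirit of WritableComparator's define() and get():

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// A toy comparator registry: a static map from key class to its comparator.
public class ComparatorRegistry {

    private static final Map<Class<?>, Comparator<?>> comparators = new HashMap<>();

    // Register a comparator for a class (analogous to WritableComparator.define)
    static <T> void define(Class<T> c, Comparator<T> cmp) {
        comparators.put(c, cmp);
    }

    // Look up the registered comparator (analogous to WritableComparator.get)
    @SuppressWarnings("unchecked")
    static <T> Comparator<T> get(Class<T> c) {
        return (Comparator<T>) comparators.get(c);
    }

    public static void main(String[] args) {
        define(Integer.class, Integer::compare);
        Comparator<Integer> cmp = get(Integer.class);  // factory-style lookup
        System.out.println(cmp.compare(163, 165) < 0 ? "163<165" : "?"); // prints 163<165
    }
}
```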

    Example:

package cn.roboson.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 1. Obtain a RawComparator instance for IntWritable via WritableComparator.
 * 2. Compare two values in both ways: as objects and as serialized bytes.
 * @author roboson
 */
public class ComparableFinish {

    public static void main(String[] args) throws IOException {

        // Two IntWritables to compare
        IntWritable writable1 = new IntWritable(163);
        IntWritable writable2 = new IntWritable(165);

        // Obtain the RawComparator instance for IntWritable
        RawComparator<IntWritable> intRawComparator = WritableComparator.get(IntWritable.class);

        // Compare the objects directly
        int value1 = intRawComparator.compare(writable1, writable2);

        if (value1 < 0) {
            System.out.println("writable1<writable2");
        } else if (value1 == 0) {
            System.out.println("writable1=writable2");
        } else {
            System.out.println("writable1>writable2");
        }

        // Serialize both objects to obtain their byte representations
        byte[] byte1 = serialize(writable1);
        byte[] byte2 = serialize(writable2);

        // Compare the serialized bytes directly (an IntWritable occupies 4 bytes)
        int value2 = intRawComparator.compare(byte1, 0, 4, byte2, 0, 4);
        if (value2 < 0) {
            System.out.println("writable1<writable2");
        } else if (value2 == 0) {
            System.out.println("writable1=writable2");
        } else {
            System.out.println("writable1>writable2");
        }
    }

    public static byte[] serialize(Writable writable) throws IOException {

        // Write the structured object into a byte array output stream
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataout = new DataOutputStream(out);
        writable.write(dataout);
        return out.toByteArray();
    }

    public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {

        // Read the byte array back through an input stream and
        // deserialize it into the writable's fields
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        DataInputStream datain = new DataInputStream(in);
        writable.readFields(datain);
        return bytes;
    }
}

Output: both comparisons print writable1<writable2, since 163 < 165.

For more on serialization, see my post "Hadoop Serialization":

    http://www.cnblogs.com/robert-blue/p/4157768.html

References:

    http://blog.csdn.net/keda8997110/article/details/8518255

    http://www.360doc.com/content/12/0827/09/9318309_232551844.shtml

Original post: https://www.cnblogs.com/robert-blue/p/4159434.html