zoukankan      html  css  js  c++  java
  • Hadoop中Writable类之四

    1.定制Writable类型

    Hadoop中有一套Writable实现,例如:IntWritable、Text等,但是,有时候可能并不能满足自己的需求,这个时候,就需要自己定制Writable类型。

    定制分以下几步:

    • 需要实现WritableComparable接口,因为Writable常常作为健值对出现,而在MapReduce中,中间有个排序很重要,因此,Hadoop中就让Writable实现了WritableComparable
    • 需要实现WritableComparable的write()、readFields()、compareTo()方法
    • 需要重写java.lang.Object中的hashCode()、equals()、toString()方法。

    由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法

    如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法

     

    Example:

    CreateWritable.java

      1 package cn.roboson.writable;
      2 
      3 import java.io.DataInput;
      4 import java.io.DataOutput;
      5 import java.io.IOException;
      6 
      7 import org.apache.hadoop.io.Text;
      8 import org.apache.hadoop.io.WritableComparable;
      9 /**
     10  * 1.定制一个Writable类型,里面包含两个Text
     11  * 2.和IntWritable等原有的Writable类似,它需要实现WritableComparable
     12  * 3.由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法
     13  * 4.如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法
     14  * @author roboson
     15  *
     16  */
     17 
     18 public class CreateWritable implements WritableComparable<CreateWritable>{
     19     
     20     private Text first;
     21     private Text second;
     22     
     23     //构造方法,这个是必须要的
     24     public CreateWritable(){
     25         
     26     }
     27     
     28     public CreateWritable(String first,String second){
     29         set(new Text(first), new Text(second));
     30     }
     31     
     32     public CreateWritable(Text first,Text second){
     33         set(first,second);
     34     }
     35     
     36     
     37     
     38     public Text getFirst() {
     39         return first;
     40     }
     41 
     42     public void setFirst(Text first) {
     43         this.first = first;
     44     }
     45 
     46     public Text getSecond() {
     47         return second;
     48     }
     49 
     50     public void setSecond(Text second) {
     51         this.second = second;
     52     }
     53 
     54     public void set(Text first,Text second){
     55         this.first=first;
     56         this.second=second;
     57     }
     58     
     59 
     60     @Override
     61     public void readFields(DataInput in) throws IOException {
     62         // TODO Auto-generated method stub
     63         first.readFields(in);
     64         second.readFields(in);
     65     }
     66 
     67     @Override
     68     public void write(DataOutput out) throws IOException {
     69         // TODO Auto-generated method stub
     70         first.write(out);
     71         second.write(out);
     72     }
     73 
     74     @Override
     75     public int compareTo(CreateWritable other) {
     76         // TODO Auto-generated method stub
     77         int cmp=first.compareTo(other.getFirst());
     78         if(cmp!=0){
     79             return cmp;
     80         }
     81         return second.compareTo(other.getSecond());
     82     }
     83     
     84     @Override
     85     public int hashCode() {
     86         // TODO Auto-generated method stub
     87         return first.hashCode()*163 + second.hashCode();
     88     }
     89     
     90     @Override
     91     public String toString() {
     92         // TODO Auto-generated method stub
     93         return first+"	"+second;
     94     }
     95     
     96     @Override
     97     public boolean equals(Object obj) {
     98         // TODO Auto-generated method stub
     99         if(obj instanceof CreateWritable){
    100             CreateWritable create = (CreateWritable) obj;
    101             return first.equals(create.getFirst()) && second.equals(create.getSecond());
    102         }
    103         return false;
    104     }
    105 
    106 }

     

    Writable06.java

     1 package cn.roboson.writable;
     2 
     3 import org.apache.hadoop.io.Text;
     4 
     5 public class Writable06 {
     6     
     7     public static void main(String[] args) {
     8         CreateWritable createWritable01 = new CreateWritable("Hadoop", "roboson");
     9         CreateWritable createWritable02 = new CreateWritable(new Text("Hadoop"), new Text("roboson"));
    10         
    11         //重写了equals()方法,相比叫first 和 second两者都相同的时候,返回true
    12         System.out.println(createWritable01.equals(createWritable02));
    13         
    14         //重写了compareTo()方法,先比较first,再比较second,返回0:相等 -1:first<second 1:first>second
    15         System.out.println(createWritable01.compareTo(createWritable02));
    16         
    17         //重写了toString()方法,返回 first +"	"+second
    18         System.out.println(createWritable01.toString());
    19     }
    20 }

     

    运行结果:

     

    2.为速度实现一个RawComparator

    关于Comparator方面的知识,可以参考我的博文《Hadoop中WritableComparable 和 comparator》,Comparator中,有一个方法是compare(byte[] b1,int s1,int l1,byte[] b2,int s2, int l2),该方法直接比较的是序列化,不必先将序列化数据流转化为对象,再进行比较,所以,与上面的compareTo()方法,相比,其更高效!其实,我们查看HadoopAPI,就可以发现,基本类型的Writable类都实现有了Comparator作为其内部类:

    好,再看看IntWritable.Comparator这个类,如下图所示,发现它是一个静态类,好好观察它的结构:

     

    通过上面的,我们可以发现,要为一个Writable实现一个Comparator,安装Hadoop的格式来,需要注意以下几点:

    • 将其在内部实现,作为内部类
    • 是一个静态的内部类
    • 注册,以便可以通过WritableComparator直接创建

     至于注册方面的知识,可以查看我的博客《Hadoop中Comparator原理

    Example:给上面自定义的CreateWritable类实现一个Comparator,也就是CreateWritable.Comparator类

    CreateWritable.java

      1 package cn.roboson.writable;
      2 
      3 import java.io.DataInput;
      4 import java.io.DataOutput;
      5 import java.io.IOException;
      6 import java.util.Comparator;
      7 
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.io.WritableComparable;
     10 import org.apache.hadoop.io.WritableComparator;
     11 import org.apache.hadoop.io.WritableUtils;
     12 /**
     13  * 1.给定制的CreateWritable类实现一个Comparator,CreateWritable.Comparator
     14  * @author roboson
     15  *
     16  */
     17 
     18 public class CreateWritable implements WritableComparable<CreateWritable>{
     19     
     20     private Text first;
     21     private Text second;
     22     
     23     //构造方法,这个是必须要的
     24     public CreateWritable(){
     25         
     26     }
     27     
     28     public CreateWritable(String first,String second){
     29         set(new Text(first), new Text(second));
     30     }
     31     
     32     public CreateWritable(Text first,Text second){
     33         set(first,second);
     34     }
     35     
     36     
     37     
     38     public Text getFirst() {
     39         return first;
     40     }
     41 
     42     public void setFirst(Text first) {
     43         this.first = first;
     44     }
     45 
     46     public Text getSecond() {
     47         return second;
     48     }
     49 
     50     public void setSecond(Text second) {
     51         this.second = second;
     52     }
     53 
     54     public void set(Text first,Text second){
     55         this.first=first;
     56         this.second=second;
     57     }
     58     
     59 
     60     @Override
     61     public void readFields(DataInput in) throws IOException {
     62         // TODO Auto-generated method stub
     63         first.readFields(in);
     64         second.readFields(in);
     65     }
     66 
     67     @Override
     68     public void write(DataOutput out) throws IOException {
     69         // TODO Auto-generated method stub
     70         System.out.println(first);
     71         first.write(out);
     72         second.write(out);
     73     }
     74 
     75     @Override
     76     public int compareTo(CreateWritable other) {
     77         // TODO Auto-generated method stub
     78         int cmp=first.compareTo(other.getFirst());
     79         if(cmp!=0){
     80             return cmp;
     81         }
     82         return second.compareTo(other.getSecond());
     83     }
     84     
     85     @Override
     86     public int hashCode() {
     87         // TODO Auto-generated method stub
     88         return first.hashCode()*163 + second.hashCode();
     89     }
     90     
     91     @Override
     92     public String toString() {
     93         // TODO Auto-generated method stub
     94         return first+"	"+second;
     95     }
     96     
     97     @Override
     98     public boolean equals(Object obj) {
     99         // TODO Auto-generated method stub
    100         if(obj instanceof CreateWritable){
    101             CreateWritable create = (CreateWritable) obj;
    102             return first.equals(create.getFirst()) && second.equals(create.getSecond());
    103         }
    104         return false;
    105     }
    106     
    107     
    108     //创建静态内部类:CreateWritable.Comparator
    109     public static class Comparator extends WritableComparator{
    110 
    111         public Comparator() {
    112             super(CreateWritable.class);
    113             // TODO Auto-generated constructor stub
    114         }
    115         private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    116         
    117         @Override
    118         public int compare(byte[] b1, int s1, int l1, byte[] b2,
    119                 int s2, int l2) {
    120             // TODO Auto-generated method stub
    121             int firstL1 = 0,firstL2 = 0;
    122             try {
    123                 firstL1 = WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1, s1);
    124                 firstL2 = WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2, s2);
    125                 int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
    126                 if(cmp !=0){
    127                     return cmp;
    128                 }
    129             } catch (IOException e) {
    130                 // TODO Auto-generated catch block
    131                 e.printStackTrace();
    132             }
    133             return TEXT_COMPARATOR.compare(b1, s1+firstL1, l1-firstL1, b2, s2+firstL2, l2-firstL2);
    134         }
    135         
    136     }
    137     static{
    138         WritableComparator.define(CreateWritable.class,new Comparator());
    139     }
    140 }

     

    Writable07.java

     1 package cn.roboson.writable;
     2 
     3 import java.io.ByteArrayOutputStream;
     4 import java.io.DataOutputStream;
     5 import java.io.IOException;
     6 
     7 import org.apache.hadoop.io.RawComparator;
     8 import org.apache.hadoop.io.Text;
     9 import org.apache.hadoop.io.Writable;
    10 import org.apache.hadoop.io.WritableComparator;
    11 
    12 public class Writable07 {
    13     
    14     public static void main(String[] args) throws IOException {
    15         
    16         CreateWritable create01 = new CreateWritable(new Text("Hadoop"), new Text("1"));
    17         CreateWritable create02 = new CreateWritable(new Text("Hadoop"), new Text("2"));
    18         byte[] b1 = serialize(create01);
    19         byte[] b2 = serialize(create02);
    20         RawComparator<CreateWritable> comparator = WritableComparator.get(CreateWritable.class);
    21         int cmp =comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
    22         if(cmp ==0){
    23             System.out.println("create01 == create02");
    24         }else if(cmp ==-1){
    25             System.out.println("create01<create02");
    26         }else{
    27             System.out.println("create01>create02");
    28         }
    29     }
    30     
    31     public static byte[] serialize(Writable writable) throws IOException{
    32         ByteArrayOutputStream out = new ByteArrayOutputStream();
    33         DataOutputStream dataOut = new DataOutputStream(out);
    34         writable.write(dataOut);
    35         return out.toByteArray();
    36         
    37     }
    38 }

     

    运行结果:

    分析:

    在上面的CreateWritable的内部类Comparator中,需要实现一个方法compare()方法,那么compare该如何实现,才能够对序列化数据流进行比较。

    CreateWritable是由两个Text对象组成的(Text first, Text second),而Text对象的二进制表示,是一个长度可变的整数,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传递给Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较。

    也就是说Text对象序列化后,也是由两部分组成,一部分是记录本身有多少个字节,另一部分就是它自己的字节数!

    记录它本身有多少个字节所占用的字节长度:WritableUtils.decodeVintSize()方法返回一个整数,代表它的字节长度。

    它自己本身的字节数:WritableComparator的readVInt()方法返回一个整数,代表它本身的字节长度。

    这样就方便了,相比叫first的序列化数据流,根据结果,判断,看是否需要再比较second的序列化数据流。

     

  • 相关阅读:
    EasyHook库系列使用教程之四钩子的启动与停止
    inputclean插件的使用方法
    机器学习(十三)——机器学习中的矩阵方法(3)病态矩阵、协同过滤的ALS算法(1)
    &lt;LeetCode OJ&gt; 204. Count Primes
    leetcode 235: Lowest Common Ancestor of a Binary Search Tree
    数据结构经常使用算法
    调侃物联网开源框架,我们什么时候也来开源一个?
    字符编码简单介绍
    PriorityQueue ,ArrayList , 数组排序
    从struts2.1开始Convention零配置
  • 原文地址:https://www.cnblogs.com/robert-blue/p/4165602.html
Copyright © 2011-2022 走看看