zoukankan      html  css  js  c++  java
  • Mapreduce之序列化框架(转自http://blog.csdn.net/lastsweetop/article/details/9376495)

    框架简介

    MapReduce仅仅可以支持Writable做key,value吗?答案是否定的。事实上,一切类型都是支持的,只需满足一个小小的条件:每个类型是以二进制流的形式传输。为此Hadoop提供了一个序列化框架来支持,他们在org.apache.hadoop.io.serializer包中,Writable可以作为MapReduce支持的类型也是因为实现了这个框架,类不多,我们从几个接口说起。

    Serializer

    定义了一组接口,打开流,序列化,关闭流
    1. public interface Serializer <T>  {  
    2.     void open(java.io.OutputStream outputStream) throws java.io.IOException;  
    3.   
    4.     void serialize(T t) throws java.io.IOException;  
    5.   
    6.     void close() throws java.io.IOException;  
    7. }  

    Deserializer

    定义了一组接口,打开流,反序列化,关闭流
    1. public interface Deserializer <T>  {  
    2.     void open(java.io.InputStream inputStream) throws java.io.IOException;  
    3.   
    4.     T deserialize(T t) throws java.io.IOException;  
    5.   
    6.     void close() throws java.io.IOException;  
    7. }  

    Serialization

    定义了一组接口,判断是否支持输入的类,根据输入的类给出序列化接口和反序列化接口
    1. public interface Serialization <T>  {  
    2.     boolean accept(java.lang.Class<?> aClass);  
    3.   
    4.     org.apache.hadoop.io.serializer.Serializer<T> getSerializer(java.lang.Class<T> tClass);  
    5.   
    6.     org.apache.hadoop.io.serializer.Deserializer<T> getDeserializer(java.lang.Class<T> tClass);  
    7. }  

    WritableSerialization

    如果你想自己定义一个类似Writable这样的框架,那么你首先需要的就是实现上面三个接口,那么我们先来看下Writable是如何实现的。
    1. <pre style="word-wrap: break-word; white-space: pre-wrap; ">public class WritableSerialization extends Configured   
    2.   implements Serialization<Writable> {  
    3.     
    4.   static class WritableDeserializer extends Configured   
    5.     implements Deserializer<Writable> {  
    6.   
    7.     private Class<?> writableClass;  
    8.     private DataInputStream dataIn;  
    9.       
    10.     public WritableDeserializer(Configuration conf, Class<?> c) {  
    11.       setConf(conf);  
    12.       this.writableClass = c;  
    13.     }  
    14.       
    15.     public void open(InputStream in) {  
    16.       if (in instanceof DataInputStream) {  
    17.         dataIn = (DataInputStream) in;  
    18.       } else {  
    19.         dataIn = new DataInputStream(in);  
    20.       }  
    21.     }  
    22.       
    23.     public Writable deserialize(Writable w) throws IOException {  
    24.       Writable writable;  
    25.       if (w == null) {  
    26.         writable   
    27.           = (Writable) ReflectionUtils.newInstance(writableClass, getConf());  
    28.       } else {  
    29.         writable = w;  
    30.       }  
    31.       writable.readFields(dataIn);  
    32.       return writable;  
    33.     }  
    34.   
    35.     public void close() throws IOException {  
    36.       dataIn.close();  
    37.     }  
    38.       
    39.   }  
    40.     
    41.   static class WritableSerializer implements Serializer<Writable> {  
    42.   
    43.     private DataOutputStream dataOut;  
    44.       
    45.     public void open(OutputStream out) {  
    46.       if (out instanceof DataOutputStream) {  
    47.         dataOut = (DataOutputStream) out;  
    48.       } else {  
    49.         dataOut = new DataOutputStream(out);  
    50.       }  
    51.     }  
    52.   
    53.     public void serialize(Writable w) throws IOException {  
    54.       w.write(dataOut);  
    55.     }  
    56.   
    57.     public void close() throws IOException {  
    58.       dataOut.close();  
    59.     }  
    60.   
    61.   }  
    62.   
    63.   public boolean accept(Class<?> c) {  
    64.     return Writable.class.isAssignableFrom(c);  
    65.   }  
    66.   
    67.   public Deserializer<Writable> getDeserializer(Class<Writable> c) {  
    68.     return new WritableDeserializer(getConf(), c);  
    69.   }  
    70.   
    71.   public Serializer<Writable> getSerializer(Class<Writable> c) {  
    72.     return new WritableSerializer();  
    73.   }  
    74.   
    75. }</pre>  
    两个内部静态类分别实现Serializer和Deserializer接口,然后getSerializer和getDeserializer分别实例化WritableSerializer和WritableDeserializer,
    accept方法仅仅是判断输入类是否是Writable的子类。
    通过io.serializations指定已实现Serialization,各个类之间通过逗号隔开,默认的Serialization有WritableSerialization和Avro中Serialization,
    这也就是说默认情况下,只有Writable和Avro里的对象可以在MapReduce中使用。
    那么你可能有疑问了,hadoop是如何知道一个类该交给哪个Serialization呢,答案也在这个包中,请看

    SerializationFactory

    先看他的构造器
    1. public SerializationFactory(Configuration conf) {  
    2.     super(conf);  
    3.     for (String serializerName : conf.getStrings("io.serializations",   
    4.       new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"})) {  
    5.       add(conf, serializerName);  
    6.     }  
    7.   }  
    可知他是从io.serializations属性指定的实现了Serialization的类,然后再看他是如何知道选哪个Serialization的
    1. public <T> Serialization<T> getSerialization(Class<T> c) {  
    2.    for (Serialization serialization : serializations) {  
    3.      if (serialization.accept(c)) {  
    4.        return (Serialization<T>) serialization;  
    5.      }  
    6.    }  
    7.    return null;  
    8.  }  
    好吧,就是这么简单,判断一下是否是对应的子类而已。
    这个包里还实现了JavaSerialization,其实就是Java对象的序列化,很多人觉得,这个好简单的,我只要实现java中的序列化接口就可以了,
    不用那么费事搞什么Writable和Avro,但是,千万别这么想,非常不推荐使用java对象的序列化,并且详尽的解释为什么不推荐:

    为什么不使用java序列化

    1.java序列化不够灵活,为了更好的控制序列化的整个流程所以使用Writable

    2.java序列化不符合序列化的标准,没有做一定的压缩,java序列化首先写类名,然后再是整个类的数据,而且成员对象在序列化中只存引用,成员对象的可以出现的位置很随机,既可以在序列化的对象前,也可以在其后面,这样就对随机访问造成影响,一旦出错,整个后面的序列化就会全部错误,但是
    Writable完美的弥补了这一点,因为Writable中每一条纪录间是相互独立的
    3.Java序列化每次序列化都要重新创建对象,内存消耗大,而Writable是可以重用的。

    序列化IDL

    为了和其他语言交互,必须定义序列化的IDL,原先定义的IDL在org.apache.hadoop.record包里,但是后来一直没用起来就淘汰掉了,现在比较常用的就是Avro,后面我们会重点着墨讲解。
    Apache的Thrift和Google的Protocol Buffer也是比较流行的序列化框架,但是在Hadoop里使用是有限的,只用于RPC和数据交互,不过有一个开源项目elephant-bird可以把他们使用在MapReduce上。
  • 相关阅读:
    virtualBox下面安装linux系统如何共享目录
    PHP中spl_autoload_register()函数
    PHP 5.5 新特性
    useradd密码无效
    Linux audit安全审计工具
    Javascript class获取回调函数数据
    RPi 3B 无线连接配置
    Refused to execute inline event handler because it violates the following Content Security Policy directive: "xxx". Either the 'unsafe-inline' keyword, a hash ('sha256-...'), or a nonce ('nonce-...')
    options.html:1 Refused to load the script 'xxxx' because it violates the following Content Security Policy directive: "script-src 'self' blob: filesystem: chrome-extension-resource:".
    jQuery.Deferred exception: $.get is not a function TypeError: $.get is not a function
  • 原文地址:https://www.cnblogs.com/xuepei/p/3665482.html
Copyright © 2011-2022 走看看