zoukankan      html  css  js  c++  java
  • 5.3.4 Hadoop序列化框架

      序列化框架

    除了writable实现序列化之外,只要实现让类型和二进制流相互转换,都可以作为hadoop的序列化类型,为此Hadoop提供了一个序列化框架接口,他们在org.apache.hadoop.io.serializer包中,Writable可以作为MapReduce支持的类型也是因为实现了这个框架接口。使用流程是定义序列化类实现框架接口->io.serializations参数配置序列化类名称,用一个逗号隔开的类名列表—> SerializationFactory构造函数中会读取配置,根据反射机制和类名创建序列化对象,保存在队列中—>通过SerializationFactory的函数getSerializer(Class<T> c)获取入序列化对象,入参是要获取对象的类名称。

    在先来看看序列化开接口,以及writable是怎么实现的。

    (1)序列化接口Serializer

    打开流,序列化,关闭流

    public interface Serializer <T>  {

        void open(java.io.OutputStream outputStream) throws java.io.IOException;

        void serialize(T t) throws java.io.IOException;

        void close() throws java.io.IOException;

    }

    (2)反序列化接口:Deserializer

    定义了一组接口,打开流,反序列化,关闭流

    public interface Deserializer <T>  {

        void open(java.io.InputStream inputStream) throws java.io.IOException;

        T deserialize(T t) throws java.io.IOException;

        void close() throws java.io.IOException;

    }

    (3)序列化判断和实例获取接口

    Accept函数判断是否支持序列化,要求是writable的子类,getSerializer函数返回序列化的实例,getDeserializer获取反序列化实例。通过实例调用接口函数去实现序列化函数。

    public interface Serialization <T>  {

        boolean accept(java.lang.Class<?> aClass);

        org.apache.hadoop.io.serializer.Serializer<T> getSerializer(java.lang.Class<T> tClass);

        org.apache.hadoop.io.serializer.Deserializer<T> getDeserializer(java.lang.Class<T> tClass);

    }

    (4)定义序列化类

    Writable定义序列化类要实现上面的三个接口, 实现接口中的序列化和反序列化函数。

    public class WritableSerialization extends Configured

      implements Serialization<Writable> {

      //定义静态反序列化类

      static class WritableDeserializer extends Configured

        implements Deserializer<Writable> {

        private Class<?> writableClass;

        private DataInputStream dataIn;

        //定义构造函数

        public WritableDeserializer(Configuration conf, Class<?> c) {

          setConf(conf);

          this.writableClass = c;

        }

        //打开输入流

        public void open(InputStream in) {

          if (in instanceof DataInputStream) {

            dataIn = (DataInputStream) in;

          } else {

            dataIn = new DataInputStream(in);

          }

        }

        //反序列化函数,读取数据

        public Writable deserialize(Writable w) throws IOException {

          Writable writable;

          if (w == null) {

            writable

              = (Writable) ReflectionUtils.newInstance(writableClass, getConf());

          } else {

            writable = w;

          }

          writable.readFields(dataIn);

          return writable;

        }

     //关闭输入流

        public void close() throws IOException {

          dataIn.close();

        }

       

      }

      序列化类

      static class WritableSerializer implements Serializer<Writable> {

        private DataOutputStream dataOut;

        //打开输出流

        public void open(OutputStream out) {

          if (out instanceof DataOutputStream) {

            dataOut = (DataOutputStream) out;

          } else {

            dataOut = new DataOutputStream(out);

          }

        }

     序列化函数写入数据

        public void serialize(Writable w) throws IOException {

          w.write(dataOut);

        }

     //关闭输出流

        public void close() throws IOException {

          dataOut.close();

        }

      }

     //判断是否是writable的子类

      public boolean accept(Class<?> c) {

        return Writable.class.isAssignableFrom(c);

      }

     //返回反序列化对象

      public Deserializer<Writable> getDeserializer(Class<Writable> c) {

        return new WritableDeserializer(getConf(), c);

      }

     //返回序列化对象

      public Serializer<Writable> getSerializer(Class<Writable> c) {

        return new WritableSerializer();

      }

    }

    (5)序列化工厂

    public class SerializationFactory extends Configured {

        private static final Log LOG = LogFactory.getLog(SerializationFactory.class.getName());

        private List<Serialization<?>> serializations = new ArrayList();

        public SerializationFactory(Configuration conf) {

            super(conf);

    //通过读取配置信息conf中的io.serializations参数来确定Serializations,这个参数是一个逗号隔开的类名列表

            String[] arr$ = conf.getStrings("io.serializations", new String[]{WritableSerialization.class.getName(), //默认包含Writable和Avro

    AvroSpecificSerialization.class.getName(), AvroReflectSerialization.class.getName()});

            int len$ = arr$.length;

            for(int i$ = 0; i$ < len$; ++i$) {

                String serializerName = arr$[i$];

                this.add(conf, serializerName);

            }

        }

    //添加到list的函数

        private void add(Configuration conf, String serializationName) {

            try {

                Class<? extends Serialization> serializionClass = conf.getClassByName(serializationName);

    //根据类名和反射机制创建序列化对象,保存到  serializations的List中

     this.serializations.add((Serialization)ReflectionUtils.newInstance(serializionClass, this.getConf()));

            } catch (ClassNotFoundException var4) {

                LOG.warn("Serialization class not found: ", var4);

            }

        }

        public <T> Serializer<T> getSerializer(Class<T> c) {

            Serialization<T> serializer = this.getSerialization(c);

            return serializer != null ? serializer.getSerializer(c) : null;

        }

        public <T> Deserializer<T> getDeserializer(Class<T> c) {

            Serialization<T> serializer = this.getSerialization(c);

            return serializer != null ? serializer.getDeserializer(c) : null;

        }

        public <T> Serialization<T> getSerialization(Class<T> c) {

            Iterator i$ = this.serializations.iterator();

            Serialization serialization;

            do {

                if (!i$.hasNext()) {

                    return null;

                }

                serialization = (Serialization)i$.next();

            } while(!serialization.accept(c));

            return serialization;

        }

    }

    其他序列化框架对比

    ObjectInput(Out)Stream

    1.无法跨语言。内部私有的协议, 2.序列后的码流太大。java序列化的大小是二进制编码的5倍多!
    3.序列化性能太低。java序列化的性能只有二进制编码的6.17倍。

    google的Protobuf

    1.结构化数据存储格式(xml,json等)
    2.高性能编解码技术
    3.语言和平台无关,扩展性好
    4.支持java,C++,Python三种语言。protostuff也许是最佳选择。protostuff相比于kyro还有一个额外的好处,就是如果序列化之后,反序列化之前这段时间内,java class增加了字段(这在实际业务中是无法避免的事情),kyro就废了。 但是protostuff只要保证新字段添加在类的最后,而且用的是sun系列的JDK, 是可以正常使用的。

    faceBook的Thrift

    1.Thrift支持多种语言(C++,C#,Cocoa,Erlag,Haskell,java,Ocami,Perl,PHP,Python,Ruby,和SmallTalk)
    2.Thrift适用了组建大型数据交换及存储工具,对于大型系统中的内部数据传输,相对于Json和xml在性能上和传输大小上都有明显的优势。
    3.Thrift支持三种比较典型的编码方式。(通用二进制编码,压缩二进制编码,优化的可选字段压缩编解码)

    Kryo

    1.速度快,序列化后体积小
    2.跨语言支持较复杂

    hessian

    1.默认支持跨语言
    2.较慢

    fst

    fst是完全兼容JDK序列化协议的系列化框架,序列化速度大概是JDK的4-10倍,大小是JDK大小的1/3左右。

    Gson

    Gson是目前功能最全的Json解析神器, Gson的应用主要为toJson与fromJson两个转换函数,无依赖,不需要例外额外的jar,能够直接跑在JDK上。而在使用这种对象转换之前需先创建好对象的类型以及其成员才能成功的将JSON字符串成功转换成相对应的对象。类里面只要有get和set方法,Gson完全可以将复杂类型的json到bean或bean到json的转换,是JSON解析的神器。

    FastJson

    Fastjson是一个Java语言编写的高性能的JSON处理器,由阿里巴巴公司开发。无依赖,不需要例外额外的jar,能够直接跑在JDK上。FastJson在复杂类型的Bean转换Json上会出现一些问题,可能会出现引用的类型,导致Json转换出错,需要制定引用。FastJson采用独创的算法,将parse的速度提升到极致,超过所有json库。

    引用:

    https://www.jianshu.com/p/937883b6b2e5

    https://www.helplib.com/Java_API_Classes/article_62465

    https://blog.csdn.net/lipeng_bigdata/article/details/51202764

    自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

    https://www.cnblogs.com/bclshuai/p/11380657.html

  • 相关阅读:
    简单错误记录
    识别有效的IP地址和掩码并进行分类统
    爬虫必备—BeautifulSoup
    爬虫必备—requests
    Shellinabox安装及使用教程
    Django——REST framework
    SaltStack部署
    使用js在HTML中自定义字符串格式化方法
    3种上传图片并实现预览的方法
    Ajax
  • 原文地址:https://www.cnblogs.com/bclshuai/p/11795911.html
Copyright © 2011-2022 走看看