zoukankan      html  css  js  c++  java
  • 自定义Writable类型

      Hadoop的MapReduce计算模型的Key,Value值都是采用的自定义Writable类型,我们也可以自定义Writable来实现不同的业务需求。

      1.定义:

        a.基于DataInput和DataOutput实现简单,高效,序列协议的可序列化对象;

        b.在hadoop的MapReduce计算编程模型中,必须实现Writable接口

        c.实现通常实现静态read(DataInput)方法,该方法构造一个新的实例,调用readFields(DataInput)并返回实例。

      2.接口说明

        public interface Writable {

          /**
          * 序列化对象的field(字段)到DataOutput中
          * (序列化过程)
          */
          void write(DataOutput out) throws IOException;

          /**
          * 从DataInput中将对象的field(字段)反序列化
          *(反序列化的过程)
          */
          void readFields(DataInput in) throws IOException;
        }

      与java的区别:

        java序列户过程是面向流的;hadoop中Writable序列化面向对象的;

      3.序列化过程

        

        public static byte[] serialize(Writable writable) throws IOException {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          DataOutputStream dataOut = new DataOutputStream(out);
          writable.write(dataOut);
          dataOut.close();

        }

    4.反序列化过程:
      public static byte[] deserialize(Writable writable, byte[] bytes)throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        DataInputStream dataIn = new DataInputStream(in);
        writable.readFields(dataIn);
        dataIn.close();
        return bytes;
      }

    分析序列化和反序列化的过程
    ------------------------------------------------
    0.结合MapWritable、IntWritable和Text跟踪底层源码
    1.构建MapWritable:
    MapWritable _map = new MapWritable();
    _map.put(new IntWritable(12), new Text("zhangsan"));
    _map.put(new IntWritable(130), new Text("lisi"));
    _map.put(new IntWritable(16), new Text("hanmeimei"));
    _map.put(new IntWritable(7778878), new Text("lilei"));
    2.序列化mapWritable对象,调用serialize(Writable writable)
    //序列化过程,调用writable的write()
    byte[] tt = serialize(_map);
    System.out.println(tt.length);
    说明:【MapWritable.class】-->write(DataOutput out)方法
    public void write(DataOutput out) throws IOException {
    super.write(out); 【说明:调用AbstractMapWritable.class的write()方法:将newclass添加至out中】
    // Write out the number of entries in the map
    out.writeInt(instance.size()); 【说明:将mapwritable对象的个数写入到out中】
    // Then write out each key/value pair 【说明:循环遍历,写入顺序为:KeyClassID+key+ValueClassID+value】
    for (Map.Entry<Writable, Writable> e: instance.entrySet()) {
    out.writeByte(getId(e.getKey().getClass()));
    e.getKey().write(out);
    out.writeByte(getId(e.getValue().getClass()));
    e.getValue().write(out);
    }
    }
    3.反序列化,定义新的mapWritable对象,调用deserialize(Writable writable, byte[] bytes)
    MapWritable map2 = new MapWritable();
    deserialize(map2, tt);
    System.out.println(map2.size());
    说明:【MapWritable.class】-->readFields(DataInput in)方法
    public void readFields(DataInput in) throws IOException {
    super.readFields(in); 【说明:调用AbstractMapWritable.class的readFields()方法:获取newclass的类型】
    // First clear the map. Otherwise we will just accumulate
    // entries every time this method is called.
    this.instance.clear(); 【说明:清空map对象中的数据】
    // Read the number of entries in the map
    int entries = in.readInt(); 【说明:通过in读取一个int(整型位)获得序列化中数据的个数】
    // Then read each key/value pair 【说明:循环遍历个数,利用反射机制获取keyclass和valueclass,并通过Writable的readFields方法读取key值和value值,并将其写入至instance(当前MapWritable对象中)】
    for (int i = 0; i < entries; i++) {
    Writable key = (Writable) ReflectionUtils.newInstance(getClass(in.readByte()), getConf());
    key.readFields(in);
    Writable value = (Writable) ReflectionUtils.newInstance(getClass(in.readByte()), getConf());
    value.readFields(in);
    instance.put(key, value);
    }
    }

      

  • 相关阅读:
    转:同步、异步、阻塞和非阻塞
    转:回调函数
    转:同步/异步与阻塞/非阻塞的区别
    转:Socket在阻塞模式下的信息收发和文件接收
    转:直接用socket实现HTTP协议
    链接错误 LINK : fatal error LNK1104: 无法打开文件“XX.obj”
    转:MFC中常用类,宏,函数介绍
    转:线程同步技术剖析
    转:线程同步
    转:C++回调函数用法
  • 原文地址:https://www.cnblogs.com/lyr999736/p/9381637.html
Copyright © 2011-2022 走看看