zoukankan      html  css  js  c++  java
  • Zookeeper 源码(二)序列化组件 Jute

    Zookeeper 源码(二)序列化组件 Jute

    一、序列化组件 Jute

    对于一个网络通信,首先需要解决的就是对数据的序列化和反序列化处理,在 ZooKeeper 中,使用了Jute 这一序列化组件来进行数据的序列化和反序列化操作。同时,为了实现一个高效的网络通信程序,良好的通信协议设计也是至关重要的。Zookeeper 团队曾想过将 Jute 替换成 Apache 的 Avro 或是 Google 的 protobuf 但是考虑到新老版本序列化组件的兼容性和当前 Jute 的序列化能力并不是 ZooKeeper 的性能瓶颈。

    import org.apache.jute.*;
    import org.apache.zookeeper.server.ByteBufferInputStream;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    
    public class JuteHeader implements Record {
    
        private long sessionId;
        private String type;
    
        public JuteHeader() {
        }
    
        public JuteHeader(long sessionId, String type) {
            this.sessionId = sessionId;
            this.type = type;
        }
    
        @Override
        public void serialize(OutputArchive outputArchive, String tag) throws IOException {
            outputArchive.startRecord(this, tag);
            outputArchive.writeLong(sessionId, "sessionId");
            outputArchive.writeString(type, "type");
            outputArchive.endRecord(this, tag);
        }
    
        @Override
        public void deserialize(InputArchive inputArchive, String tag) throws IOException {
            inputArchive.startRecord(tag);
            sessionId = inputArchive.readLong("sessionId");
            type = inputArchive.readString("type");
            inputArchive.endRecord(tag);
        }
    
        public static void main(String[] args) throws IOException {
    
            final String tag = "header";
            // 序列化
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            new JuteHeader(100001L, "xxx").serialize(boa, tag);
    
            // 包装成 ByteBuffer 用于网络传输
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
    
            // 反序列化
            ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            JuteHeader header = new JuteHeader();
            header.deserialize(bbia, tag);
    
            baos.close();
            bbis.close();
        }
    
    }
    

    二、Zookeeper 通信协议

    基于 TCP/IP 协议,ZooKeeper 实现了自己的通信协议来完成客户端与服务端、服务端与服务端之间的网络通信。ZooKeeper 通信协议整体上的设计非常简单,对于请求,主要包含请求头和请求体,而对于晌应,则主要包含响应头和响应体,

    Zookeeper 通信协议

    协议的请求头组成如下:

    协议的请求头

    Packet 类中的 createBB() 将 packet 对象序列化:

    public void createBB() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            boa.writeInt(-1, "len"); // We'll fill this in later
            if (requestHeader != null) {
                requestHeader.serialize(boa, "header");
            }
            if (request instanceof ConnectRequest) {
                request.serialize(boa, "connect");
                // append "am-I-allowed-to-be-readonly" flag
                boa.writeBool(readOnly, "readOnly");
            } else if (request != null) {
                request.serialize(boa, "request");
            }
            baos.close();
            this.bb = ByteBuffer.wrap(baos.toByteArray());
            this.bb.putInt(this.bb.capacity() - 4);
            this.bb.rewind();
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
    }
    

    请求头:

    public class RequestHeader implements Record {
        private int xid;
        private int type;
    
        public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
            a_.startRecord(this,tag);
            a_.writeInt(xid,"xid");
            a_.writeInt(type,"type");
            a_.endRecord(this,tag);
        }
    
        public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
            a_.startRecord(tag);
            xid=a_.readInt("xid");
            type=a_.readInt("type");
            a_.endRecord(tag);
        }
    }
    

    请求体有多种,如 ConnectRequest、CreateRequest ...:

    public class ConnectRequest implements Record {
        private int protocolVersion;
        private long lastZxidSeen;
        private int timeOut;
        private long sessionId;
        private byte[] passwd;
    
        public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
            a_.startRecord(this,tag);
            a_.writeInt(protocolVersion,"protocolVersion");
            a_.writeLong(lastZxidSeen,"lastZxidSeen");
            a_.writeInt(timeOut,"timeOut");
            a_.writeLong(sessionId,"sessionId");
            a_.writeBuffer(passwd,"passwd");
            a_.endRecord(this,tag);
        }
        public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
            a_.startRecord(tag);
            protocolVersion=a_.readInt("protocolVersion");
            lastZxidSeen=a_.readLong("lastZxidSeen");
            timeOut=a_.readInt("timeOut");
            sessionId=a_.readLong("sessionId");
            passwd=a_.readBuffer("passwd");
            a_.endRecord(tag);
        }
    }
    

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    harbor无法登陆解决
    k8s中使用harbor
    harbor扩容
    harbor设置开机自启
    设置开机自启
    关Java的内存模型(JMM)
    多线程相关概念
    多线程(JDK1.5的新特性互斥锁)
    synchronized关键字
    【异常】redis.clients.jedis.exceptions.JedisDataException: ERR unknown command 'PSETEX'
  • 原文地址:https://www.cnblogs.com/binarylei/p/9927057.html
Copyright © 2011-2022 走看看