zoukankan      html  css  js  c++  java
  • Zookeeper的RPC框架

    org.apache.zookeeper.ClientCnxn.java

    该类管理客户端到HBase集群的socket I/O

    所有需要通过socket发送的消息全部打包成Packet对象,然后放到ClentCnxn的outgoingQueue(LinkedList<Packet>)中,对outgoingQueue的操作需要同步控制。需要接受的消息也会被打包成Packet对象,放入pendingQueue(LinkedList<Packet>)中,等待一个答复。下面介绍一下设计的其他类及各类的使用方法及处理逻辑。

    (一)Packet.java

    static class Packet {
            RequestHeader header;
    
            ByteBuffer bb;
    
            /** Client's view of the path (may differ due to chroot) **/
            String clientPath;
            /** Servers's view of the path (may differ due to chroot) **/
            String serverPath;
    
            ReplyHeader replyHeader;
    
            Record request;
    
            Record response;
    
            boolean finished;
    
            AsyncCallback cb;
    
            Object ctx;
    
            WatchRegistration watchRegistration;
    
            Packet(RequestHeader header, ReplyHeader replyHeader, Record record,
                    Record response, ByteBuffer bb,
                    WatchRegistration watchRegistration) {
                this.header = header;
                this.replyHeader = replyHeader;
                this.request = record;
                this.response = response;
                if (bb != null) {
                    this.bb = bb;
                } else {
                    try {
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        BinaryOutputArchive boa = BinaryOutputArchive
                                .getArchive(baos);
                        boa.writeInt(-1, "len"); // We'll fill this in later
                        header.serialize(boa, "header");
                        if (record != null) {
                            record.serialize(boa, "request");
                        }
                        baos.close();
                        this.bb = ByteBuffer.wrap(baos.toByteArray());
                        this.bb.putInt(this.bb.capacity() - 4);
                        this.bb.rewind();
                    } catch (IOException e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
                this.watchRegistration = watchRegistration;
            }
    
            @Override
            public String toString() {
                StringBuilder sb = new StringBuilder();
    
                sb.append("clientPath:" + clientPath);
                sb.append(" serverPath:" + serverPath);
                sb.append(" finished:" + finished);
    
                sb.append(" header:: " + header);
                sb.append(" replyHeader:: " + replyHeader);
                sb.append(" request:: " + request);
                sb.append(" response:: " + response);
    
                // jute toString is horrible, remove unnecessary newlines
                return sb.toString().replaceAll("\r*\n+", " ");
            }
        }
    

      

     

     该类含多个成员变量,主要的逻辑集中在构造函数中,若传入的字节数组(bb)内容非空,就将输入的"请求头(RequestHeader)"和“请求内容”(如:ExistRequest)转换成字节数组放入ByteBuffer中。其toString()方法将所有的字段输出,并最后替换掉回车与换行之间的部分成空格。

    1)RequestHeader.java

    在RequestHeader中有两个整型变量,Xid和type,type标志着操作类型,如:ping,auth,详见OpCode接口中的静态常量。

    Xid是一个操作序号,针对非ping和auth的操作,对于特定的ClientCnxn每次都会将该序号加1,然后传递给RequestHeader。

    2)ReplyHeader extends record

     在PendingQueue接收返回数据,

    3)Request extends record

    记录请求内容

    4)Response extends record

     记录回应内容

     

  • 相关阅读:
    中文字体在CSS中的表达方式
    图片上传+预览+剪切解决方案我们到底能走多远系列(20)
    C# — 饼形图之插入介绍文字
    CSS 网页布局中文排版的9则技巧
    Android UI 优化 使用<include/>和 <merge />标签
    SQLite 的日期时间函数
    GSM、GPRS、EDGE、2G、3G与WAP的关系
    WPF中的Style(风格,样式)
    给力分享新的ORM => Dapper
    WCF开发框架形成之旅如何实现X509证书加密
  • 原文地址:https://www.cnblogs.com/legendary/p/2717266.html
Copyright © 2011-2022 走看看