org.apache.zookeeper.ClientCnxn.java
该类管理客户端到HBase集群的socket I/O
所有需要通过socket发送的消息全部打包成Packet对象,然后放到ClentCnxn的outgoingQueue(LinkedList<Packet>)中,对outgoingQueue的操作需要同步控制。需要接受的消息也会被打包成Packet对象,放入pendingQueue(LinkedList<Packet>)中,等待一个答复。下面介绍一下设计的其他类及各类的使用方法及处理逻辑。
(一)Packet.java
static class Packet { RequestHeader header; ByteBuffer bb; /** Client's view of the path (may differ due to chroot) **/ String clientPath; /** Servers's view of the path (may differ due to chroot) **/ String serverPath; ReplyHeader replyHeader; Record request; Record response; boolean finished; AsyncCallback cb; Object ctx; WatchRegistration watchRegistration; Packet(RequestHeader header, ReplyHeader replyHeader, Record record, Record response, ByteBuffer bb, WatchRegistration watchRegistration) { this.header = header; this.replyHeader = replyHeader; this.request = record; this.response = response; if (bb != null) { this.bb = bb; } else { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive .getArchive(baos); boa.writeInt(-1, "len"); // We'll fill this in later header.serialize(boa, "header"); if (record != null) { record.serialize(boa, "request"); } baos.close(); this.bb = ByteBuffer.wrap(baos.toByteArray()); this.bb.putInt(this.bb.capacity() - 4); this.bb.rewind(); } catch (IOException e) { LOG.warn("Ignoring unexpected exception", e); } } this.watchRegistration = watchRegistration; } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("clientPath:" + clientPath); sb.append(" serverPath:" + serverPath); sb.append(" finished:" + finished); sb.append(" header:: " + header); sb.append(" replyHeader:: " + replyHeader); sb.append(" request:: " + request); sb.append(" response:: " + response); // jute toString is horrible, remove unnecessary newlines return sb.toString().replaceAll("\r*\n+", " "); } }
该类含多个成员变量,主要的逻辑集中在构造函数中,若传入的字节数组(bb)内容非空,就将输入的"请求头(RequestHeader)"和“请求内容”(如:ExistRequest)转换成字节数组放入ByteBuffer中。其toString()方法将所有的字段输出,并最后替换掉回车与换行之间的部分成空格。
1)RequestHeader.java
在RequestHeader中有两个整型变量,Xid和type,type标志着操作类型,如:ping,auth,详见OpCode接口中的静态常量。
Xid是一个操作序号,针对非ping和auth的操作,对于特定的ClientCnxn每次都会将该序号加1,然后传递给RequestHeader。
2)ReplyHeader extends record
在PendingQueue接收返回数据,
3)Request extends record
记录请求内容
4)Response extends record
记录回应内容