zoukankan      html  css  js  c++  java
  • ZooKeeper源码阅读(二):客户端

    源代码:

    http://svn.apache.org/repos/asf/zookeeper/trunk/

    导入eclipse:

    在包含build.xml目录下执行ant eclipse将产生.classpath文件

    目录结构:

    src/recipes:提供了各种Zookeeper应用例子

    src/c:提供了c版客户端。zookeeper_st,zookeeper_mt两个library

    src/contrib:别人贡献的代码?

    src/generated:由jute生成的java实体类

    客户端入口:org.apache.zookeeper.ZooKeeperMain

    //读取命令行输入,用MyCommandOptions解析。

    //内部类MyCommandOptions包含成员命令名command、参数列表cmdArgs

    -option value –option value command cmdArgs

    //根据以上解析的ip、端口,连接到ZooKeeper

            zk = newZooKeeper(host,

                    Integer.parseInt(cl.getOption("timeout")),

                     newMyWatcher(), readOnly);

    //执行命令,在ZooKeeperMain.run()

    //ZooKeeperMain只是一个外壳,使用jline实现了命令提示功能。

    //commandMapCli将提供的命令命令名与执行体CliCommand关联

            //execute from commandMap

            CliCommandcliCmd = commandMapCli.get(cmd);

            if(cliCmd!=null) {

               cliCmd.setZk(zk);

                watch =cliCmd.parse(args).exec();

                       }

    //最终转到调用ZooKeeper方法

    //提供的命令:

    quit:Zk.close()关闭zk连接,调用cnxn.close()

    history:列出历史记录

    redo index:重新执行历史记录

    printwatches [on]:查看/设置watche开关状态

    connect:connectToZK(host)连接zk


    //ZooKeeper内部连接

            cnxn = newClientCnxn(connectStringParser.getChrootPath(),

                    hostProvider,sessionTimeout,this,watchManager,

                    getClientCnxnSocket(),canBeReadOnly);

            cnxn.start();

    ClientCnxn包含SendThread和EventThread两个线程

    SendThread将事件添加到waitEvents队列中,EventThread线程消费该队列。

    //下面以ls命令为例

    //调用zk.getChildren

        public boolean exec() throwsKeeperException, InterruptedException {

            String path= args[1];

            boolean watch =cl.hasOption("w");

           List<String> children = zk.getChildren(path, watch);

            out.println(children);

            return watch;

        }

    //getChildren生成request

           RequestHeader h = newRequestHeader();

           h.setType(ZooDefs.OpCode.getChildren);

           GetChildrenRequest request = newGetChildrenRequest();

           request.setPath(serverPath);

           request.setWatch(watcher != null);

           GetChildrenResponse response = newGetChildrenResponse();

            ReplyHeader r = cnxn.submitRequest(h, request,response, wcb);

    //submitRequest调用queuePacket

        publicReplyHeadersubmitRequest(RequestHeaderh, Record request,

                Recordresponse, WatchRegistration watchRegistration)

                throwsInterruptedException {

            ReplyHeaderr = new ReplyHeader();

            Packetpacket = queuePacket(h,r, request, response,null,null,null,

                       null, watchRegistration);

            synchronized(packet) {

                while (!packet.finished) {

                   packet.wait();

                }

            }

            return r;

        }

    //queuePacket将Packet添加到outgoingQueue队列中

                packet= new Packet(h, r, request, response,watchRegistration);

                packet.cb = cb;

                packet.ctx = ctx;

                packet.clientPath =clientPath;

                packet.serverPath =serverPath;

     

                    outgoingQueue.add(packet);

     

             //然后唤醒selector

            sendThread.getClientCnxnSocket().wakeupCnxn();


    //sendThread.run消费outgoingQueue

            clientCnxnSocket.doTransport(to,pendingQueue,outgoingQueue,ClientCnxn.this);

    //selector判断读/写事件

    //doTransport调用doIO,doIO解析Response

             //读事件

            int rc =sock.read(incomingBuffer);

            sendThread.readResponse(incomingBuffer);

             //写事件

           sock.write(p.bb);


    //readResponse在finally块中调用finishPacket,finishPacket将设置packet.finish,

    //此时submitRequest返回response。

        try {

            packet.replyHeader.setXid(replyHdr.getXid());

            packet.replyHeader.setErr(replyHdr.getErr());

            packet.replyHeader.setZxid(replyHdr.getZxid());

            if(replyHdr.getZxid() > 0) {

                lastZxid =replyHdr.getZxid();

            }

            if(packet.response !=null&& replyHdr.getErr() == 0) {

                packet.response.deserialize(bbia,"response");

            }

        } finally {

            finishPacket(packet);

        }


    以下图片转自:http://www.spnguru.com/2010/08/zookeeper%E5%85%A8%E8%A7%A3%E6%9E%90%E2%80%94%E2%80%94client%E7%AB%AF/

     


  • 相关阅读:
    Live2D 看板娘
    Live2D 看板娘
    Live2D 看板娘
    Live2D 看板娘
    Live2D 看板娘
    Live2D 看板娘
    Live2D 看板娘
    Live2D 看板娘
    Live2D 看板娘
    Live2D 看板娘
  • 原文地址:https://www.cnblogs.com/javawebsoa/p/3209189.html
Copyright © 2011-2022 走看看