ZooKeeper Source Code Analysis, Part 3: How the Client Sends Requests

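      A znode can be watched. This covers changes to the data stored in the node, changes to its child nodes, and so on. When a change occurs, the clients that set the watch are notified. For applications, this is ZooKeeper's most important feature. It is the basis for centralized configuration management, cluster management, distributed locks, and more.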

    Background:

    ZooKeeper defines the following states:

    Unknown (-1), Disconnected (0), NoSyncConnected (1), SyncConnected (3), AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated (6), Expired (-112).
    
    

    The event types defined are: None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4), DataWatchRemoved (5), ChildWatchRemoved (6).

    The watcher types defined are: Children (1), Data (2), Any (3).
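These codes travel over the wire as plain ints. The client therefore needs a way to turn an int back into a named constant.

The sketch below assumes exactly that. EventTypeSketch is a hypothetical name. ZooKeeper's own Watcher.Event.EventType offers a comparable fromInt()/getIntValue() pair rather than this exact code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of the event-type codes listed above (not ZooKeeper's class).
enum EventTypeSketch {
    None(-1), NodeCreated(1), NodeDeleted(2), NodeDataChanged(3),
    NodeChildrenChanged(4), DataWatchRemoved(5), ChildWatchRemoved(6);

    // Reverse index from wire code to constant, built once when the enum loads.
    private static final Map<Integer, EventTypeSketch> BY_CODE = new HashMap<>();
    static {
        for (EventTypeSketch t : values()) {
            BY_CODE.put(t.code, t);
        }
    }

    private final int code;

    EventTypeSketch(int code) {
        this.code = code;
    }

    int getIntValue() {
        return code;
    }

    // Decode a wire value back into the enum; unknown codes are rejected.
    static EventTypeSketch fromInt(int code) {
        EventTypeSketch t = BY_CODE.get(code);
        if (t == null) {
            throw new IllegalArgumentException("Invalid event type code: " + code);
        }
        return t;
    }
}
```

A map keeps the lookup independent of declaration order. This matters because the codes are not contiguous (None is -1).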

    In the earlier article "ZooKeeper Source Code Analysis, Part 1: The Client", we passed a MyWatcher to ZooKeeper when connecting:

    protected void connectToZK(String newHost) throws InterruptedException, IOException {
            if (zk != null && zk.getState().isAlive()) {
                zk.close();
            }
            host = newHost;
            boolean readOnly = cl.getOption("readonly") != null;
            if (cl.getOption("secure") != null) {
                System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
                System.out.println("Secure connection is enabled");
            }
            zk = new ZooKeeper(host,
                     Integer.parseInt(cl.getOption("timeout")),
                     new MyWatcher(), readOnly);
        }

    When the ZooKeeper instance is created, it sets up a watchManager:

        public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                boolean canBeReadOnly, HostProvider aHostProvider)
                throws IOException {
            LOG.info("Initiating client connection, connectString=" + connectString
                    + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    
            watchManager = defaultWatchManager();
            watchManager.defaultWatcher = watcher;
    
            ConnectStringParser connectStringParser = new ConnectStringParser(
                    connectString);
            hostProvider = aHostProvider;
    
            cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                    hostProvider, sessionTimeout, this, watchManager,
                    getClientCnxnSocket(), canBeReadOnly);
            cnxn.start();
        }

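    The MyWatcher passed in is stored in watchManager as the default watcher. watchManager is then handed to ClientCnxn, and cnxn.start() launches the connection threads.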

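      Let's look at ClientCnxn first. ClientCnxn manages the client's socket I/O. It keeps a list of servers it can connect to, and it transparently switches to another server in that list when needed.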

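    First, let's see how the socket implementation is obtained. getClientCnxnSocket() reads a class name from the ZOOKEEPER_CLIENT_CNXN_SOCKET system property, falls back to ClientCnxnSocketNIO, and instantiates the class by reflection: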

        private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
            String clientCnxnSocketName = System
                    .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
            if (clientCnxnSocketName == null) {
                clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
            }
            try {
                return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                        .newInstance();
            } catch (Exception e) {
                IOException ioe = new IOException("Couldn't instantiate "
                        + clientCnxnSocketName);
                ioe.initCause(e);
                throw ioe;
            }
        }
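The same pattern can be shown in isolation: read an implementation class name from a system property, fall back to a default, and instantiate it by reflection.

This is a minimal sketch. Transport, NioTransport, NettyTransport and the property key example.transport are all hypothetical. In ZooKeeper the key is the zookeeper.clientCnxnSocket system property and the default is ClientCnxnSocketNIO. The sketch calls getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9.

```java
import java.io.IOException;

// Hypothetical transport abstraction standing in for ClientCnxnSocket.
interface Transport {
    String name();
}

class NioTransport implements Transport {
    public String name() { return "nio"; }
}

class NettyTransport implements Transport {
    public String name() { return "netty"; }
}

final class TransportFactory {
    static final String TRANSPORT_PROPERTY = "example.transport"; // hypothetical key

    static Transport create() throws IOException {
        String className = System.getProperty(TRANSPORT_PROPERTY);
        if (className == null) {
            className = NioTransport.class.getName(); // default, like ClientCnxnSocketNIO
        }
        try {
            return (Transport) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Same idea as the original: wrap any failure in an IOException with the cause.
            throw new IOException("Couldn't instantiate " + className, e);
        }
    }
}
```

Assuming the classes live in the default package, starting the JVM with -Dexample.transport=NettyTransport would make create() return the Netty stand-in.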

      Next, ClientCnxn's start() method is called. It starts two threads:

        public void start() {
            sendThread.start();
            eventThread.start();
        }

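    SendThread services the outgoing request queue and generates heartbeats; its Javadoc also says it spawns a ReaderThread. EventThread dispatches watch events and asynchronous callbacks to the application.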
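To make the division of labour concrete, here is a toy model of the two threads. It uses plain strings instead of Packet and WatchedEvent objects, and it is not ZooKeeper code.

- The send thread drains an outgoing queue. Pretending the server answered, it then queues an event.
- The event thread delivers those events in order.

One plausible reason for such a split is that a slow watcher callback then cannot stall network I/O.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of ClientCnxn's SendThread/EventThread pair; all names are hypothetical.
final class TwoThreadClientSketch {
    private static final String STOP = "__stop__"; // poison pill that ends each loop

    final BlockingQueue<String> outgoing = new LinkedBlockingQueue<>();
    final BlockingQueue<String> events = new LinkedBlockingQueue<>();
    final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    private final Thread sendThread = new Thread(() -> {
        try {
            while (true) {
                String req = outgoing.take();
                if (STOP.equals(req)) {
                    events.put(STOP); // let the event thread finish after pending events
                    return;
                }
                sent.add(req);              // stands in for writing to the socket
                events.put("reply:" + req); // stands in for a callback/watch event
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "send-thread");

    private final Thread eventThread = new Thread(() -> {
        try {
            while (true) {
                String ev = events.take();
                if (STOP.equals(ev)) {
                    return;
                }
                delivered.add(ev); // stands in for Watcher.process(...)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "event-thread");

    void start() {
        sendThread.start();
        eventThread.start();
    }

    void shutdownAndWait() throws InterruptedException {
        outgoing.put(STOP);
        sendThread.join();
        eventThread.join();
    }
}
```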

    Here is the main body of SendThread's run() loop:

                        if (!clientCnxnSocket.isConnected()) {
                            // don't re-establish connection if we are closing
                            if (closing) {
                                break;
                            }
                            startConnect();
                            clientCnxnSocket.updateLastSendAndHeard();
                        }
    
                        if (state.isConnected()) {
                            // determine whether we need to send an AuthFailed event.
                            if (zooKeeperSaslClient != null) {
                                boolean sendAuthEvent = false;
                                if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                    try {
                                        zooKeeperSaslClient.initialize(ClientCnxn.this);
                                    } catch (SaslException e) {
                                       LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                        state = States.AUTH_FAILED;
                                        sendAuthEvent = true;
                                    }
                                }
                                KeeperState authState = zooKeeperSaslClient.getKeeperState();
                                if (authState != null) {
                                    if (authState == KeeperState.AuthFailed) {
                                        // An authentication error occurred during authentication with the Zookeeper Server.
                                        state = States.AUTH_FAILED;
                                        sendAuthEvent = true;
                                    } else {
                                        if (authState == KeeperState.SaslAuthenticated) {
                                            sendAuthEvent = true;
                                        }
                                    }
                                }
    
                                if (sendAuthEvent == true) {
                                    eventThread.queueEvent(new WatchedEvent(
                                          Watcher.Event.EventType.None,
                                          authState,null));
                                }
                            }
                            to = readTimeout - clientCnxnSocket.getIdleRecv();
                        } else {
                            to = connectTimeout - clientCnxnSocket.getIdleRecv();
                        }
                        
                        if (to <= 0) {
                            String warnInfo;
                            warnInfo = "Client session timed out, have not heard from server in "
                                + clientCnxnSocket.getIdleRecv()
                                + "ms"
                                + " for sessionid 0x"
                                + Long.toHexString(sessionId);
                            LOG.warn(warnInfo);
                            throw new SessionTimeoutException(warnInfo);
                        }
                        if (state.isConnected()) {
                            //1000(1 second) is to prevent race condition missing to send the second ping
                            //also make sure not to send too many pings when readTimeout is small 
                            int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                                    ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                            //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                            if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                                sendPing();
                                clientCnxnSocket.updateLastSend();
                            } else {
                                if (timeToNextPing < to) {
                                    to = timeToNextPing;
                                }
                            }
                        }
    
                        // If we are in read-only mode, seek for read/write server
                        if (state == States.CONNECTEDREADONLY) {
                            long now = Time.currentElapsedTime();
                            int idlePingRwServer = (int) (now - lastPingRwServer);
                            if (idlePingRwServer >= pingRwTimeout) {
                                lastPingRwServer = now;
                                idlePingRwServer = 0;
                                pingRwTimeout =
                                    Math.min(2*pingRwTimeout, maxPingRwTimeout);
                                pingRwServer();
                            }
                            to = Math.min(to, pingRwTimeout - idlePingRwServer);
                        }
    
                        clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
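The heartbeat logic is easier to follow as plain arithmetic, so the sketch below pulls it out into a pure function. Assumptions and stand-ins:

- The names mirror the loop above.
- MAX_SEND_PING_INTERVAL is assumed to be 10000 ms.
- IllegalStateException stands in for SessionTimeoutException.
- The read-only server probing is left out.

Worked example: take readTimeout = 20000, idleRecv = 5000 and idleSend = 2000. The session has 15000 ms left. The next ping is due in 20000/2 - 2000 - 1000 = 7000 ms, so the loop waits 7000 ms.

```java
// Sketch of SendThread's timeout/heartbeat arithmetic; names follow the code above.
final class PingScheduleSketch {
    static final int MAX_SEND_PING_INTERVAL = 10000; // ms, assumed value

    /** Outcome of one loop iteration: how long to wait and whether to ping now. */
    static final class Decision {
        final int waitMs;
        final boolean sendPing;

        Decision(int waitMs, boolean sendPing) {
            this.waitMs = waitMs;
            this.sendPing = sendPing;
        }
    }

    static Decision decide(int readTimeout, int idleRecv, int idleSend) {
        int to = readTimeout - idleRecv; // time left before the server is considered gone
        if (to <= 0) {
            throw new IllegalStateException(
                "Client session timed out, have not heard from server in " + idleRecv + "ms");
        }
        // Aim for a ping every readTimeout/2; subtract 1 s once idleSend > 1 s to avoid a race.
        int timeToNextPing = readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
        if (timeToNextPing <= 0 || idleSend > MAX_SEND_PING_INTERVAL) {
            return new Decision(to, true);
        }
        return new Decision(Math.min(to, timeToNextPing), false);
    }
}
```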
                    

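    ClientCnxnSocketNetty implements the abstract methods of ClientCnxnSocket. It connects to the server, reads and writes network traffic, and sits between the network data layer and the higher-level packet layer. Its lifecycle is: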

         loop:
         - try:
         - - !isConnected()
         - - - connect()
         - - doTransport()
         - catch:
         - - cleanup()
         close()

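    This outline shows how ClientCnxnSocket works:

    - It checks whether it is connected. If not, it calls connect(); otherwise it reuses the existing connection.
    - It calls doTransport() to exchange data. If an exception occurs, it calls cleanup().
    - Finally, it closes the connection.

    The core of the flow is therefore doTransport(). The Netty version is: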
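The lifecycle can be exercised against a fake socket. The sketch below is a simplified model under these assumptions:

- FakeSocket is hypothetical.
- It fails on its first doTransport() call to simulate a dropped connection.
- The loop runs a fixed number of iterations instead of running until the client is closed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Model of: loop { try { if (!isConnected()) connect(); doTransport(); } catch { cleanup(); } } close()
final class LifecycleSketch {
    /** Hypothetical stand-in for ClientCnxnSocket that records every call. */
    static final class FakeSocket {
        final List<String> log = new ArrayList<>();
        private boolean connected;
        private int transports;

        boolean isConnected() { return connected; }

        void connect() {
            connected = true;
            log.add("connect");
        }

        void doTransport() throws IOException {
            transports++;
            log.add("doTransport");
            if (transports == 1) {
                throw new IOException("simulated connection loss");
            }
        }

        void cleanup() {
            connected = false; // forget the broken connection
            log.add("cleanup");
        }

        void close() { log.add("close"); }
    }

    static void run(FakeSocket socket, int iterations) {
        for (int i = 0; i < iterations; i++) {
            try {
                if (!socket.isConnected()) {
                    socket.connect();
                }
                socket.doTransport();
            } catch (IOException e) {
                socket.cleanup(); // the next iteration reconnects
            }
        }
        socket.close();
    }
}
```

With three iterations the log reads: connect, doTransport, cleanup, connect, doTransport, doTransport, close. After cleanup() the next pass reconnects, as the lifecycle outline describes.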

     @Override
        void doTransport(int waitTimeOut,
                         List<Packet> pendingQueue,
                         ClientCnxn cnxn)
                throws IOException, InterruptedException {
            try {
                if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
                Packet head = null;
                if (needSasl.get()) {
                    if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                } else {
                    if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                        return;
                    }
                }
                // check if being waken up on closing.
                if (!sendThread.getZkState().isAlive()) {
                    // adding back the patck to notify of failure in conLossPacket().
                    addBack(head);
                    return;
                }
                // channel disconnection happened
                if (disconnected.get()) {
                    addBack(head);
                    throw new EndOfStreamException("channel for sessionid 0x"
                            + Long.toHexString(sessionId)
                            + " is lost");
                }
                if (head != null) {
                    doWrite(pendingQueue, head, cnxn);
                }
            } finally {
                updateNow();
            }
        }

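    Simplified, the code above has two paths:

    - The failure path, addBack(head), puts the packet back onto the queue.
    - The normal path is doWrite(pendingQueue, head, cnxn).

    Setting the exceptions aside, let's follow the normal path: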

    First, get the Packet:

    Packet head = null;
                if (needSasl.get()) {
                    if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                } else {
                    if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                        return;
                    }
                }

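    Here, outgoingQueue (declared as protected LinkedBlockingDeque<Packet> outgoingQueue) is a linked blocking deque that holds outgoing requests. poll() waits up to waitTimeOut milliseconds. If no packet arrives in that time it returns null, and doTransport() simply returns.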
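The two queue operations used in doTransport() can be sketched on their own:

- Poll with a timeout.
- On failure, push the packet back at the front so the send order is preserved.

The sketch assumes addBack() behaves like addFirst() on the deque, and it uses strings in place of Packet.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Sketch of doTransport()'s use of outgoingQueue; packets are plain strings here.
final class OutgoingQueueSketch {
    final LinkedBlockingDeque<String> outgoingQueue = new LinkedBlockingDeque<>();

    /** Returns the head packet, or null if nothing arrived within the timeout. */
    String pollHead(long waitTimeOutMs) throws InterruptedException {
        return outgoingQueue.poll(waitTimeOutMs, TimeUnit.MILLISECONDS);
    }

    /** Puts an unsent packet back at the head so it is retried before newer packets. */
    void addBack(String head) {
        if (head != null) {
            outgoingQueue.addFirst(head);
        }
    }
}
```

Re-inserting at the tail would let later requests overtake the failed one. That is why a deque, and not a plain FIFO queue, is convenient here.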

    Then doWrite() is called:

     /**
         * doWrite handles writing the packets from outgoingQueue via network to server.
         */
        private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
            updateNow();
            while (true) {
                if (p != WakeupPacket.getInstance()) {
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                            (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                    sendPkt(p);
                }
                if (outgoingQueue.isEmpty()) {
                    break;
                }
                p = outgoingQueue.remove();
            }
        }

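    doWrite() writes the packets in outgoingQueue to the server over the network. Every packet except ping and auth requests gets the next xid and is added to pendingQueue, where it waits for the server's reply. The actual send is done by sendPkt(p):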
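The bookkeeping in doWrite() can be modelled in a few lines. In this sketch:

- SketchPacket and the opcode constants are hypothetical stand-ins for Packet and ZooDefs.OpCode; the values are illustrative.
- The wire list stands in for sendPkt().

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of doWrite(): assign xids and track pending requests, skipping ping/auth.
final class DoWriteSketch {
    static final int OP_GET_DATA = 4; // illustrative opcode values
    static final int OP_PING = 11;
    static final int OP_AUTH = 100;

    static final class SketchPacket {
        final int type;
        int xid; // stays 0 for packets that are not tracked

        SketchPacket(int type) {
            this.type = type;
        }
    }

    private int nextXid = 1;
    final List<SketchPacket> pendingQueue = new ArrayList<>();
    final List<SketchPacket> wire = new ArrayList<>(); // what sendPkt() would write

    void doWrite(Deque<SketchPacket> outgoingQueue) {
        SketchPacket p;
        while ((p = outgoingQueue.poll()) != null) {
            if (p.type != OP_PING && p.type != OP_AUTH) {
                p.xid = nextXid++;
                synchronized (pendingQueue) { // replies are matched from another thread
                    pendingQueue.add(p);
                }
            }
            wire.add(p); // every packet is sent, tracked or not
        }
    }
}
```

Pings and auth packets are still written, but they never enter pendingQueue, so they take no part in reply matching.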

        private void sendPkt(Packet p) {
            // Assuming the packet will be sent out successfully. Because if it fails,
            // the channel will close and clean up queues.
            p.createBB();
            updateLastSend();
            sentCount++;
            channel.write(ChannelBuffers.wrappedBuffer(p.bb));
        }

    1. The Packet structure:

     /**
         * This class allows us to pass the headers and the relevant records around.
         */
        static class Packet {
            RequestHeader requestHeader;
    
            ReplyHeader replyHeader;
    
            Record request;
    
            Record response;
    
            ByteBuffer bb;
    
            /** Client's view of the path (may differ due to chroot) **/
            String clientPath;
            /** Servers's view of the path (may differ due to chroot) **/
            String serverPath;
    
            boolean finished;
    
            AsyncCallback cb;
    
            Object ctx;
    
            WatchRegistration watchRegistration;
    
            public boolean readOnly;
    
            WatchDeregistration watchDeregistration;
    
            /** Convenience ctor */
            Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
                   Record request, Record response,
                   WatchRegistration watchRegistration) {
                this(requestHeader, replyHeader, request, response,
                     watchRegistration, false);
            }
    
            Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
                   Record request, Record response,
                   WatchRegistration watchRegistration, boolean readOnly) {
    
                this.requestHeader = requestHeader;
                this.replyHeader = replyHeader;
                this.request = request;
                this.response = response;
                this.readOnly = readOnly;
                this.watchRegistration = watchRegistration;
            }
    
            public void createBB() {
                try {
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    boa.writeInt(-1, "len"); // We'll fill this in later
                    if (requestHeader != null) {
                        requestHeader.serialize(boa, "header");
                    }
                    if (request instanceof ConnectRequest) {
                        request.serialize(boa, "connect");
                        // append "am-I-allowed-to-be-readonly" flag
                        boa.writeBool(readOnly, "readOnly");
                    } else if (request != null) {
                        request.serialize(boa, "request");
                    }
                    baos.close();
                    this.bb = ByteBuffer.wrap(baos.toByteArray());
                    this.bb.putInt(this.bb.capacity() - 4);
                    this.bb.rewind();
                } catch (IOException e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
    
            @Override
            public String toString() {
                StringBuilder sb = new StringBuilder();
    
                sb.append("clientPath:" + clientPath);
                sb.append(" serverPath:" + serverPath);
                sb.append(" finished:" + finished);
    
                sb.append(" header:: " + requestHeader);
                sb.append(" replyHeader:: " + replyHeader);
                sb.append(" request:: " + request);
                sb.append(" response:: " + response);
    
                // jute toString is horrible, remove unnecessary newlines
                return sb.toString().replaceAll("\r*\n+", " ");
            }
        }

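    createBB() shows which fields actually go over the network. ZooKeeper serializes only requestHeader and request; for a ConnectRequest it also appends the readOnly flag. Only these end up in the byte array that is sent. Nothing related to watchRegistration is transmitted.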

    2. Update the last-send timestamp with updateLastSend():

        void updateLastSend() {
            this.lastSend = now;
        }

    3. Write the byte buffer to the server through the Netty channel:

    channel.write(ChannelBuffers.wrappedBuffer(p.bb));

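    Here bb is a ByteBuffer initialized in Packet.createBB(). A placeholder length is written first. Once the full byte array exists, the first four bytes are overwritten with the length of the rest of the buffer, and the buffer is rewound: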

                    this.bb = ByteBuffer.wrap(baos.toByteArray());
                    this.bb.putInt(this.bb.capacity() - 4);
                    this.bb.rewind();
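The length-prefix trick can be reproduced with plain java.io. In this sketch:

- DataOutputStream stands in for jute's BinaryOutputArchive. Both write big-endian ints, but jute encodes strings differently.
- The xid/type/path fields are illustrative rather than the exact RequestHeader and request layout.

For xid 1, type 4 and path "/a" the frame is 16 bytes: the 4-byte prefix, two 4-byte ints, and writeUTF's 2-byte length plus 2 bytes of text. The prefix therefore holds 12.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Sketch of Packet.createBB()'s framing: placeholder length, payload, then back-fill.
final class FramingSketch {
    static ByteBuffer frame(int xid, int type, String path) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(-1);   // "len" placeholder, filled in below
        out.writeInt(xid);  // header fields
        out.writeInt(type);
        out.writeUTF(path); // request body (illustrative encoding)
        out.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4); // position is 0, so this overwrites the placeholder
        bb.rewind();                  // back to position 0 so the whole frame gets written
        return bb;
    }

    /** A receiver reads the prefix first to know how many bytes follow. */
    static int bodyLength(ByteBuffer frame) {
        return frame.getInt(0);
    }
}
```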

    Summary:

    The connection between the ZooKeeper client and the server is handled mainly by ClientCnxnSocket. It has two concrete implementations, ClientCnxnSocketNetty and ClientCnxnSocketNIO. The workflow is:

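      First, check whether it is connected. If not, call connect(); if so, go to the next step.

      Then call doTransport() to exchange data. If an exception occurs, call cleanup().

      Finally, close the connection.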

    This workflow is visible in SendThread's run() method.

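    One more note on ZooKeeper's sequential consistency guarantee: updates from a client are applied in the order in which the client sent them. SendThread updates its timestamps repeatedly (updateNow(), updateLastSend()), but those timestamps drive heartbeats and timeouts. On the client side, request order is preserved by the FIFO outgoingQueue and pendingQueue and by the increasing xid that doWrite() assigns to each request.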
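One way to see how a client can keep requests and replies in order is FIFO reply matching. Requests are remembered in send order, and each reply must carry the xid of the oldest outstanding request.

This is a sketch under the assumption that replies arrive in send order over a single connection. ReplyMatcherSketch is a hypothetical name. ZooKeeper's SendThread performs a broadly similar "xid out of order" check when it reads a response, but this is not its code.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of FIFO reply matching against a queue of outstanding xids.
final class ReplyMatcherSketch {
    private final Deque<Integer> pendingXids = new ArrayDeque<>();
    private int nextXid = 1;

    /** Records a request and returns the xid it was sent with. */
    int send() {
        int xid = nextXid++;
        pendingXids.addLast(xid);
        return xid;
    }

    /** Matches a reply to the oldest outstanding request, or fails if order is broken. */
    int onReply(int replyXid) throws IOException {
        Integer expected = pendingXids.pollFirst();
        if (expected == null) {
            throw new IOException("Nothing pending, but got xid " + replyXid);
        }
        if (expected != replyXid) {
            throw new IOException("Xid out of order. Got xid " + replyXid + " but expected " + expected);
        }
        return expected;
    }
}
```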

  Original article: https://www.cnblogs.com/davidwang456/p/5000927.html