  • ZooKeeper Source Code Analysis, Part 3: How the Client Sends Requests

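      A znode can be watched, covering both changes to the data stored in the node and changes to its child nodes. Once a change occurs, the clients that set the watch are notified. This is ZooKeeper's most important feature for applications: it makes centralized configuration management, cluster management, distributed locks, and similar functions possible.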

    Background:

    ZooKeeper defines the following states:

    Unknown (-1),Disconnected (0),NoSyncConnected (1),SyncConnected (3),AuthFailed (4),ConnectedReadOnly (5),SaslAuthenticated(6),Expired (-112);
    
    

    The event types are: None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4), DataWatchRemoved (5), ChildWatchRemoved (6);

    The watcher types are: Children (1), Data (2), Any (3);
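    The numeric codes listed above can be modeled as an enum with a lookup by code. The following is a minimal sketch, not the real ZooKeeper enum: the names SketchKeeperState, fromCode, and StateCodeDemo are hypothetical, and only the state codes from the list above are used.

```java
import java.util.Optional;

// Hypothetical stand-in for the session-state codes listed above; not the real ZooKeeper enum.
enum SketchKeeperState {
    UNKNOWN(-1), DISCONNECTED(0), NO_SYNC_CONNECTED(1), SYNC_CONNECTED(3),
    AUTH_FAILED(4), CONNECTED_READ_ONLY(5), SASL_AUTHENTICATED(6), EXPIRED(-112);

    private final int code;

    SketchKeeperState(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    // Look a state up by its numeric code; empty if the code is not in the table.
    static Optional<SketchKeeperState> fromCode(int code) {
        for (SketchKeeperState s : values()) {
            if (s.code == code) {
                return Optional.of(s);
            }
        }
        return Optional.empty();
    }
}

class StateCodeDemo {
    public static void main(String[] args) {
        System.out.println(SketchKeeperState.fromCode(3));  // a code from the list
        System.out.println(SketchKeeperState.fromCode(2));  // 2 is not assigned to any state
    }
}
```

    Note that the codes are not contiguous (there is no state 2), so a lookup by code is safer than indexing by ordinal.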

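    In the previous article, "zookeeper源码分析之一客户端" (ZooKeeper Source Code Analysis, Part 1: The Client), we registered a MyWatcher when connecting to ZooKeeper: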

    protected void connectToZK(String newHost) throws InterruptedException, IOException {
            if (zk != null && zk.getState().isAlive()) {
                zk.close();
            }
            host = newHost;
            boolean readOnly = cl.getOption("readonly") != null;
            if (cl.getOption("secure") != null) {
                System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
                System.out.println("Secure connection is enabled");
            }
            zk = new ZooKeeper(host,
                     Integer.parseInt(cl.getOption("timeout")),
                     new MyWatcher(), readOnly);
        }

    When the ZooKeeper instance is created, a watchManager is used:

        public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                boolean canBeReadOnly, HostProvider aHostProvider)
                throws IOException {
            LOG.info("Initiating client connection, connectString=" + connectString
                    + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    
            watchManager = defaultWatchManager();
            watchManager.defaultWatcher = watcher;
    
            ConnectStringParser connectStringParser = new ConnectStringParser(
                    connectString);
            hostProvider = aHostProvider;
    
            cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                    hostProvider, sessionTimeout, this, watchManager,
                    getClientCnxnSocket(), canBeReadOnly);
            cnxn.start();
        }

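    The MyWatcher passed in is stored in watchManager as the default watcher. The watchManager is then handed to a new ClientCnxn, and the ClientCnxn threads are started.

      Let us first look at ClientCnxn. ClientCnxn manages the client socket I/O. It maintains a list of available servers to connect to and switches to another server in that list transparently when necessary.

    First, how the socket implementation is obtained: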

        private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
            String clientCnxnSocketName = System
                    .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
            if (clientCnxnSocketName == null) {
                clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
            }
            try {
                return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                        .newInstance();
            } catch (Exception e) {
                IOException ioe = new IOException("Couldn't instantiate "
                        + clientCnxnSocketName);
                ioe.initCause(e);
                throw ioe;
            }
        }
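    The pattern in getClientCnxnSocket() (read a class name from a system property, fall back to a default, instantiate by reflection) can be sketched on its own. Everything named here is hypothetical: SketchTransport, its two implementations, and the property "sketch.transport.class" are illustrations, and the sketch throws an unchecked exception where the original wraps the failure in an IOException.

```java
// Hypothetical transport interface and implementations, for illustration only.
interface SketchTransport {
    String name();
}

class DefaultTransport implements SketchTransport {
    public String name() {
        return "default";
    }
}

class AlternateTransport implements SketchTransport {
    public String name() {
        return "alternate";
    }
}

class TransportFactory {
    // Hypothetical property name; the real client reads ZOOKEEPER_CLIENT_CNXN_SOCKET.
    static final String PROP = "sketch.transport.class";

    static SketchTransport create() {
        String className = System.getProperty(PROP);
        if (className == null) {
            // No override configured: fall back to the default implementation.
            className = DefaultTransport.class.getName();
        }
        try {
            return (SketchTransport) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create().name());
        System.setProperty(PROP, AlternateTransport.class.getName());
        System.out.println(create().name());
    }
}
```

    The benefit of this design is that the transport can be swapped with a JVM flag, without changing or recompiling the caller.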

      Next, ClientCnxn's start() method is called, which starts two threads:

        public void start() {
            sendThread.start();
            eventThread.start();
        }

    The SendThread class services the outgoing request queue and generates heartbeats. It also spawns the ReadThread.

    Here is the main body of SendThread's run method:

                        if (!clientCnxnSocket.isConnected()) {
                            // don't re-establish connection if we are closing
                            if (closing) {
                                break;
                            }
                            startConnect();
                            clientCnxnSocket.updateLastSendAndHeard();
                        }
    
                        if (state.isConnected()) {
                            // determine whether we need to send an AuthFailed event.
                            if (zooKeeperSaslClient != null) {
                                boolean sendAuthEvent = false;
                                if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                    try {
                                        zooKeeperSaslClient.initialize(ClientCnxn.this);
                                    } catch (SaslException e) {
                                       LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                        state = States.AUTH_FAILED;
                                        sendAuthEvent = true;
                                    }
                                }
                                KeeperState authState = zooKeeperSaslClient.getKeeperState();
                                if (authState != null) {
                                    if (authState == KeeperState.AuthFailed) {
                                        // An authentication error occurred during authentication with the Zookeeper Server.
                                        state = States.AUTH_FAILED;
                                        sendAuthEvent = true;
                                    } else {
                                        if (authState == KeeperState.SaslAuthenticated) {
                                            sendAuthEvent = true;
                                        }
                                    }
                                }
    
                                if (sendAuthEvent == true) {
                                    eventThread.queueEvent(new WatchedEvent(
                                          Watcher.Event.EventType.None,
                                          authState,null));
                                }
                            }
                            to = readTimeout - clientCnxnSocket.getIdleRecv();
                        } else {
                            to = connectTimeout - clientCnxnSocket.getIdleRecv();
                        }
                        
                        if (to <= 0) {
                            String warnInfo;
                            warnInfo = "Client session timed out, have not heard from server in "
                                + clientCnxnSocket.getIdleRecv()
                                + "ms"
                                + " for sessionid 0x"
                                + Long.toHexString(sessionId);
                            LOG.warn(warnInfo);
                            throw new SessionTimeoutException(warnInfo);
                        }
                        if (state.isConnected()) {
                            //1000(1 second) is to prevent race condition missing to send the second ping
                            //also make sure not to send too many pings when readTimeout is small 
                            int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                                    ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                            //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                            if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                                sendPing();
                                clientCnxnSocket.updateLastSend();
                            } else {
                                if (timeToNextPing < to) {
                                    to = timeToNextPing;
                                }
                            }
                        }
    
                        // If we are in read-only mode, seek for read/write server
                        if (state == States.CONNECTEDREADONLY) {
                            long now = Time.currentElapsedTime();
                            int idlePingRwServer = (int) (now - lastPingRwServer);
                            if (idlePingRwServer >= pingRwTimeout) {
                                lastPingRwServer = now;
                                idlePingRwServer = 0;
                                pingRwTimeout =
                                    Math.min(2*pingRwTimeout, maxPingRwTimeout);
                                pingRwServer();
                            }
                            to = Math.min(to, pingRwTimeout - idlePingRwServer);
                        }
    
                        clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
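    The heartbeat arithmetic in the loop above is easier to follow with concrete numbers. The sketch below copies the two formulas (time to next ping, and the doubling probe interval used in read-only mode) into standalone methods. The class and method names are hypothetical, and the value of MAX_SEND_PING_INTERVAL is an assumption, since the excerpt references the constant without showing its definition.

```java
class PingTimingSketch {
    // Assumed value for illustration; the excerpt does not show the real constant.
    static final int MAX_SEND_PING_INTERVAL = 10_000;

    // Milliseconds until the next ping is due, given how long nothing has been sent.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    // A ping is sent when it is due, or when the send side has been idle too long.
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    // In read-only mode the probe interval doubles each time, up to a cap.
    static int nextPingRwTimeout(int current, int max) {
        return Math.min(2 * current, max);
    }

    public static void main(String[] args) {
        int readTimeout = 20_000;
        for (int idleSend : new int[] {500, 1500, 9000}) {
            System.out.println("idleSend=" + idleSend
                    + " timeToNextPing=" + timeToNextPing(readTimeout, idleSend)
                    + " shouldPing=" + shouldPing(readTimeout, idleSend));
        }
    }
}
```

    With readTimeout = 20000, an idle time of 500 ms leaves 9500 ms until the next ping, while an idle time of 9000 ms yields 0 and triggers a ping immediately. The extra 1000 ms subtracted once idleSend exceeds one second makes the ping go out slightly early rather than slightly late.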
                    

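    ClientCnxnSocketNetty implements the abstract methods of ClientCnxnSocket (note that getClientCnxnSocket() above defaults to ClientCnxnSocketNIO unless the system property selects another class). It is responsible for connecting to the server, reading and writing network traffic, and acting as the layer between the network data layer and the higher-level packet layer. Its lifecycle is as follows: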

         loop:
         - try:
         - - !isConnected()
         - - - connect()
         - - doTransport()
         - catch:
         - - cleanup()
         close()

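    From the description above we can see how ClientCnxnSocket works. It first checks whether it is connected: if not, it calls connect(); if so, it uses the existing connection. It then calls doTransport() to communicate, and if an exception occurs along the way it calls cleanup(). Finally, the connection is closed. The core of the flow is therefore the doTransport() method: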

        @Override
        void doTransport(int waitTimeOut,
                         List<Packet> pendingQueue,
                         ClientCnxn cnxn)
                throws IOException, InterruptedException {
            try {
                if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
                Packet head = null;
                if (needSasl.get()) {
                    if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                } else {
                    if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                        return;
                    }
                }
                // check if being waken up on closing.
                if (!sendThread.getZkState().isAlive()) {
                    // adding back the patck to notify of failure in conLossPacket().
                    addBack(head);
                    return;
                }
                // channel disconnection happened
                if (disconnected.get()) {
                    addBack(head);
                    throw new EndOfStreamException("channel for sessionid 0x"
                            + Long.toHexString(sessionId)
                            + " is lost");
                }
                if (head != null) {
                    doWrite(pendingQueue, head, cnxn);
                }
            } finally {
                updateNow();
            }
        }

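    Let us simplify the code above. It has two paths: the exceptional path, addBack(head), and the normal path, doWrite(pendingQueue, head, cnxn). Setting the exceptional path aside, we follow the normal flow.

    First, the Packet is obtained: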

                Packet head = null;
                if (needSasl.get()) {
                    if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                } else {
                    if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                        return;
                    }
                }

    Here, protected LinkedBlockingDeque<Packet> outgoingQueue is a linked blocking deque that holds the outgoing requests.
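    The two deque operations that matter here are the timed poll, which returns null when nothing arrives in time, and putting a packet back at the head. The following is a minimal sketch using strings in place of Packet objects. OutgoingQueueSketch and takeNext are hypothetical names, and implementing addBack as addFirst is an assumption, since the excerpt does not show addBack's body.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class OutgoingQueueSketch {
    final LinkedBlockingDeque<String> outgoingQueue = new LinkedBlockingDeque<>();

    // Wait up to waitMillis for the next request; null means nothing arrived in time.
    String takeNext(long waitMillis) {
        try {
            return outgoingQueue.poll(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt for the caller
            return null;
        }
    }

    // Assumed behaviour: return the request to the head so it is the next one taken.
    void addBack(String head) {
        if (head != null) {
            outgoingQueue.addFirst(head);
        }
    }

    public static void main(String[] args) {
        OutgoingQueueSketch q = new OutgoingQueueSketch();
        System.out.println(q.takeNext(10));      // empty queue: times out
        q.outgoingQueue.add("create /a");
        q.outgoingQueue.add("getData /a");
        String head = q.takeNext(10);
        q.addBack(head);                         // e.g. the channel was lost before sending
        System.out.println(q.takeNext(10));
        System.out.println(q.takeNext(10));
    }
}
```

    Because the request goes back to the head rather than the tail, the original submission order is preserved across a failed send attempt.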

    Then the doWrite method is executed:

        /**
         * doWrite handles writing the packets from outgoingQueue via network to server.
         */
        private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
            updateNow();
            while (true) {
                if (p != WakeupPacket.getInstance()) {
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                            (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                    sendPkt(p);
                }
                if (outgoingQueue.isEmpty()) {
                    break;
                }
                p = outgoingQueue.remove();
            }
        }

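    The doWrite method writes the packets in outgoingQueue to the server over the network. The actual send is performed by the sendPkt(p) call inside the loop above: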

        private void sendPkt(Packet p) {
            // Assuming the packet will be sent out successfully. Because if it fails,
            // the channel will close and clean up queues.
            p.createBB();
            updateLastSend();
            sentCount++;
            channel.write(ChannelBuffers.wrappedBuffer(p.bb));
        }
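    The drain loop of doWrite can be reduced to a small standalone model: send the packet that was already taken, then keep removing packets from the outgoing queue until it is empty, assigning an xid to every request that is neither a ping nor an auth packet and recording it in the pending list. This is a simplified sketch, not the real implementation: DoWriteSketch and Pkt are hypothetical, packet types are plain strings, the wakeup-packet and null-header checks are omitted, and a list stands in for the network channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class DoWriteSketch {
    // Hypothetical packet: a type tag plus the xid assigned at send time (0 = none).
    static final class Pkt {
        final String type;
        int xid = 0;

        Pkt(String type) {
            this.type = type;
        }
    }

    private int nextXid = 1;
    final List<Pkt> pending = new ArrayList<>();  // requests awaiting a reply
    final List<Pkt> sent = new ArrayList<>();     // stands in for the network channel

    // Send 'first', then keep draining 'outgoing' until it is empty.
    void drain(Pkt first, Queue<Pkt> outgoing) {
        Pkt p = first;
        while (true) {
            boolean tracked = !p.type.equals("ping") && !p.type.equals("auth");
            if (tracked) {
                p.xid = nextXid++;  // xids increase in send order
                pending.add(p);
            }
            sent.add(p);
            if (outgoing.isEmpty()) {
                break;
            }
            p = outgoing.remove();
        }
    }

    public static void main(String[] args) {
        DoWriteSketch w = new DoWriteSketch();
        Queue<Pkt> outgoing = new ArrayDeque<>();
        outgoing.add(new Pkt("ping"));
        outgoing.add(new Pkt("getData"));
        w.drain(new Pkt("create"), outgoing);
        for (Pkt p : w.sent) {
            System.out.println(p.type + " xid=" + p.xid);
        }
    }
}
```

    In this model the pending list always holds requests in the same order as their xids, which is what lets replies be matched to requests in order.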

    1. The structure of Packet is as follows:

        /**
         * This class allows us to pass the headers and the relevant records around.
         */
        static class Packet {
            RequestHeader requestHeader;
    
            ReplyHeader replyHeader;
    
            Record request;
    
            Record response;
    
            ByteBuffer bb;
    
            /** Client's view of the path (may differ due to chroot) **/
            String clientPath;
            /** Servers's view of the path (may differ due to chroot) **/
            String serverPath;
    
            boolean finished;
    
            AsyncCallback cb;
    
            Object ctx;
    
            WatchRegistration watchRegistration;
    
            public boolean readOnly;
    
            WatchDeregistration watchDeregistration;
    
            /** Convenience ctor */
            Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
                   Record request, Record response,
                   WatchRegistration watchRegistration) {
                this(requestHeader, replyHeader, request, response,
                     watchRegistration, false);
            }
    
            Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
                   Record request, Record response,
                   WatchRegistration watchRegistration, boolean readOnly) {
    
                this.requestHeader = requestHeader;
                this.replyHeader = replyHeader;
                this.request = request;
                this.response = response;
                this.readOnly = readOnly;
                this.watchRegistration = watchRegistration;
            }
    
            public void createBB() {
                try {
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    boa.writeInt(-1, "len"); // We'll fill this in later
                    if (requestHeader != null) {
                        requestHeader.serialize(boa, "header");
                    }
                    if (request instanceof ConnectRequest) {
                        request.serialize(boa, "connect");
                        // append "am-I-allowed-to-be-readonly" flag
                        boa.writeBool(readOnly, "readOnly");
                    } else if (request != null) {
                        request.serialize(boa, "request");
                    }
                    baos.close();
                    this.bb = ByteBuffer.wrap(baos.toByteArray());
                    this.bb.putInt(this.bb.capacity() - 4);
                    this.bb.rewind();
                } catch (IOException e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
    
            @Override
            public String toString() {
                StringBuilder sb = new StringBuilder();
    
                sb.append("clientPath:" + clientPath);
                sb.append(" serverPath:" + serverPath);
                sb.append(" finished:" + finished);
    
                sb.append(" header:: " + requestHeader);
                sb.append(" replyHeader:: " + replyHeader);
                sb.append(" request:: " + request);
                sb.append(" response:: " + response);
    
                // jute toString is horrible, remove unnecessary newlines
                return sb.toString().replaceAll("\r*\n+", " ");
            }
        }

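    From the createBB method we can see that, in the low-level serialization used for network transfer, ZooKeeper serializes only the requestHeader and request fields (plus the readOnly flag when the request is a ConnectRequest). Only these are written into the underlying byte array and sent over the network. Information related to watchRegistration is never transmitted.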
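    The length-prefix trick in createBB (write a placeholder int, serialize the body, then overwrite the placeholder with the body size) can be shown without the jute classes. The following is a minimal sketch that uses DataOutputStream in place of BinaryOutputArchive. FramingSketch and the field layout (xid, type, path bytes) are assumptions for illustration, not the real wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FramingSketch {
    // Build [4-byte length][xid][type][path bytes]; the field layout is illustrative only.
    static ByteBuffer frame(int xid, int type, String path) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);  // placeholder for the length, filled in below
            out.writeInt(xid);
            out.writeInt(type);
            out.write(path.getBytes(StandardCharsets.UTF_8));
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            // Overwrite the placeholder; the length excludes the 4-byte prefix itself.
            bb.putInt(bb.capacity() - 4);
            bb.rewind();
            return bb;
        } catch (IOException e) {
            throw new IllegalStateException("in-memory write failed", e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer bb = frame(1, 4, "/a");
        System.out.println("total bytes = " + bb.remaining());
        System.out.println("length prefix = " + bb.getInt(0));
    }
}
```

    Writing the placeholder first avoids a second copy of the body: the size is only known after serialization, and putInt at position 0 patches it in place before the buffer is rewound for sending.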

    2. Update the last-send timestamp with updateLastSend:

        void updateLastSend() {
            this.lastSend = now;
        }

    3. Send the byte buffer to the server over the channel (a Netty channel in ClientCnxnSocketNetty):

    channel.write(ChannelBuffers.wrappedBuffer(p.bb));

    Here, bb is a ByteBuffer that is initialized in Packet's createBB method:

                    this.bb = ByteBuffer.wrap(baos.toByteArray());
                    this.bb.putInt(this.bb.capacity() - 4);
                    this.bb.rewind();

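    Summary:

    The connection between a ZooKeeper client and the server is implemented mainly by ClientCnxnSocket, which has two concrete implementations, ClientCnxnSocketNetty and ClientCnxnSocketNIO. Its workflow is as follows: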

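      First, check whether a connection exists. If not, call connect() to establish one; if so, go to the next step.

      Then call doTransport() to communicate. If an exception occurs along the way, call cleanup().

      Finally, close the connection.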
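    The three steps above can be sketched as a loop over a transport object. This is a minimal model under stated assumptions: the Transport interface and RecordingTransport are hypothetical, the method names follow the lifecycle listing rather than a real API, and a fixed iteration count replaces the real loop condition.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    // Hypothetical transport; method names follow the lifecycle listing, not a real API.
    interface Transport {
        boolean isConnected();
        void connect();
        void doTransport() throws IOException;
        void cleanup();
        void close();
    }

    // loop: connect if needed, transport, clean up on failure; close at the end.
    static void run(Transport t, int iterations) {
        for (int i = 0; i < iterations; i++) {
            try {
                if (!t.isConnected()) {
                    t.connect();
                }
                t.doTransport();
            } catch (IOException e) {
                t.cleanup();
            }
        }
        t.close();
    }

    // Demo transport that records calls and fails on its second doTransport().
    static final class RecordingTransport implements Transport {
        final List<String> log = new ArrayList<>();
        private boolean connected = false;
        private int transports = 0;

        public boolean isConnected() { return connected; }
        public void connect() { connected = true; log.add("connect"); }
        public void doTransport() throws IOException {
            transports++;
            if (transports == 2) {
                throw new IOException("simulated connection loss");
            }
            log.add("doTransport");
        }
        public void cleanup() { connected = false; log.add("cleanup"); }
        public void close() { log.add("close"); }
    }

    public static void main(String[] args) {
        RecordingTransport t = new RecordingTransport();
        run(t, 3);
        System.out.println(t.log);
    }
}
```

    Because cleanup() marks the transport as disconnected, the next iteration reconnects automatically, which is how a lost connection is recovered without any special-case code in the loop.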

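    The workflow above can be seen in SendThread's run method.

    Also, one of ZooKeeper's properties is sequential consistency: updates are applied in the order in which the client sent the requests. In sendThread we can see the timestamps being updated several times to guarantee sequential consistency, as follows: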

  • Original article: https://www.cnblogs.com/davidwang456/p/5000927.html