  • A Rough Walkthrough of the ZooKeeper 3.4.6 Client Flow

    Start with the ZooKeeper class. The call chain is ZooKeeper --> ClientCnxn --> sendThread/eventThread.

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    
        watchManager.defaultWatcher = watcher;
    
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }
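    To make the job of ConnectStringParser in the constructor concrete, here is a minimal sketch of splitting a connect string such as zk1:2181,zk2:2181/app into a server list and a chroot path. It is not the library's code: the class name ConnectStringSketch is hypothetical, servers are kept as plain "host:port" strings, and IPv6 literals and path validation are ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for illustration; not ZooKeeper's ConnectStringParser. */
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // ZooKeeper's conventional client port

    final List<String> servers = new ArrayList<>(); // "host:port" entries
    final String chrootPath;                        // null when no chroot suffix is given

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is treated as the chroot path.
        int slash = connectString.indexOf('/');
        String hosts = connectString;
        String chroot = null;
        if (slash >= 0) {
            chroot = connectString.substring(slash);
            hosts = connectString.substring(0, slash);
            if (chroot.length() == 1) {
                chroot = null; // a bare "/" means no chroot
            }
        }
        chrootPath = chroot;
        for (String entry : hosts.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                continue;
            }
            // Append the default port when the entry does not name one.
            servers.add(host.contains(":") ? host : host + ":" + DEFAULT_PORT);
        }
    }
}
```

    With this sketch, new ConnectStringSketch("zk1:2181,zk2/app") yields the servers zk1:2181 and zk2:2181 and the chroot path /app.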
    

    Here getClientCnxnSocket() is used with its default behavior, which yields the NIO implementation, ClientCnxnSocketNIO.

    cnxn.start();

    public void start() {
        sendThread.start();
        eventThread.start();
    }
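    start() launches two background threads with separate jobs: one does network I/O, the other delivers events to watchers. The event side can be pictured as a thread draining a blocking queue. The following is only a sketch under that assumption: EventThreadSketch, the String events, and the STOP marker are hypothetical names, not ClientCnxn's actual EventThread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical event-dispatch thread: drains a queue and calls a watcher callback. */
final class EventThreadSketch extends Thread {
    private static final Object STOP = new Object(); // sentinel that ends the loop
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final Consumer<String> watcher;

    EventThreadSketch(Consumer<String> watcher) {
        super("event-thread-sketch");
        setDaemon(true);
        this.watcher = watcher;
    }

    /** Called from the I/O thread; never blocks because the queue is unbounded. */
    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    /** Asks the thread to exit after it has delivered everything queued so far. */
    void queueStop() {
        waitingEvents.add(STOP);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event arrives
                if (event == STOP) {
                    return;
                }
                watcher.accept((String) event); // callbacks run on this thread, in order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

    Keeping callbacks on their own thread means a slow watcher delays other callbacks but not the socket I/O.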
    

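    The ZooKeeper API methods talk to the server by submitting requests through cnxn.submitRequest. Take getData as an example; each operation has its own xxxRequest and xxxResponse record types.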

    public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);
    
        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
    
        final String serverPath = prependChroot(clientPath);
    
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
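    getData keeps two paths: clientPath, which the watch registration uses, and serverPath, which goes on the wire. A minimal sketch of the mapping between them, assuming chroot handling is plain prefixing (ChrootSketch is a hypothetical name, and the chroot is passed as a parameter rather than read from the connection):

```java
/** Hypothetical helper showing how a client path could map to a server path. */
final class ChrootSketch {
    /**
     * @param chrootPath chroot taken from the connect string, or null if none
     * @param clientPath absolute path as the application sees it
     * @return the path to put in the request sent to the server
     */
    static String prependChroot(String chrootPath, String clientPath) {
        if (chrootPath == null) {
            return clientPath; // no chroot: both views are the same
        }
        if (clientPath.length() == 1) {
            return chrootPath; // clientPath is "/", i.e. the chroot itself
        }
        return chrootPath + clientPath;
    }
}
```

    For example, with chroot /app the client path /config maps to /app/config, while a watcher registered by the application is still keyed by /config.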
    

    cnxn.submitRequest first calls queuePacket to add the request to outgoingQueue. A synchronous request then blocks in packet.wait() until it is woken up in SendThread.readResponse() --> finishPacket(packet).

    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
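    The blocking in submitRequest is the standard wait/notifyAll handoff on the packet's monitor. A self-contained sketch of just that handoff, where Packet is a hypothetical stand-in carrying only a completion flag and an error code:

```java
final class PacketWaitSketch {
    /** Hypothetical stand-in for ClientCnxn.Packet. */
    static final class Packet {
        boolean finished; // guarded by the packet's own monitor
        int err = -1;     // filled in by the I/O side before finished is set
    }

    /** Caller side: block until another thread marks the packet finished. */
    static int awaitReply(Packet p) throws InterruptedException {
        synchronized (p) {
            while (!p.finished) { // the loop guards against spurious wakeups
                p.wait();
            }
            return p.err;
        }
    }

    /** I/O-thread side: record the result, then wake every waiter. */
    static void finish(Packet p, int err) {
        synchronized (p) {
            p.err = err;
            p.finished = true;
            p.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Packet p = new Packet();
        Thread io = new Thread(() -> finish(p, 0), "io-sketch");
        io.start();
        System.out.println("err=" + awaitReply(p));
        io.join();
    }
}
```

    Because the flag is checked inside the synchronized block, the caller does not miss the notification even if finish runs before awaitReply is entered.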
    

    Adding the packet to outgoingQueue:

    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;
    
        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                // add the packet to the outgoingQueue send queue
                outgoingQueue.add(packet);
            }
        }
        // wake up the Selector
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }
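    The branch inside the synchronized block decides whether a packet is queued or failed on the spot. A reduced sketch of that decision, with hypothetical names (QueuePacketSketch, shutdown) and a boolean in place of the request header; the value -4 is the numeric code of KeeperException.Code.CONNECTIONLOSS:

```java
import java.util.LinkedList;

final class QueuePacketSketch {
    static final int CONNECTION_LOSS = -4; // KeeperException.Code.CONNECTIONLOSS

    /** Hypothetical packet: only what the queuing decision needs. */
    static final class Packet {
        boolean finished;
        int err;
    }

    private final LinkedList<Packet> outgoingQueue = new LinkedList<>();
    private boolean alive = true;    // stands in for state.isAlive()
    private boolean closing = false; // set once a close request has been queued

    Packet queuePacket(boolean isCloseSession) {
        Packet packet = new Packet();
        synchronized (outgoingQueue) {
            if (!alive || closing) {
                // Fail immediately instead of queuing work that can never be sent.
                packet.err = CONNECTION_LOSS;
                packet.finished = true;
            } else {
                if (isCloseSession) {
                    closing = true; // later requests take the branch above
                }
                outgoingQueue.add(packet);
            }
        }
        return packet;
    }

    void shutdown() {
        synchronized (outgoingQueue) {
            alive = false;
        }
    }

    int queued() {
        synchronized (outgoingQueue) {
            return outgoingQueue.size();
        }
    }
}
```

    The close request itself is still queued, so the session can be closed cleanly; only requests submitted after it are rejected.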
    

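    SendThread, ClientCnxn's background thread, sends the requests in outgoingQueue, receives the data returned by the server, and also sends heartbeats (pings). The body of its run() method is listed at the end of this section.

    First, the main I/O flow:
    clientCnxnSocket.doTransport-->doIO(pendingQueue, outgoingQueue, cnxn)-->sockKey.isWritable()-->sock.write(p.bb)-->sockKey.isReadable()--> sendThread.readResponse(incomingBuffer)--> finishPacket(packet)-->p.notifyAll();

    After sock.write(p.bb), the Packet is added to pendingQueue.

    Once its response has been read, the Packet is removed from pendingQueue.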
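    The movement of a packet between the two queues can be sketched without any networking. This is a simplified model, not the NIO code: PendingQueueSketch and its methods are hypothetical, requests and replies are plain strings, and the sketch assumes replies arrive in the same order the requests were written.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class PendingQueueSketch {
    /** Hypothetical packet holding a request and, later, its reply. */
    static final class Packet {
        final String request;
        String response;
        boolean finished;

        Packet(String request) {
            this.request = request;
        }
    }

    final Deque<Packet> outgoingQueue = new ArrayDeque<>(); // not yet written
    final Deque<Packet> pendingQueue = new ArrayDeque<>();  // written, awaiting a reply

    void submit(Packet p) {
        outgoingQueue.addLast(p);
    }

    /** Write side: "send" the head of outgoingQueue and park it in pendingQueue. */
    boolean writeOne() {
        Packet p = outgoingQueue.pollFirst();
        if (p == null) {
            return false; // nothing to send
        }
        pendingQueue.addLast(p);
        return true;
    }

    /** Read side: match the reply to the oldest pending packet and wake its waiter. */
    Packet readResponse(String reply) {
        Packet p = pendingQueue.removeFirst(); // throws NoSuchElementException if empty
        synchronized (p) {
            p.response = reply;
            p.finished = true;
            p.notifyAll();
        }
        return p;
    }
}
```

    A packet is therefore in at most one queue at a time: outgoingQueue before the write, pendingQueue between write and reply, and neither once it is finished.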

    
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = System.currentTimeMillis();
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                if (closing || !state.isAlive()) {
                    break;
                }
                startConnect();
                clientCnxnSocket.updateLastSendAndHeard();
            }
    
            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                            LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }
    
                    if (sendAuthEvent == true) {
                        eventThread.queueEvent(new WatchedEvent(
                              Watcher.Event.EventType.None,
                              authState,null));
                    }
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }
            
            if (to <= 0) {
                throw new SessionTimeoutException(
                        "Client session timed out, have not heard from server in "
                                + clientCnxnSocket.getIdleRecv() + "ms"
                                + " for sessionid 0x"
                                + Long.toHexString(sessionId));
            }
            if (state.isConnected()) {
                int timeToNextPing = readTimeout / 2
                        - clientCnxnSocket.getIdleSend();
                if (timeToNextPing <= 0) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }
    
            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = System.currentTimeMillis();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                        Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }
    
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            if (closing) {
                if (LOG.isDebugEnabled()) {
                    // closing so this is expected
                    LOG.debug("An exception was thrown while closing send thread for session 0x"
                            + Long.toHexString(getSessionId())
                            + " : " + e.getMessage());
                }
                break;
            } else {
                // this is ugly, you have a better way speak up
                if (e instanceof SessionExpiredException) {
                    LOG.info(e.getMessage() + ", closing socket connection");
                } else if (e instanceof SessionTimeoutException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof EndOfStreamException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof RWServerFoundException) {
                    LOG.info(e.getMessage());
                } else {
                    LOG.warn(
                            "Session 0x"
                                    + Long.toHexString(getSessionId())
                                    + " for server "
                                    + clientCnxnSocket.getRemoteSocketAddress()
                                    + ", unexpected error"
                                    + RETRY_CONN_MSG, e);
                }
                cleanup();
                if (state.isAlive()) {
                    eventThread.queueEvent(new WatchedEvent(
                            Event.EventType.None,
                            Event.KeeperState.Disconnected,
                            null));
                }
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    }
    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                Event.KeeperState.Disconnected, null));
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                             "SendThread exitedloop.");
            
    
    
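    The timing arithmetic in the loop above is easier to follow as pure functions. The sketch below mirrors the computation of to, the ping decision, and the doubling of pingRwTimeout. The names SelectTimeoutSketch, Decision, decide, and nextPingRwTimeout are hypothetical, and an IllegalStateException stands in for SessionTimeoutException.

```java
final class SelectTimeoutSketch {
    /** Outcome of one loop iteration's timing decision. */
    static final class Decision {
        final boolean sendPing;    // true when a heartbeat is due now
        final int selectTimeoutMs; // how long doTransport may block

        Decision(boolean sendPing, int selectTimeoutMs) {
            this.sendPing = sendPing;
            this.selectTimeoutMs = selectTimeoutMs;
        }
    }

    static Decision decide(boolean connected, int readTimeout, int connectTimeout,
                           int idleRecv, int idleSend) {
        // Time left before the server counts as silent for too long.
        int to = (connected ? readTimeout : connectTimeout) - idleRecv;
        if (to <= 0) {
            throw new IllegalStateException(
                    "have not heard from server in " + idleRecv + "ms");
        }
        boolean ping = false;
        if (connected) {
            // A ping is due once nothing has been sent for half the read timeout.
            int timeToNextPing = readTimeout / 2 - idleSend;
            if (timeToNextPing <= 0) {
                ping = true;
            } else if (timeToNextPing < to) {
                to = timeToNextPing; // wake up in time to send the next ping
            }
        }
        return new Decision(ping, to);
    }

    /** Backoff used while looking for a read/write server: double, up to a cap. */
    static int nextPingRwTimeout(int current, int max) {
        return Math.min(2 * current, max);
    }
}
```

    For instance, with readTimeout 20000, idleRecv 1000 and idleSend 4000, no ping is due and the select timeout shrinks from 19000 to 6000 so that the thread wakes up when the next ping should go out.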
  • Original article: https://www.cnblogs.com/donganwangshi/p/4120080.html