zoukankan      html  css  js  c++  java
  • Zookeeper-watcher机制源码分析(一)

    Watcher的基本流程

    ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher

    客户端注册watcher有3种方式,getData、exists、getChildren;以如下代码为例来分析整个触发机制的原理

    ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){

    public void processor(WatchedEvent event){

     System.out.println(“event.type”);

    }

    });

    zookeeper.create(“/mic”,”0”.getByte(),ZooDefs.Ids. OPEN_ACL_UNSAFE,CreateModel. PERSISTENT); //创建节点

    zookeeper.exists(“/mic”,true); //注册监听

    zookeeper.setData(“/mic”, “1”.getByte(),-1) ; //修改节点的值触发监听

    ZooKeeper API的初始化过程

    ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){

    public void processor(WatchedEvent event){

     System.out.println(“event.type”);

    }

    });

    在创建一个 ZooKeeper 客户端对象实例时,我们通过new Watcher()向构造方法中传入一个默认的 Watcher, 这个 Watcher 将作为整个 ZooKeeper会话期间的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中;代码如下

        public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,

                boolean canBeReadOnly, HostProvider aHostProvider,

                ZKClientConfig clientConfig) throws IOException {

            LOG.info("Initiating client connection, connectString=" + connectString

                    + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

            if (clientConfig == null) {

                clientConfig = new ZKClientConfig();

            }

            this.clientConfig = clientConfig;

            watchManager = defaultWatchManager();

            watchManager.defaultWatcher = watcher;  --在这里将watcher设置到ZKWatchManager

            ConnectStringParser connectStringParser = new ConnectStringParser(

                    connectString);

            hostProvider = aHostProvider;

            --初始化了ClientCnxn,并且调用cnxn.start()方法

            cnxn = new ClientCnxn(connectStringParser.getChrootPath(),

                    hostProvider, sessionTimeout, this, watchManager,

                    getClientCnxnSocket(), canBeReadOnly);

            cnxn.start();

        }

    ClientCnxn:是Zookeeper客户端和Zookeeper服务器端进行通信和事件通知处理的主要类,它内部包含两个类,

    1. SendThread  :负责客户端和服务器端的数据通信, 也包括事件信息的传输

    2. EventThread :  主要在客户端回调注册的Watchers进行通知处理

    ClientCnxn初始化

      public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,

                ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,

                long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {

            this.zooKeeper = zooKeeper;

            this.watcher = watcher;

            this.sessionId = sessionId;

            this.sessionPasswd = sessionPasswd;

            this.sessionTimeout = sessionTimeout;

            this.hostProvider = hostProvider;

            this.chrootPath = chrootPath;

            connectTimeout = sessionTimeout / hostProvider.size();

            readTimeout = sessionTimeout * 2 / 3;

            readOnly = canBeReadOnly;

            sendThread = new SendThread(clientCnxnSocket);  --初始化sendThread

            eventThread = new EventThread();                --初始化eventThread

            this.clientConfig=zooKeeper.getClientConfig();

        }

        public void start() { --启动两个线程

            sendThread.start();

            eventThread.start();

        }

    客户端通过exists注册监听

    zookeeper.exists(“/mic”,true); //注册监听

    通过exists方法来注册监听,代码如下

        public Stat exists(final String path, Watcher watcher)

            throws KeeperException, InterruptedException

        {

            final String clientPath = path;

            PathUtils.validatePath(clientPath);

            // the watch contains the un-chroot path

            WatchRegistration wcb = null;

            if (watcher != null) {

                wcb = new ExistsWatchRegistration(watcher, clientPath); //构建ExistWatchRegistration

            }

            final String serverPath = prependChroot(clientPath);

            RequestHeader h = new RequestHeader();

            h.setType(ZooDefs.OpCode.exists);  //设置操作类型为exists

            ExistsRequest request = new ExistsRequest();  // 构造ExistsRequest

            request.setPath(serverPath);

            request.setWatch(watcher != null);  //是否注册监听

            SetDataResponse response = new SetDataResponse();  //设置服务端响应的接收类

    //将封装的RequestHeader、ExistsRequest、SetDataResponse、WatchRegistration添加到发送队列

            ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 

            if (r.getErr() != 0) {

                if (r.getErr() == KeeperException.Code.NONODE.intValue()) {

                    return null;

                }

                throw KeeperException.create(KeeperException.Code.get(r.getErr()),

                        clientPath);

            }

            //返回exists得到的结果(Stat信息)

            return response.getStat().getCzxid() == -1 ? null : response.getStat();

        }

    cnxn.submitRequest

    public ReplyHeader submitRequest(RequestHeader h, Record request,

                Record response, WatchRegistration watchRegistration,

                WatchDeregistration watchDeregistration)

                throws InterruptedException {

            ReplyHeader r = new ReplyHeader();

            //将消息添加到队列,并构造一个Packet传输对象

            Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);

            synchronized (packet) {

                while (!packet.finished) { //在数据包没有处理完成之前,一直阻塞

                    packet.wait();

                }

            }

            return r;

        }

        public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,

                Record response, AsyncCallback cb, String clientPath,

                String serverPath, Object ctx, WatchRegistration watchRegistration,

                WatchDeregistration watchDeregistration) {

            //将相关传输对象转化成Packet

            Packet packet = null;

            packet = new Packet(h, r, request, response, watchRegistration);

            packet.cb = cb;

            packet.ctx = ctx;

            packet.clientPath = clientPath;

            packet.serverPath = serverPath;

            packet.watchDeregistration = watchDeregistration;

            

            synchronized (state) {

                if (!state.isAlive() || closing) {

                    conLossPacket(packet);

                } else {

                    if (h.getType() == OpCode.closeSession) {

                        closing = true;

                    }

                    outgoingQueue.add(packet); //添加到outgoingQueue

                }

            }

            sendThread.getClientCnxnSocket().packetAdded();//此处是多路复用机制,唤醒Selector,告诉他有数据包添加过来了

            return packet;

        }

    在 ZooKeeper 中,Packet 是一个最小的通信协议单元,即数据包。Pakcet 用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。在 ClientCnxn 中 WatchRegistration 也会被封装到 Pakcet 中,然后由 SendThread 线程调用queuePacket方法把 Packet 放入发送队列中等待客户端发送,这又是一个异步过程,分布式系统采用异步通信是一个非常常见的手段

    SendThread的发送过程

    在初始化连接的时候,zookeeper初始化了两个线程并且启动了。接下来我们来分析SendThread的发送过程,因为是一个线程,所以启动的时候会调用SendThread.run方法

    public void run() {

                clientCnxnSocket.introduce(this, sessionId, outgoingQueue);

                clientCnxnSocket.updateNow();

                clientCnxnSocket.updateLastSendAndHeard();

                int to;

                long lastPingRwServer = Time.currentElapsedTime();

                final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds

                while (state.isAlive()) {

                    try {

                        if (!clientCnxnSocket.isConnected()) {// 如果没有连接:发起连接

                            // don't re-establish connection if we are closing

                            if (closing) {

                                break;

                            }

                            startConnect(); //发起连接

                            clientCnxnSocket.updateLastSendAndHeard();

                        }

                        if (state.isConnected()) { //如果是连接状态,则处理sasl的认证授权

                            // determine whether we need to send an AuthFailed event.

                            if (zooKeeperSaslClient != null) {

                                boolean sendAuthEvent = false;

                                if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {

                                    try {

                                        zooKeeperSaslClient.initialize(ClientCnxn.this);

                                    } catch (SaslException e) {

                                       LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);

                                        state = States.AUTH_FAILED;

                                        sendAuthEvent = true;

                                    }

                                }

                                KeeperState authState = zooKeeperSaslClient.getKeeperState();

                                if (authState != null) {

                                    if (authState == KeeperState.AuthFailed) {

                                        // An authentication error occurred during authentication with the Zookeeper Server.

                                        state = States.AUTH_FAILED;

                                        sendAuthEvent = true;

                                    } else {

                                        if (authState == KeeperState.SaslAuthenticated) {

                                            sendAuthEvent = true;

                                        }

                                    }

                                }

                                if (sendAuthEvent == true) {

                                    eventThread.queueEvent(new WatchedEvent(

                                          Watcher.Event.EventType.None,

                                          authState,null));

                                }

                            }

                            to = readTimeout - clientCnxnSocket.getIdleRecv();

                        } else {

                            to = connectTimeout - clientCnxnSocket.getIdleRecv();

                        }

                        //to,表示客户端距离timeout还剩多少时间,准备发起ping连接

                        if (to <= 0) {//表示已经超时了。

                            String warnInfo;

                            warnInfo = "Client session timed out, have not heard from server in "

                                + clientCnxnSocket.getIdleRecv()

                                + "ms"

                                + " for sessionid 0x"

                                + Long.toHexString(sessionId);

                            LOG.warn(warnInfo);

                            throw new SessionTimeoutException(warnInfo);

                        }

                        if (state.isConnected()) {

                            //计算下一次ping请求的时间

                            int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -

                             ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);

                            //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL

                            if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {

                                sendPing(); //发送ping请求

                                clientCnxnSocket.updateLastSend();

                            } else {

                                if (timeToNextPing < to) {

                                    to = timeToNextPing;

                                }

                            }

                        }

                        // If we are in read-only mode, seek for read/write server

                        if (state == States.CONNECTEDREADONLY) {

                            long now = Time.currentElapsedTime();

                            int idlePingRwServer = (int) (now - lastPingRwServer);

                            if (idlePingRwServer >= pingRwTimeout) {

                                lastPingRwServer = now;

                                idlePingRwServer = 0;

                                pingRwTimeout =

                                    Math.min(2*pingRwTimeout, maxPingRwTimeout);

                                pingRwServer();

                            }

                            to = Math.min(to, pingRwTimeout - idlePingRwServer);

                        }

                        调用clientCnxnSocket,发起传输

                        其中 pendingQueue是一个用来存放已经发送、等待回应的Packet队列,

    clientCnxnSocket默认使用ClientCnxnSocketNIO(ps:还记得在哪里初始化吗?在实例化zookeeper的时候)

                        clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);

                    } catch (Throwable e) {

                        if (closing) {

                            if (LOG.isDebugEnabled()) {

                                // closing so this is expected

                                LOG.debug("An exception was thrown while closing send thread for session 0x"

                                        + Long.toHexString(getSessionId())

                                        + " : " + e.getMessage());

                            }

                            break;

                        } else {

                            // this is ugly, you have a better way speak up

                            if (e instanceof SessionExpiredException) {

                                LOG.info(e.getMessage() + ", closing socket connection");

                            } else if (e instanceof SessionTimeoutException) {

                                LOG.info(e.getMessage() + RETRY_CONN_MSG);

                            } else if (e instanceof EndOfStreamException) {

                                LOG.info(e.getMessage() + RETRY_CONN_MSG);

                            } else if (e instanceof RWServerFoundException) {

                                LOG.info(e.getMessage());

                            } else {

                                LOG.warn(

                                        "Session 0x"

                                                + Long.toHexString(getSessionId())

                                                + " for server "

                                                + clientCnxnSocket.getRemoteSocketAddress()

                                                + ", unexpected error"

                                                + RETRY_CONN_MSG, e);

                            }

                            // At this point, there might still be new packets appended to outgoingQueue.

                            // they will be handled in next connection or cleared up if closed.

                            cleanup();

                            if (state.isAlive()) {

                                eventThread.queueEvent(new WatchedEvent(

                                        Event.EventType.None,

                                        Event.KeeperState.Disconnected,

                                        null));

                            }

                            clientCnxnSocket.updateNow();

                            clientCnxnSocket.updateLastSendAndHeard();

                        }

                    }

                }

                synchronized (state) {

                    // When it comes to this point, it guarantees that later queued

                    // packet to outgoingQueue will be notified of death.

                    cleanup();

                }

                clientCnxnSocket.close();

                if (state.isAlive()) {

                    eventThread.queueEvent(new WatchedEvent(Event.EventType.None,

                            Event.KeeperState.Disconnected, null));

                }

                ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),

                        "SendThread exited loop for session: 0x"

                               + Long.toHexString(getSessionId()));

            }

    client 和 server的网络交互

    @Override

        void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {

            try {

                if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {

                    return;

                }

                Packet head = null;

                if (needSasl.get()) {

                    if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {

                        return;

                    }

                } else {  

                    //判断outgoingQueue是否存在待发送的数据包,不存在则直接返回

                    if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {

                        return;

                    }

                }

                // check if being waken up on closing.

                if (!sendThread.getZkState().isAlive()) {

                    // adding back the patck to notify of failure in conLossPacket().

                    addBack(head);

                    return;

                }

                // channel disconnection happened

                if (disconnected.get()) { //异常流程,channel关闭了,讲当前的packet添加到addBack中

                    addBack(head);

                    throw new EndOfStreamException("channel for sessionid 0x"

                            + Long.toHexString(sessionId)

                            + " is lost");

                }

                if (head != null) { //如果当前存在需要发送的数据包,则调用doWrite方法,pendingQueue表示处于已经发送过等待响应的packet队列

                    doWrite(pendingQueue, head, cnxn);

                }

            } finally {

                updateNow();

            }

        }

    DoWrite方法

        private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {

            updateNow();

            while (true) {

                if (p != WakeupPacket.getInstance()) {

                    if ((p.requestHeader != null) && //判断请求头以及判断当前请求类型不是ping或者auth操作

                            (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&

                            (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {

                        p.requestHeader.setXid(cnxn.getXid());  //设置xid,这个xid用来区分请求类型

                        synchronized (pendingQueue) {

                            pendingQueue.add(p); //将当前的packet添加到pendingQueue队列中

                        }

                    }

                    sendPkt(p); //将数据包发送出去

                }

                if (outgoingQueue.isEmpty()) {

                    break;

                }

                p = outgoingQueue.remove();

            }

        }

    sendPkt

       private void sendPkt(Packet p) {

            // Assuming the packet will be sent out successfully. Because if it fails,

            // the channel will close and clean up queues.

            p.createBB(); //序列化请求数据

            updateLastSend(); //更新最后一次发送updateLastSend

            sentCount++; //更新发送次数

            channel.write(ChannelBuffers.wrappedBuffer(p.bb)); //通过nio channel发送字节缓存到服务端

        }

    createBB

    public void createBB() {

                try {

                    ByteArrayOutputStream baos = new ByteArrayOutputStream();

                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);

                    boa.writeInt(-1, "len"); // We'll fill this in later

                    if (requestHeader != null) {

                        requestHeader.serialize(boa, "header"); //序列化header头(requestHeader)

                    }

                    if (request instanceof ConnectRequest) {

                        request.serialize(boa, "connect");

                        // append "am-I-allowed-to-be-readonly" flag

                        boa.writeBool(readOnly, "readOnly");

                    } else if (request != null) {

                        request.serialize(boa, "request"); //序列化request(request)

                    }

                    baos.close();

                    this.bb = ByteBuffer.wrap(baos.toByteArray());

                    this.bb.putInt(this.bb.capacity() - 4);

                    this.bb.rewind();

                } catch (IOException e) {

                    LOG.warn("Ignoring unexpected exception", e);

                }

            }

    从createBB方法中,我们看到在底层实际的网络传输序列化中,zookeeper只会讲requestHeader和request两个属性进行序列化,即只有这两个会被序列化到底层字节数组中去进行网络传输,不会将watchRegistration相关的信息进行网络传输。

    总结

    用户调用exists注册监听以后,会做几个事情

    1. 讲请求数据封装为packet,添加到outgoingQueue
    2. SendThread这个线程会执行数据发送操作,主要是将outgoingQueue队列中的数据发送到服务端
    3. 通过clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); 其中ClientCnxnSocket只zookeeper客户端和服务端的连接通信的封装,有两个具体的实现类ClientCnxnSocketNetty和ClientCnxnSocketNIO;具体使用哪一个类来实现发送,是在初始化过程是在实例化Zookeeper的时候设置的,代码如下

    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),

                    hostProvider, sessionTimeout, this, watchManager,

                    getClientCnxnSocket(), canBeReadOnly);

    private ClientCnxnSocket getClientCnxnSocket() throws IOException {

            String clientCnxnSocketName = getClientConfig().getProperty(

                    ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);

            if (clientCnxnSocketName == null) {

                clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();

            }

            try {

                Constructor<?> clientCxnConstructor =

    Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);

                ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());

                return clientCxnSocket;

            } catch (Exception e) {

                IOException ioe = new IOException("Couldn't instantiate "

                        + clientCnxnSocketName);

                ioe.initCause(e);

                throw ioe;

            }

        }

    4.基于第3步,最终会在ClientCnxnSocketNetty方法中执行sendPkt将请求的数据包发送到服务端

    对Java技术,架构技术感兴趣的同学,欢迎加QQ群619881427,一起学习,相互讨论。

    群内已经有小伙伴将知识体系整理好(源码,笔记,PPT,学习视频),欢迎加群免费领取。

    分享给喜欢Java,喜欢编程,有梦想成为架构师的程序员们,希望能够帮助到你们。

    不是Java程序员也没关系,帮忙转发给更多朋友!谢谢。

    分享一个小技巧点击阅读原文也可以轻松获取到学习资料哦!!

  • 相关阅读:
    解决Jenkins上git出现的“ERROR: Error fetching remote repo 'origin'”问题
    安装loadround时WebTours打不开的解决办法
    使用的postman心得
    IO流
    正则表达式用例
    一个纸杯该如何测试
    https与http的区别
    IO流操作之字符输入输出流简单操作
    手机APP测试获取上下文
    Appium第二天
  • 原文地址:https://www.cnblogs.com/xueSpring/p/9379228.html
Copyright © 2011-2022 走看看