zoukankan      html  css  js  c++  java
  • zookeeper原理解析-客户端与服务器端交互

    Zookeeper集群中server数量总是确定的,所以集群中的server交互采用比较可靠的bio长连接模型;不同于集群中sever间交互zookeeper客户端其实数量是未知的,为了提高zookeeper并发性能,zookeeper客户端与服务器端交互采用nio模型。下面我们主要来讲讲zookeeper的服务器端与客户端的交互。读者对nio不了解的话不妨抽点时间去了解下,对于一些nio框架如netty,mina再如一些web容器如tomcat,jetty底层都实现一套nio框架,对于实现nio框架模型大家不妨去谷歌百度搜一下Doug Lea的scalable io in Java 这个ppt。

    客户端

    ClientCnxnSocketNIO是zookeeper的nio通讯层的客户端部分,下面伪代码示例其核心代码:

    ClientCnxnSocketNIO{

          doTransport() {

                   if (如果之前连接没有立马连上,则在这里处理OP_CONNECT事件) {

                       sendThread.primeConnection();

                   } else {

                        doIO

                    }

     

                 //队列中有发送的消息, 开启写

            }

     

           doIO() {

                  if (sockKey.isReadable()) {

                        sendThread.readResponse(incomingBuffer);

                        updateLastHeard();

                   } 

     

                   if(sockKey.isWritable()) {

                          Packetp = outgoingQueue.getFirst() //从发送队列取

                          updateLastSend

                          p.requestHeader.setXid(cnxn.getXid());//设置客户端的xid

                         序列化

                         发送

                         从发送队列删除

                         加入到pendingQueue队列

                    }

            }

    }

    ClientCnxn 是客户端操作ClientCnxnSocketNIO的工具,维护了发送任务线程SendThread,事件任务线程EventThead, 发送队列OutgoingQueue以及请求消息的等待队列PendingQueue。下面以伪代码来示例其核心代码

    ClientCnxn {

        outgoingQueue//待向服务器端发送的队列, 客户端提交请求放入这个队列

        pendingQueue //发送以后等待响应的队列,

        

        submitRequest(){

           //client端一个封装成一个packet

            outgoingQueue.add(packet);

            selector.wakeup();

            packet.wait(); //如果是同步调用wait,应该反馈后会

        }

     

       SendThread {

           run() {

               1.设置clientCnxnSocket 最后发送时间,最后的心跳时间

               2. if(!clientCnxnSocket.isConnected()) {

                        startConnect()  //主要工作clientCnxnSocket做

                  } else {

                        计算下次ping的时间, 发送心跳

                      委托给 clientCnxnSocket.doTranspor进行底层的nio传输

                   } 

             }

     

            primeConnection(){

               //构建ConnectRequest

               //组合成通讯层的Packet对象,添加到发送队列,对于ConnectRequest其requestHeader为null

                outgoingQueue.addFirst

                clientCnxnSocket.enableReadWriteOnly();//确保读写事件都监听 

            }

     

            readResponse(){

               1.先读响应头,先对特殊的xid进行处理

               2. packet = pendingQueue.remove() //由于client和server都是单线程处理,多队列处理,所以认为全局有序

               3. 反序列化响应体response, 并设置到packet上

               4.finishPacket 1)同步notifyAll,结束 2)异步加入到event线程的队列

            }

        }

     

        EventThread{ //主要支持异步的回调

           run() {

     

            }

        }

    }

    大家观察客户端操作类Zookeeper里面的操作类主要分为两个参数不带callback的同步方法和参数带callback的异步方法。

    1.      同步调用方法实现类似Future同步转异步模式实现

    1)  Client提交请求对象封装成packet对象放入OutgoingQueue队列,并调用packet.wait()阻塞当前线程。

    2)  每个Client都只有一个SendThread线程是线性处理OutgoingQueue中的请求消息的,SendThread线程通过ClientCnxnSocketNIO工具顺序从OutgoingQueue队列中取请求消息发送到服务器端,同时将请求packet加入到PendingQueue中

    3)  ClientCnxnSocketNIO工具接收处理服务器端响应

    4)  从PendingQueue队列取出对应的packet,并调packet.notifyAll()唤醒阻塞的线程完成同步调用

    2.      异步调用的总体流程跟同步类似关键区别在于

    1)  向OutgoingQueue队列提交请求后,不会调用packet.wait()阻塞当前线程,主流程继续执行

    2)  同同步调用

    3)  同同步调用

    4)  从PendingQueue队列取出对应的packet,并调packet.callback方法完成回调处理

    Zookeeper服务器端

    NIOServerCnxnFactory工厂类,zookeeperserver用来启动监听客户端连接,每当有客户端请求连接进来,NIOServerCnxnFactory都会为这个链接构建NIOServerCnxn 实例来单独处理与这个客户端的交互

    NIOServerCnxn封装了处理读取客户端请求数据与及向客户端响应数据

    下面通过伪代码来实例:

    NIOServerCnxnFactory  {

       configure {

            绑定端口

            作为server监听

            注册selectkey 的连接时间

        }

        run {  //起到accept的作用

           1. OP_ACCEPT, 将NIOServerCnxn(handler) attach到selectkey以便被读写事件使用

           2. OP_READ 和 OP_WRITE取出handler NIOServerCnxn,并调doIo

        }

    }

    NIOServerCnxn {

        构造器 {

            //设置selectkey对read感兴趣

        }

        doIo {

            if(k.isReadable()) {

                 1. 读前四个字节, 代表请求内容长度,不包括自己的4字节

                 2. 读取到字节数组中

                 3. zkServer.processPacket()或者zkServer.processConnectRequest()

            }

            if(k.isWritable()) {

                  1.从outgoingBuffers取ByteBuffer

                  2.发送bytes

            }

        }

    }

  • 相关阅读:
    jquery How can I recurse up a DOM tree? Stack Overflow
    分享:RazorSQL 6.0 发布,数据库客户端工具
    recursion How to write a simple preorder DOM tree traversal algorithm in jQuery? Stack Overflow
    循环、迭代、遍历和递归
    JB´s trash: Recursive DOM walk with Dojo
    JavaScript Lab Articles Nonrecursive Preorder Traversal Part 1
    JQuery. Parse XML children recursively. How? Stack Overflow
    QDomNode Class Reference
    分享:Guacamole 0.7.1 发布,基于Web的VNC客户端
    How to traverse dom elements in raw JavaScript? Stack Overflow
  • 原文地址:https://www.cnblogs.com/wxd0108/p/6251767.html
Copyright © 2011-2022 走看看