zoukankan      html  css  js  c++  java
  • zookeeper客户端源码分析

    zookeeper客户端源码分析

    zookeeper简介

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    使用案例

    dubbo、disconf、elastic-job、分布式锁、leader选举

    客户端代码解析

    接下来的代码可以从的github项目找到https://github.com/yaojf/zookeeper-learn
    这里我从Client的main方法开始解析(前提启动了Server,即zk服务端),调用点为ZooKeeperMain.main(args),以下代码都是从这个调用到开始。

    • 构造ZooKeeperMain

      默认的连接地址是localhost:2181,会话超时为30000毫秒,核心代码

       zk = new ZooKeeper(host,
                   Integer.parseInt(cl.getOption("timeout")),
                   new MyWatcher(), readOnly);
      

      参数依次为zk服务器地址,会话超时时间,默认的观察者类,还有是否只读。
      ZooKeeper构造方法里,初始化watchManager(处理各种观察,比如dubbo服务提供者监听),解析服务器地址,默认的根地址,构造ClientCnxn

      cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                   hostProvider, sessionTimeout, this, watchManager,
                   getClientCnxnSocket(), canBeReadOnly);
      

      第六个参数默认为ClientCnxnSocketNIO(jdk nio连接),核心为构造2个线程

       sendThread = new SendThread(clientCnxnSocket);
           eventThread = new EventThread();
      

      SendThread线程处理异步连接,获取初始化sessionId,发送心跳,处理读写IO,断开重连等功能。EventThread线程处理事件监听,它通过不停的获取waitingEvents同步队列的数据,做Watcher的处理(比如连接成功的事件)。
      最后就是调用

      cnxn.start();
      

      启动SendThread和EventThread线程。

    • SendThread线程功能

      1. 发起异步连接

        if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
        

        startConnect方法随机选取一个zk服务器地址发起异步连接, 然后更新最后的发送和接受的时间戳(用于判断是否到心跳的发送时间)。

      2. 计算selector的select方法的等待时间

        	if (state.isConnected()) {  
                                       // determine whether we need to send an AuthFailed event. 
                            to = readTimeout - clientCnxnSocket.getIdleRecv();
                        } else {
                            to = connectTimeout - clientCnxnSocket.getIdleRecv();
                        }
        

        如果还未连接则用connectTimeout计算,否则用readTimeout计算,这2个参数是在构造ClientCnxn对象时设置的,和sessionTimeout相关

           connectTimeout = sessionTimeout / hostProvider.size();
           readTimeout = sessionTimeout * 2 / 3;
        
      3. 处理事件选择

        clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        

        pendingQueue为等待结果的队列,outgoingQueue为发送请求的队列。
        内部逻辑为调用selector.select(waitTimeOut),获取激活的事件,并处理,或 者超时跳出。

      void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
      

      }
      ```

        如果是连接事件,则完成连接,并往outgoingQueue放入ConnectRequest,然后设置与服务端连接的sockKey感兴趣事件为读写。
        如果是读写事件,则走doIO方法。该方法判断如果是读事件激活则读取数据,这里读数据分2个步骤,第一步读4字节的数据长度,然后新建对应长度的ByteBuffer分配给incomingBuffer(读取数据用),首次读取数据会初始化连接。
      
         ```
      

      else if (!initialized) {
      readConnectResult();
      enableRead();
      if (findSendablePacket(outgoingQueue,
      cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
      // Since SASL authentication has completed (if client is configured to do so),
      // outgoing packets waiting in the outgoingQueue can now be sent.
      enableWrite();
      }
      lenBuffer.clear();
      incomingBuffer = lenBuffer;
      updateLastHeard();
      initialized = true;
      }
      ```

        读取服务端返回结果,获取服务端分配的sessionId,并且ClientCnxn的连接状态为States.CONNECTED,然后发送连接成功事件(EventThread处理默认的监听器回调)。
      
        不是首次连接则是普通的读数据,反序列化数据,从pendingQueue获取数据(处理完会remove掉),然后notify阻塞在Packet的数据请求。
      
        如果是写事件激活,则从outgoingQueue获取请求Packet,然后发送数据到服务端,如果不是ping请求,则把Packet重新放到pendingQueue.
      
         
         ```
      

      sock.write(p.bb);
      if (!p.bb.hasRemaining()) {
      sentCount++;
      outgoingQueue.removeFirstOccurrence(p);
      if (p.requestHeader != null
      && p.requestHeader.getType() != OpCode.ping
      && p.requestHeader.getType() != OpCode.auth) {
      synchronized (pendingQueue) {
      pendingQueue.add(p);
      }
      }
      }
      ```

    • EventThread线程功能

      事件处理线程功能比较简单,通过waitingEvents这个LinkedBlockingQueue不停的抓取数据,处理事件监听(zk默认的事件监听都是一次性的,使用一次后会从map里面去除,但是一些框架比如zkClient等他自己封装了非一次性监听的逻辑)。

    • ZooKeeperMain的run方法

      这个run方法是从控制台不停的读取请求,然后解释器解释请求,合成对应的请求到zk服务端,我们可以从MyCommandOptions对象里看到所有支持的请求命令,比如LsCommand。具体不同的命令我们可以自行debug,做了解。

    总结

    ZooKeeper会随机连接一个服务端,然后通过主线程操做ZooKeeper的命令请求方法,配合sendThread和eventThread完成我们对服务端的各种请求。

    备注

    下一篇博客会分享zookeeper服务端源码分析,尽情期待。

  • 相关阅读:
    [C#/.NET]Entity Framework(EF) Code First 多对多关系的实体增,删,改,查操作全程详细示例
    玩转Asp.net MVC 的八个扩展点
    float实例讲解
    C#高性能TCP服务的多种实现方式
    如何把SQLServer数据库从高版本降级到低版本?
    ASP.NET MVC Area使用-将Area设置成独立项目
    如何使用ping和tracert命令测试网站访问速度
    ASP.NET MVC 入门10、Action Filter 与 内置的Filter实现(实例-防盗链)
    MVC Action Filter
    c#中单元测试
  • 原文地址:https://www.cnblogs.com/yaojf/p/10902483.html
Copyright © 2011-2022 走看看