zoukankan      html  css  js  c++  java
  • zookeeper(三)

    zookeeper源码分析

    下载zookeeper源码,这次git上下载的版本是3.4.13。因为是使用ant构建的,下载到本地后构建比较麻烦。所以取巧的使用了maven关联zookeeper对应版本并且关联maven上对应版本的源码的形式查看源码。这种方式是依赖的源码文件只读,不能在源码上面进行编辑。
    相关流程图:

    客户端启动,我们查看zkCli.sh 文件中内容发现入口ZooKeeperMain

    if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
      . "$ZOOBINDIR"/../libexec/zkEnv.sh
    else
      . "$ZOOBINDIR"/zkEnv.sh
    fi
    
    "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" 
         -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS 
         org.apache.zookeeper.ZooKeeperMain "$@"
    
    

    查看main方法

     public static void main(String args[])
            throws KeeperException, IOException, InterruptedException
        {
            ZooKeeperMain main = new ZooKeeperMain(args);
            main.run();@6
        }
    
    public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
            cl.parseOptions(args);//命令行输入参数解析
            System.out.println("Connecting to " + cl.getOption("server"));
            connectToZK(cl.getOption("server"));//连接到指定服务器
            //zk = new ZooKeeper(cl.getOption("server"),
            //Integer.parseInt(cl.getOption("timeout")), new MyWatcher());
        }
    protected void connectToZK(String newHost) throws InterruptedException, IOException {
            if (zk != null && zk.getState().isAlive()) {
                zk.close();
            }
            host = newHost;
            boolean readOnly = cl.getOption("readonly") != null;
            zk = new ZooKeeper(host,
                     Integer.parseInt(cl.getOption("timeout")),
                     new MyWatcher(), readOnly);
        }
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                boolean canBeReadOnly)
            throws IOException
        {
          ...
            //服务地址信息加入一个list并且打乱顺序
            HostProvider hostProvider = new StaticHostProvider(
                    connectStringParser.getServerAddresses());
            //创建一个ClientCnxnSocket对象(socketNIO)并且创建了两个线程sendThread = new SendThread(clientCnxnSocket)、eventThread = new EventThread()
            cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                    hostProvider, sessionTimeout, this, watchManager,
                    getClientCnxnSocket(), canBeReadOnly);
            cnxn.start();//启动上面的两个线程
        }
    SendThread.run(){...
        while (state.isAlive()) {
              try {
                   if (!clientCnxnSocket.isConnected()) {...
                        startConnect(serverAddress);//@1
                        clientCnxnSocket.updateLastSendAndHeard();//记录一些时间
                    ...}
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);@3
    }
    @1 startConnect{...
        logStartConnect(addr);
        clientCnxnSocket.connect(addr);//调用createSock()创建socketChannnel @2
    ...}
    @2 SocketChannel createSock() throws IOException {
            SocketChannel sock;
            sock = SocketChannel.open();
            sock.configureBlocking(false);
            sock.socket().setSoLinger(false, -1);
            sock.socket().setTcpNoDelay(true);
            return sock;
    }
    @3 doTransport{...
        synchronized (this) {
        selected = selector.selectedKeys();
         for (SelectionKey k : selected) {
                SocketChannel sc = ((SocketChannel) k.channel());
                if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                    if (sc.finishConnect()) {
                        updateLastSendAndHeard();//一些时间记录
                        sendThread.primeConnection();@4
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    doIO(pendingQueue, outgoingQueue, cnxn);@5
                }
            }
            }
    ...}
    @4 primeConnection{...//将request包装成Packet放入outgoingQueue队列中
            ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
                synchronized (outgoingQueue) {
                    SetWatches sw = new SetWatches(setWatchesLastZxid,
                                        dataWatchesBatch,
                                        existWatchesBatch,
                                        childWatchesBatch);
                     RequestHeader h = new RequestHeader();
                     h.setType(ZooDefs.OpCode.setWatches);
                     h.setXid(-8);
                     Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                     outgoingQueue.addFirst(packet);
                    ...
                    for (AuthData id : authInfo) {
                        outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                                OpCode.auth), null, new AuthPacket(0, id.scheme,
                                id.data), null, null));
                    }
                    outgoingQueue.addFirst(new Packet(null, null, conReq,
                                null, null, readOnly));
            }
    ...}
    @5 doIO{...//将outgoingQueue中的packet传输到服务端  outgoingQueue待发送队列 pendingQueue已发送等待结果队列
        if (sockKey.isReadable()) {int rc = sock.read(incomingBuffer);...
        //获取连接结果 连接成功回调下面这个方法
             readConnectResult(){...
                    sendThread.onConnected{...
                        eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        eventState, null));...}
             enableRead()
    
        if (sockKey.isWritable()) {...
            Packet p = findSendablePacket{...
            ListIterator<Packet> iter = outgoingQueue.listIterator();
                while (iter.hasNext()) {
                    Packet p = iter.next();
                    if (p.requestHeader == null) {
                        // We've found the priming-packet. Move it to the beginning of the queue.
                        iter.remove();
                        outgoingQueue.add(0, p);//将packet放入队列
                ...
                p.createBB();
                sock.write(p.bb);//将请求写出去
            }
    
    ...}
    
    @6 run{...
            try {
                    Class<?> consoleC = Class.forName("jline.ConsoleReader");
                  ...
                    Method readLine = consoleC.getMethod("readLine", String.class);
                    while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
                        executeLine(line);
    ...}
    
    public void executeLine(String line)//执行命令并加入history
        throws InterruptedException, IOException, KeeperException {
          if (!line.equals("")) {
            cl.parseCommand(line);
            addToHistory(commandCount,line);
            processCmd(cl);
            commandCount++;
          }
        }
    protected boolean processCmd(MyCommandOptions co)
            throws KeeperException, IOException, InterruptedException
        {
            try {
                return processZKCmd(co){...
                if (cmd.equals("quit")) {
                if (cmd.equals("create") && args.length >= 3) {
                    String newPath = zk.create(path, args[first+2].getBytes(), acl,
                        flags){...
                                ReplyHeader r = cnxn.submitRequest(h, request, response, null);//提交request请求{...
                                    Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);//封装request到outgoingQueue中然后同步等待结果
                                        synchronized (packet) {
                                            while (!packet.finished) {
                                                packet.wait();
                                                   ...
    }
    

    服务端分析同理查看zkServer.sh启动入口:QuorumPeerMain

     if [ "x$JMXPORT" = "x" ]
      then
        # for some reason these two options are necessary on jdk6 on Ubuntu
        #   accord to the docs they are not necessary, but otw jconsole cannot
        #   do a local attach
        ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
      else
    
     public static void main(String[] args) {
            QuorumPeerMain main = new QuorumPeerMain();
            try {
                main.initializeAndRun(args);...
    
    initializeAndRun{
            QuorumPeerConfig config = new QuorumPeerConfig();
            if (args.length == 1) {
                config.parse(args[0]);//加载文件配置
            }...
             if (args.length == 1 && config.servers.size() > 0) {
                runFromConfig(config);
            } else {
                // there is only server in the quorum -- run as standalone
                ZooKeeperServerMain.main(args);@7
    ...}
    
    runFromConfig{...
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(),
                        config.getMaxClientCnxns());
                cnxnFactory.startup(zkServer);
    ...}
    @7 main{
            ZooKeeperServerMain main = new ZooKeeperServerMain();
            try {
                main.initializeAndRun(args);
    
    initializeAndRun{...
        runFromConfig(config){...
             txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                        config.dataDir));
           cnxnFactory.startup(zkServer);
    ...}
    startup{
            start();//启动NIOServerCnxnFactory.run() @8
            setZooKeeperServer(zks);
            zks.startdata();
            zks.startup();
        }
    startdata() {//从事务日志中加载数据到内存
            //check to see if zkDb is not null
            if (zkDb == null) {
                zkDb = new ZKDatabase(this.txnLogFactory);
            }  
            if (!zkDb.isInitialized()) {
                loadData();
            }
        }
    loadData{...
         if(zkDb.isInitialized()){
                setZxid(zkDb.getDataTreeLastProcessedZxid());
            }
            else {
                setZxid(zkDb.loadDataBase())->snapLog.restore()->snapLog.deserialize(dt, sessions)//从快照中反序列化数据到dataTree;fastForwardFromEdits(){...
                FileTxnLog txnLog = new FileTxnLog(dataDir);
                TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);...//读取快照最大事务id+1...
                try {
                        processTransaction{...//从快照(不是每次事务都会打快照)和事务日志中重新执行事务导出数据到内存(服务突然宕机等就可以恢复)
                            case OpCode.createSession:               
                                  rc = dt.processTxn(hdr, txn){
                                        switch (header.getType()) {
                                        case OpCode.create:
                                          CreateTxn createTxn = (CreateTxn) txn;
                                          rc.path = createTxn.getPath();
                                          createNode(...
                                  }
                                  break;
                            case OpCode.closeSession:
                      }
            ...}
            }
    ...}
    FinalRequestProcessor#processRequest{...//committedLog就是用来保存议案的列表 每次持久化会把保存到committedLog
    if (Request.isQuorum(request.type)) {
                    zks.getZKDatabase().addCommittedProposal(request){...
                          if (committedLog.size() > commitLogCount) {
                          committedLog.removeFirst();
                          minCommittedLog = committedLog.getFirst().packet.getZxid();
                      }
                      if (committedLog.size() == 0) {
                          minCommittedLog = request.zxid;
                          maxCommittedLog = request.zxid;
                byte[] data = SerializeUtils.serializeRequest(request);
                QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
                Proposal p = new Proposal();
                p.packet = pp;
                p.request = request;
                committedLog.add(p);
                maxCommittedLog = p.packet.getZxid();
                ...}
    ...}
    
  • 相关阅读:
    九、Shell 流程控制
    八、Shell test 命令
    七、Shell printf 命令
    六、Shell echo命令
    五、Shell 基本运算符
    四、Shell 数组
    三、Shell 传递参数
    二、Shell 变量
    一、Shell 教程
    KVM 介绍(1):简介及安装
  • 原文地址:https://www.cnblogs.com/leifonlyone/p/12826303.html
Copyright © 2011-2022 走看看