  • ZooKeeper Source Code Analysis, Part 4: How the Server (Standalone) Processes Requests

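    In the previous article, "ZooKeeper Source Code Analysis, Part 1: The Server Startup Process", we covered how the ZooKeeper server starts up: a standalone server is started through ZooKeeperServer, while a cluster is started through QuorumPeer. This time we analyze how each of them processes messages.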

    1. As the previous article showed, in standalone mode NettyServerCnxnFactory starts ZooKeeperServer to process messages:

        public synchronized void startup() {
            if (sessionTracker == null) {
                createSessionTracker();
            }
            startSessionTracker();
            setupRequestProcessors();
    
            registerJMX();
    
            state = State.RUNNING;
            notifyAll();
        }

    The request processors are set up as follows:

        protected void setupRequestProcessors() {
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                    finalProcessor);
            ((SyncRequestProcessor)syncProcessor).start();
            firstProcessor = new PrepRequestProcessor(this, syncProcessor);
            ((PrepRequestProcessor)firstProcessor).start();
        }

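    We can see that a chain of three request processors is built: PrepRequestProcessor comes first, SyncRequestProcessor second, and FinalRequestProcessor last. Two of them, SyncRequestProcessor and PrepRequestProcessor, are started as threads via start(); FinalRequestProcessor is not started and is simply called by the processor in front of it.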
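    The wiring above is a chain of responsibility built back to front: each processor receives its successor in the constructor. The sketch below is a minimal illustration of that shape only. The names Processor, TaggingStage, FinalStage, and ChainDemo are hypothetical, requests are plain strings, and the threads and queues that the real processors use are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ZooKeeper's RequestProcessor interface.
interface Processor {
    void processRequest(String request);
}

// Last link of the chain: it has no successor and just records what arrives.
class FinalStage implements Processor {
    final List<String> handled = new ArrayList<>();

    @Override
    public void processRequest(String request) {
        handled.add(request);
    }
}

// Intermediate link: marks the request, then forwards it to the next processor.
class TaggingStage implements Processor {
    private final String tag;
    private final Processor next;

    TaggingStage(String tag, Processor next) {
        this.tag = tag;
        this.next = next;
    }

    @Override
    public void processRequest(String request) {
        next.processRequest(request + ">" + tag);
    }
}

class ChainDemo {
    // Build the chain from the last processor to the first, the same order
    // in which setupRequestProcessors() constructs its processors.
    static FinalStage run(String request) {
        FinalStage finalStage = new FinalStage();
        Processor sync = new TaggingStage("sync", finalStage);
        Processor prep = new TaggingStage("prep", sync);
        prep.processRequest(request); // requests always enter at the first processor
        return finalStage;
    }

    public static void main(String[] args) {
        System.out.println(run("create /a").handled);
    }
}
```

    Building from the back means every processor can hold an immutable reference to its successor, and only the head of the chain (firstProcessor in the original code) has to be exposed to callers.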

      1.1 The first request processor: PrepRequestProcessor

      

        @Override
        public void run() {
            try {
                while (true) {
                    Request request = submittedRequests.take();
                    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                    if (request.type == OpCode.ping) {
                        traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                    }
                    if (Request.requestOfDeath == request) {
                        break;
                    }
                    pRequest(request);
                }
            } catch (RequestProcessorException e) {
                if (e.getCause() instanceof XidRolloverException) {
                    LOG.info(e.getCause().getMessage());
                }
                handleException(this.getName(), e);
            } catch (Exception e) {
                handleException(this.getName(), e);
            }
            LOG.info("PrepRequestProcessor exited loop!");
        }

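     As we can see, while (true) is an endless loop that takes requests off the submittedRequests queue one at a time. The call to pRequest(request) (highlighted in red in the original post) is the body of the processing; the loop only exits when it receives Request.requestOfDeath.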
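    The loop in run() follows the common "blocking queue plus sentinel" pattern. The following is a minimal sketch of that pattern, not ZooKeeper code: QueueWorker, SHUTDOWN, and the use of strings as requests are assumptions made for illustration, and adding to a list stands in for pRequest(request).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueWorker extends Thread {
    // Sentinel compared by identity, playing the role of Request.requestOfDeath.
    static final String SHUTDOWN = new String("shutdown");

    private final BlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    final List<String> processed = new ArrayList<>();

    void submit(String request) {
        submitted.add(request);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = submitted.take(); // blocks until a request arrives
                if (request == SHUTDOWN) {         // identity check, not equals()
                    break;
                }
                processed.add(request);            // stands in for pRequest(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // preserve the interrupt and exit
        }
    }

    // Runs a worker to completion and returns what it processed.
    static List<String> demo() {
        QueueWorker worker = new QueueWorker();
        worker.start();
        worker.submit("a");
        worker.submit("b");
        worker.submit(SHUTDOWN);
        worker.submit("ignored"); // queued after the sentinel, never processed
        try {
            worker.join();        // join() makes the worker's writes visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return worker.processed;
    }
}
```

    Because a single thread drains the queue, requests are handled strictly in submission order, which is what lets the processing code run without extra locking of its own.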

        /**
         * This method will be called inside the ProcessRequestThread, which is a
         * singleton, so there will be a single thread calling this code.
         *
         * @param request
         */
        protected void pRequest(Request request) throws RequestProcessorException {
            // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
            // request.type + " id = 0x" + Long.toHexString(request.sessionId));
            request.setHdr(null);
            request.setTxn(null);
    
            try {
                switch (request.type) {
                case OpCode.createContainer:
                case OpCode.create:
                case OpCode.create2:
                    CreateRequest create2Request = new CreateRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                    break;
                case OpCode.deleteContainer:
                case OpCode.delete:
                    DeleteRequest deleteRequest = new DeleteRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                    break;
                case OpCode.setData:
                    SetDataRequest setDataRequest = new SetDataRequest();                
                    pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                    break;
                case OpCode.reconfig:
                    ReconfigRequest reconfigRequest = new ReconfigRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
                    pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                    break;
                case OpCode.setACL:
                    SetACLRequest setAclRequest = new SetACLRequest();                
                    pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                    break;
                case OpCode.check:
                    CheckVersionRequest checkRequest = new CheckVersionRequest();              
                    pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                    break;
                case OpCode.multi:
                    MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                    try {
                        ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                    } catch(IOException e) {
                        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                                Time.currentWallTime(), OpCode.multi));
                        throw e;
                    }
                    List<Txn> txns = new ArrayList<Txn>();
                    //Each op in a multi-op must have the same zxid!
                    long zxid = zks.getNextZxid();
                    KeeperException ke = null;
    
                    //Store off current pending change records in case we need to rollback
                    Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
    
                    for(Op op: multiRequest) {
                        Record subrequest = op.toRequestRecord();
                        int type;
                        Record txn;
    
                        /* If we've already failed one of the ops, don't bother
                         * trying the rest as we know it's going to fail and it
                         * would be confusing in the logfiles.
                         */
                        if (ke != null) {
                            type = OpCode.error;
                            txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                        }
    
                        /* Prep the request and convert to a Txn */
                        else {
                            try {
                                pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                                type = request.getHdr().getType();
                                txn = request.getTxn();
                            } catch (KeeperException e) {
                                ke = e;
                                type = OpCode.error;
                                txn = new ErrorTxn(e.code().intValue());
    
                                LOG.info("Got user-level KeeperException when processing "
                                        + request.toString() + " aborting remaining multi ops."
                                        + " Error Path:" + e.getPath()
                                        + " Error:" + e.getMessage());
    
                                request.setException(e);
    
                                /* Rollback change records from failed multi-op */
                                rollbackPendingChanges(zxid, pendingChanges);
                            }
                        }
    
                        //FIXME: I don't want to have to serialize it here and then
                        //       immediately deserialize in next processor. But I'm
                        //       not sure how else to get the txn stored into our list.
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                        txn.serialize(boa, "request") ;
                        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
    
                        txns.add(new Txn(type, bb.array()));
                    }
    
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                            Time.currentWallTime(), request.type));
                    request.setTxn(new MultiTxn(txns));
    
                    break;
    
                //create/close session don't require request record
                case OpCode.createSession:
                case OpCode.closeSession:
                    if (!request.isLocalSession()) {
                        pRequest2Txn(request.type, zks.getNextZxid(), request,
                                     null, true);
                    }
                    break;
    
                //All the rest don't need to create a Txn - just verify session
                case OpCode.sync:
                case OpCode.exists:
                case OpCode.getData:
                case OpCode.getACL:
                case OpCode.getChildren:
                case OpCode.getChildren2:
                case OpCode.ping:
                case OpCode.setWatches:
                case OpCode.checkWatches:
                case OpCode.removeWatches:
                    zks.sessionTracker.checkSession(request.sessionId,
                            request.getOwner());
                    break;
                default:
                    LOG.warn("unknown type " + request.type);
                    break;
                }
            } catch (KeeperException e) {
                if (request.getHdr() != null) {
                    request.getHdr().setType(OpCode.error);
                    request.setTxn(new ErrorTxn(e.code().intValue()));
                }
                LOG.info("Got user-level KeeperException when processing "
                        + request.toString()
                        + " Error Path:" + e.getPath()
                        + " Error:" + e.getMessage());
                request.setException(e);
            } catch (Exception e) {
                // log at error level as we are returning a marshalling
                // error to the user
                LOG.error("Failed to process " + request, e);
    
                StringBuilder sb = new StringBuilder();
                ByteBuffer bb = request.request;
                if(bb != null){
                    bb.rewind();
                    while (bb.hasRemaining()) {
                        sb.append(Integer.toHexString(bb.get() & 0xff));
                    }
                } else {
                    sb.append("request buffer is null");
                }
    
                LOG.error("Dumping request buffer: 0x" + sb.toString());
                if (request.getHdr() != null) {
                    request.getHdr().setType(OpCode.error);
                    request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
                }
            }
            request.zxid = zks.getZxid();
            nextProcessor.processRequest(request);
        }

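    Setting the exception handling aside, this method handles the different kinds of request by choosing a branch based on request.type. It is called from inside the ProcessRequestThread, which is a singleton, so only a single thread ever runs this code. Let's take the create request (highlighted in red in the original post) as an example of how it works. The request is passed to pRequest2Txn, whose create branch looks like this: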

                    CreateRequest createRequest = (CreateRequest)record;
                    if (deserialize) {
                        ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                    }
                    CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
                    validateCreateRequest(createMode, request);
                    String path = createRequest.getPath();
                    String parentPath = validatePathForCreate(path, request.sessionId);
    
                    List<ACL> listACL = fixupACL(path, request.authInfo, createRequest.getAcl());
                    ChangeRecord parentRecord = getRecordForPath(parentPath);
    
                    checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
                    int parentCVersion = parentRecord.stat.getCversion();
                    if (createMode.isSequential()) {
                        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                    }
                    validatePath(path, request.sessionId);
                    try {
                        if (getRecordForPath(path) != null) {
                            throw new KeeperException.NodeExistsException(path);
                        }
                    } catch (KeeperException.NoNodeException e) {
                        // ignore this one
                    }
                    boolean ephemeralParent = (parentRecord.stat.getEphemeralOwner() != 0) &&
                            (parentRecord.stat.getEphemeralOwner() != DataTree.CONTAINER_EPHEMERAL_OWNER);
                    if (ephemeralParent) {
                        throw new KeeperException.NoChildrenForEphemeralsException(path);
                    }
                    int newCversion = parentRecord.stat.getCversion()+1;
                    if (type == OpCode.createContainer) {
                        request.setTxn(new CreateContainerTxn(path, createRequest.getData(), listACL, newCversion));
                    } else {
                        request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),
                                newCversion));
                    }
                    StatPersisted s = new StatPersisted();
                    if (createMode.isEphemeral()) {
                        s.setEphemeralOwner(request.sessionId);
                    }
                    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
                    parentRecord.childCount++;
                    parentRecord.stat.setCversion(newCversion);
                    addChangeRecord(parentRecord);
                    addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
                    break;
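
    One detail of the create branch worth isolating is how a sequential node gets its name: the parent's cversion is appended to the requested path as a zero-padded 10-digit number, and the parent's cversion is then incremented. The sketch below reproduces only that arithmetic. SequentialName and Parent are hypothetical names, and the counter is updated directly here, whereas the original code stores the new cversion in a pending ChangeRecord.

```java
import java.util.Locale;

class SequentialName {
    // Hypothetical stand-in for the parent's record; only the child version is modeled.
    static final class Parent {
        int cversion = 0;
    }

    // Same formatting as the create branch: path + "%010d" of the parent's cversion.
    static String sequentialPath(String path, int parentCVersion) {
        return path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }

    // Name the new child from the current cversion, then bump it
    // (the original computes newCversion = cversion + 1).
    static String create(Parent parent, String prefix) {
        String path = sequentialPath(prefix, parent.cversion);
        parent.cversion++;
        return path;
    }

    public static void main(String[] args) {
        Parent locks = new Parent();
        System.out.println(create(locks, "/locks/lock-")); // /locks/lock-0000000000
        System.out.println(create(locks, "/locks/lock-")); // /locks/lock-0000000001
    }
}
```

    Locale.ENGLISH is passed explicitly so that the digits do not depend on the default locale of the JVM.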
                

    At the end it calls addChangeRecord to record the pending changes:

        private void addChangeRecord(ChangeRecord c) {
            synchronized (zks.outstandingChanges) {
                zks.outstandingChanges.add(c);
                zks.outstandingChangesForPath.put(c.path, c);
            }
        }

    Here, outstandingChanges is a list of ChangeRecord objects, and outstandingChangesForPath is a map from a path to the latest ChangeRecord for that path. They are defined as follows:

        final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
        // this data structure must be accessed under the outstandingChanges lock
        final HashMap<String, ChangeRecord> outstandingChangesForPath =
                new HashMap<String, ChangeRecord>();

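    ChangeRecord is a data structure that makes it easy for PrepRequestProcessor and FinalRequestProcessor to share information. Its constructor: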

            ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
                    List<ACL> acl) {
                this.zxid = zxid;
                this.path = path;
                this.stat = stat;
                this.childCount = childCount;
                this.acl = acl;
            }
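
    To see why the per-path map is useful, consider two create requests for the same parent that are both prepared before either has been applied to the data tree. The sketch below assumes that getRecordForPath consults the pending map first and only falls back to committed state when nothing is pending. PendingChanges, Change, and prepCreate are hypothetical names, and the record is cut down to zxid, path, and cversion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingChanges {
    // Simplified stand-in for ChangeRecord.
    static final class Change {
        final long zxid;
        final String path;
        final int cversion;

        Change(long zxid, String path, int cversion) {
            this.zxid = zxid;
            this.path = path;
            this.cversion = cversion;
        }
    }

    private final List<Change> outstanding = new ArrayList<>();
    private final Map<String, Change> outstandingForPath = new HashMap<>();
    // Stands in for the committed data tree; nothing is ever applied in this sketch.
    private final Map<String, Integer> committedCversion = new HashMap<>();

    // Like addChangeRecord: append to the list and overwrite the per-path entry.
    synchronized void add(Change c) {
        outstanding.add(c);
        outstandingForPath.put(c.path, c);
    }

    // Prefer the newest pending change for the path, else the committed value.
    synchronized int currentCversion(String path) {
        Change pending = outstandingForPath.get(path);
        return pending != null ? pending.cversion : committedCversion.getOrDefault(path, 0);
    }

    // Prepare a create under parentPath: returns the cversion the request saw
    // and records the incremented cversion as a pending change.
    synchronized int prepCreate(long zxid, String parentPath) {
        int seen = currentCversion(parentPath);
        add(new Change(zxid, parentPath, seen + 1));
        return seen;
    }

    synchronized int outstandingCount() {
        return outstanding.size();
    }
}
```

    With this arrangement the second request sees cversion 1 rather than the stale committed value 0, so two sequential creates queued back to back would not be given the same suffix.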

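      1.2 Next, let's look at FinalRequestProcessor. This request processor actually applies any transaction associated with a request and services any queries. It sits at the very end of the request-processing chain (it has no next processor), hence its name. How does it process a request?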

        public void processRequest(Request request) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processing request:: " + request);
            }
            // request.addRQRec(">final");
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
            }
            ProcessTxnResult rc = null;
            synchronized (zks.outstandingChanges) {
                // Need to process local session requests
                rc = zks.processTxn(request);
    
                // request.hdr is set for write requests, which are the only ones
                // that add to outstandingChanges.
                if (request.getHdr() != null) {
                    TxnHeader hdr = request.getHdr();
                    Record txn = request.getTxn();
                    long zxid = hdr.getZxid();
                    while (!zks.outstandingChanges.isEmpty()
                           && zks.outstandingChanges.get(0).zxid <= zxid) {
                        ChangeRecord cr = zks.outstandingChanges.remove(0);
                        if (cr.zxid < zxid) {
                            LOG.warn("Zxid outstanding " + cr.zxid
                                     + " is less than current " + zxid);
                        }
                        if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                            zks.outstandingChangesForPath.remove(cr.path);
                        }
                    }
                }
    
                // do not add non quorum packets to the queue.
                if (request.isQuorum()) {
                    zks.getZKDatabase().addCommittedProposal(request);
                }
            }
    
            // ZOOKEEPER-558:
            // In some cases the server does not close the connection (e.g., closeconn buffer
            // was not being queued — ZOOKEEPER-558) properly. This happens, for example,
            // when the client closes the connection. The server should still close the session, though.
            // Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
            if (request.type == OpCode.closeSession && connClosedByClient(request)) {
                // We need to check if we can close the session id.
                // Sometimes the corresponding ServerCnxnFactory could be null because
                // we are just playing diffs from the leader.
                if (closeSession(zks.serverCnxnFactory, request.sessionId) ||
                        closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
                    return;
                }
            }
    
            if (request.cnxn == null) {
                return;
            }
            ServerCnxn cnxn = request.cnxn;
    
            String lastOp = "NA";
            zks.decInProcess();
            Code err = Code.OK;
            Record rsp = null;
            try {
                if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
                    /*
                     * When local session upgrading is disabled, leader will
                     * reject the ephemeral node creation due to session expire.
                     * However, if this is the follower that issue the request,
                     * it will have the correct error code, so we should use that
                     * and report to user
                     */
                    if (request.getException() != null) {
                        throw request.getException();
                    } else {
                        throw KeeperException.create(KeeperException.Code
                                .get(((ErrorTxn) request.getTxn()).getErr()));
                    }
                }
    
                KeeperException ke = request.getException();
                if (ke != null && request.type != OpCode.multi) {
                    throw ke;
                }
    
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}",request);
                }
                switch (request.type) {
                case OpCode.ping: {
                    zks.serverStats().updateLatency(request.createTime);
    
                    lastOp = "PING";
                    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                            request.createTime, Time.currentElapsedTime());
    
                    cnxn.sendResponse(new ReplyHeader(-2,
                            zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
                    return;
                }
                case OpCode.createSession: {
                    zks.serverStats().updateLatency(request.createTime);
    
                    lastOp = "SESS";
                    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                            request.createTime, Time.currentElapsedTime());
    
                    zks.finishSessionInit(request.cnxn, true);
                    return;
                }
                case OpCode.multi: {
                    lastOp = "MULT";
                    rsp = new MultiResponse() ;
    
                    for (ProcessTxnResult subTxnResult : rc.multiResult) {
    
                        OpResult subResult ;
    
                        switch (subTxnResult.type) {
                            case OpCode.check:
                                subResult = new CheckResult();
                                break;
                            case OpCode.create:
                                subResult = new CreateResult(subTxnResult.path);
                                break;
                            case OpCode.create2:
                            case OpCode.createContainer:
                                subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
                                break;
                            case OpCode.delete:
                            case OpCode.deleteContainer:
                                subResult = new DeleteResult();
                                break;
                            case OpCode.setData:
                                subResult = new SetDataResult(subTxnResult.stat);
                                break;
                            case OpCode.error:
                                subResult = new ErrorResult(subTxnResult.err) ;
                                break;
                            default:
                                throw new IOException("Invalid type of op");
                        }
    
                        ((MultiResponse)rsp).add(subResult);
                    }
    
                    break;
                }
                case OpCode.create: {
                    lastOp = "CREA";
                    rsp = new CreateResponse(rc.path);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.create2:
                case OpCode.createContainer: {
                    lastOp = "CREA";
                    rsp = new Create2Response(rc.path, rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.delete:
                case OpCode.deleteContainer: {
                    lastOp = "DELE";
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.setData: {
                    lastOp = "SETD";
                    rsp = new SetDataResponse(rc.stat);
                    err = Code.get(rc.err);
                    break;
                }           
                case OpCode.reconfig: {
                    lastOp = "RECO";               
                    rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.setACL: {
                    lastOp = "SETA";
                    rsp = new SetACLResponse(rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.closeSession: {
                    lastOp = "CLOS";
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.sync: {
                    lastOp = "SYNC";
                    SyncRequest syncRequest = new SyncRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            syncRequest);
                    rsp = new SyncResponse(syncRequest.getPath());
                    break;
                }
                case OpCode.check: {
                    lastOp = "CHEC";
                    rsp = new SetDataResponse(rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.exists: {
                    lastOp = "EXIS";
                    // TODO we need to figure out the security requirement for this!
                    ExistsRequest existsRequest = new ExistsRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            existsRequest);
                    String path = existsRequest.getPath();
                    if (path.indexOf('\0') != -1) {
                        throw new KeeperException.BadArgumentsException();
                    }
                    Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                            .getWatch() ? cnxn : null);
                    rsp = new ExistsResponse(stat);
                    break;
                }
                case OpCode.getData: {
                    lastOp = "GETD";
                    GetDataRequest getDataRequest = new GetDataRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getDataRequest);
                    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    Long aclL;
                    synchronized(n) {
                        aclL = n.acl;
                    }
                    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                            ZooDefs.Perms.READ,
                            request.authInfo);
                    Stat stat = new Stat();
                    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                            getDataRequest.getWatch() ? cnxn : null);
                    rsp = new GetDataResponse(b, stat);
                    break;
                }
                case OpCode.setWatches: {
                    lastOp = "SETW";
                    SetWatches setWatches = new SetWatches();
                    // XXX We really should NOT need this!!!!
                    request.request.rewind();
                    ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                    long relativeZxid = setWatches.getRelativeZxid();
                    zks.getZKDatabase().setWatches(relativeZxid,
                            setWatches.getDataWatches(),
                            setWatches.getExistWatches(),
                            setWatches.getChildWatches(), cnxn);
                    break;
                }
                case OpCode.getACL: {
                    lastOp = "GETA";
                    GetACLRequest getACLRequest = new GetACLRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getACLRequest);
                    Stat stat = new Stat();
                    List<ACL> acl =
                        zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
                    rsp = new GetACLResponse(acl, stat);
                    break;
                }
                case OpCode.getChildren: {
                    lastOp = "GETC";
                    GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getChildrenRequest);
                    DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    Long aclG;
                    synchronized(n) {
                        aclG = n.acl;
    
                    }
                    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
                            ZooDefs.Perms.READ,
                            request.authInfo);
                    List<String> children = zks.getZKDatabase().getChildren(
                            getChildrenRequest.getPath(), null, getChildrenRequest
                                    .getWatch() ? cnxn : null);
                    rsp = new GetChildrenResponse(children);
                    break;
                }
                case OpCode.getChildren2: {
                    lastOp = "GETC";
                    GetChildren2Request getChildren2Request = new GetChildren2Request();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getChildren2Request);
                    Stat stat = new Stat();
                    DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    Long aclG;
                    synchronized(n) {
                        aclG = n.acl;
                    }
                    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
                            ZooDefs.Perms.READ,
                            request.authInfo);
                    List<String> children = zks.getZKDatabase().getChildren(
                            getChildren2Request.getPath(), stat, getChildren2Request
                                    .getWatch() ? cnxn : null);
                    rsp = new GetChildren2Response(children, stat);
                    break;
                }
                case OpCode.checkWatches: {
                    lastOp = "CHKW";
                    CheckWatchesRequest checkWatches = new CheckWatchesRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            checkWatches);
                    WatcherType type = WatcherType.fromInt(checkWatches.getType());
                    boolean containsWatcher = zks.getZKDatabase().containsWatcher(
                            checkWatches.getPath(), type, cnxn);
                    if (!containsWatcher) {
                        String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
                                new Object[] { checkWatches.getPath(), type });
                        throw new KeeperException.NoWatcherException(msg);
                    }
                    break;
                }
                case OpCode.removeWatches: {
                    lastOp = "REMW";
                    RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            removeWatches);
                    WatcherType type = WatcherType.fromInt(removeWatches.getType());
                    boolean removed = zks.getZKDatabase().removeWatch(
                            removeWatches.getPath(), type, cnxn);
                    if (!removed) {
                        String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
                                new Object[] { removeWatches.getPath(), type });
                        throw new KeeperException.NoWatcherException(msg);
                    }
                    break;
                }
                }
            } catch (SessionMovedException e) {
                // session moved is a connection level error, we need to tear
                // down the connection otw ZOOKEEPER-710 might happen
                // ie client on slow follower starts to renew session, fails
                // before this completes, then tries the fast follower (leader)
                // and is successful, however the initial renew is then
                // successfully fwd/processed by the leader and as a result
                // the client and leader disagree on where the client is most
                // recently attached (and therefore invalid SESSION MOVED generated)
                cnxn.sendCloseSession();
                return;
            } catch (KeeperException e) {
                err = e.code();
            } catch (Exception e) {
                // log at error level as we are returning a marshalling
                // error to the user
                LOG.error("Failed to process " + request, e);
                StringBuilder sb = new StringBuilder();
                ByteBuffer bb = request.request;
                bb.rewind();
                while (bb.hasRemaining()) {
                    sb.append(Integer.toHexString(bb.get() & 0xff));
                }
                LOG.error("Dumping request buffer: 0x" + sb.toString());
                err = Code.MARSHALLINGERROR;
            }
    
            long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
            ReplyHeader hdr =
                new ReplyHeader(request.cxid, lastZxid, err.intValue());
    
            zks.serverStats().updateLatency(request.createTime);
            cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
                        request.createTime, Time.currentElapsedTime());
    
            try {
                cnxn.sendResponse(hdr, rsp, "response");
                if (request.type == OpCode.closeSession) {
                    cnxn.sendCloseSession();
                }
            } catch (IOException e) {
                LOG.error("FIXMSG",e);
            }
        }

    Step one: working from the shared outstandingChanges, the server processes the transaction first and the session afterwards:

    private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
                                            Record txn) {
            ProcessTxnResult rc;
            int opCode = request != null ? request.type : hdr.getType();
            long sessionId = request != null ? request.sessionId : hdr.getClientId();
            if (hdr != null) {
                rc = getZKDatabase().processTxn(hdr, txn);
            } else {
                rc = new ProcessTxnResult();
            }
            if (opCode == OpCode.createSession) {
                if (hdr != null && txn instanceof CreateSessionTxn) {
                    CreateSessionTxn cst = (CreateSessionTxn) txn;
                    sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
                } else if (request != null && request.isLocalSession()) {
                    request.request.rewind();
                    int timeout = request.request.getInt();
                    request.request.rewind();
                    sessionTracker.addSession(request.sessionId, timeout);
                } else {
                    LOG.warn("*****>>>>> Got "
                            + txn.getClass() + " "
                            + txn.toString());
                }
            } else if (opCode == OpCode.closeSession) {
                sessionTracker.removeSession(sessionId);
            }
            return rc;
        }
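    The ordering in processTxn can be reduced to a few lines. What follows is only a minimal sketch under stated assumptions: TxnThenSessionSketch, its Op enum, and the two maps are hypothetical stand-ins for ZooKeeperServer, OpCode, the ZKDatabase, and the session tracker, not ZooKeeper's real API. The hasTxn flag plays the role of the `hdr != null` check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for ZooKeeperServer.processTxn.
// None of these names are ZooKeeper's real API.
class TxnThenSessionSketch {
    enum Op { CREATE, CREATE_SESSION, CLOSE_SESSION }

    final Map<String, byte[]> db = new HashMap<>();       // stands in for the ZKDatabase
    final Map<Long, Integer> sessions = new HashMap<>();  // sessionId -> timeout in ms

    /**
     * hasTxn plays the role of "hdr != null": only requests that carry a
     * transaction touch the data store. Session bookkeeping runs afterwards.
     */
    void processTxn(Op op, boolean hasTxn, long sessionId,
                    String path, byte[] data, int timeout) {
        // Step 1: apply the transaction to the data store, if there is one.
        if (hasTxn && op == Op.CREATE) {
            db.put(path, data);
        }
        // Step 2: update the session tracker for session open/close.
        if (op == Op.CREATE_SESSION) {
            sessions.put(sessionId, timeout);
        } else if (op == Op.CLOSE_SESSION) {
            sessions.remove(sessionId);
        }
    }
}
```

    In this sketch, closing a session only drops the tracker entry; whatever else the real server does on close is outside its scope.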

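    Transaction processing takes different branches for the local case and the database case. In the database case, DataTree creates the node: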

                        CreateTxn createTxn = (CreateTxn) txn;
                        rc.path = createTxn.getPath();
                        createNode(
                                createTxn.getPath(),
                                createTxn.getData(),
                                createTxn.getAcl(),
                                createTxn.getEphemeral() ? header.getClientId() : 0,
                                createTxn.getParentCVersion(),
                                header.getZxid(), header.getTime(), null);
                        break;

    The logic for adding a new node is:

     /**
         * Add a new node to the DataTree.
         * @param path
         *               Path for the new node.
         * @param data
         *            Data to store in the node.
         * @param acl
         *            Node acls
         * @param ephemeralOwner
         *            the session id that owns this node. -1 indicates this is not
         *            an ephemeral node.
         * @param zxid
         *            Transaction ID
         * @param time
         * @param outputStat
         *               A Stat object to store Stat output results into.
         * @throws NodeExistsException 
         * @throws NoNodeException 
         * @throws KeeperException
         */
        public void createNode(final String path, byte data[], List<ACL> acl,
                long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)
                throws KeeperException.NoNodeException,
                KeeperException.NodeExistsException {
            int lastSlash = path.lastIndexOf('/');
            String parentName = path.substring(0, lastSlash);
            String childName = path.substring(lastSlash + 1);
            StatPersisted stat = new StatPersisted();
            stat.setCtime(time);
            stat.setMtime(time);
            stat.setCzxid(zxid);
            stat.setMzxid(zxid);
            stat.setPzxid(zxid);
            stat.setVersion(0);
            stat.setAversion(0);
            stat.setEphemeralOwner(ephemeralOwner);
            DataNode parent = nodes.get(parentName);
            if (parent == null) {
                throw new KeeperException.NoNodeException();
            }
            synchronized (parent) {
                Set<String> children = parent.getChildren();
                if (children != null && children.contains(childName)) {
                    throw new KeeperException.NodeExistsException();
                }
    
                if (parentCVersion == -1) {
                    parentCVersion = parent.stat.getCversion();
                    parentCVersion++;
                }
                parent.stat.setCversion(parentCVersion);
                parent.stat.setPzxid(zxid);
                Long longval = convertAcls(acl);
                DataNode child = new DataNode(data, longval, stat);
                parent.addChild(childName);
                nodes.put(path, child);
                if (ephemeralOwner == CONTAINER_EPHEMERAL_OWNER) {
                    containers.add(path);
                } else if (ephemeralOwner != 0) {
                    HashSet<String> list = ephemerals.get(ephemeralOwner);
                    if (list == null) {
                        list = new HashSet<String>();
                        ephemerals.put(ephemeralOwner, list);
                    }
                    synchronized (list) {
                        list.add(path);
                    }
                }
                if (outputStat != null) {
                    child.copyStat(outputStat);
                }
            }
            // now check if its one of the zookeeper node child
            if (parentName.startsWith(quotaZookeeper)) {
                // now check if its the limit node
                if (Quotas.limitNode.equals(childName)) {
                    // this is the limit node
                    // get the parent and add it to the trie
                    pTrie.addPath(parentName.substring(quotaZookeeper.length()));
                }
                if (Quotas.statNode.equals(childName)) {
                    updateQuotaForPath(parentName
                            .substring(quotaZookeeper.length()));
                }
            }
            // also check to update the quotas for this node
            String lastPrefix = getMaxPrefixWithQuota(path);
            if(lastPrefix != null) {
                // ok we have some match and need to update
                updateCount(lastPrefix, 1);
                updateBytes(lastPrefix, data == null ? 0 : data.length);
            }
            dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
            childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                    Event.EventType.NodeChildrenChanged);
        }
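    The bookkeeping in createNode is easier to follow with quotas, ACLs, and watches stripped away. The following is a minimal single-threaded sketch, assuming a hypothetical MiniDataTree class (not part of ZooKeeper). It keeps only three ideas from the code above: splitting the path at the last slash, bumping the parent's child version, and indexing ephemeral nodes by owning session.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of DataTree.createNode; not thread-safe.
class MiniDataTree {
    static class Node {
        final byte[] data;
        final Set<String> children = new HashSet<>();
        int cversion = 0; // incremented each time a child is added
        Node(byte[] data) { this.data = data; }
    }

    private final Map<String, Node> nodes = new HashMap<>();
    private final Map<Long, Set<String>> ephemerals = new HashMap<>();

    MiniDataTree() {
        // The root lives under the empty path, so "/a" resolves its parent to "".
        nodes.put("", new Node(new byte[0]));
    }

    void createNode(String path, byte[] data, long ephemeralOwner) {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        Node parent = nodes.get(parentName);
        if (parent == null) {
            throw new IllegalStateException("no parent node: " + parentName);
        }
        if (!parent.children.add(childName)) {
            throw new IllegalStateException("node exists: " + path);
        }
        parent.cversion++;
        nodes.put(path, new Node(data));
        // 0 means "not ephemeral" here, matching the check in the code above.
        if (ephemeralOwner != 0) {
            ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>()).add(path);
        }
    }

    int childVersion(String path) { return nodes.get(path).cversion; }

    Set<String> ephemeralsOf(long sessionId) {
        return ephemerals.getOrDefault(sessionId, new HashSet<>());
    }
}
```

    The sketch throws IllegalStateException where the real code throws NoNodeException and NodeExistsException, purely to stay self-contained.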

    The last step fires the node-created and children-changed events.

        Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
            WatchedEvent e = new WatchedEvent(type,
                    KeeperState.SyncConnected, path);
            HashSet<Watcher> watchers;
            synchronized (this) {
                watchers = watchTable.remove(path);
                if (watchers == null || watchers.isEmpty()) {
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                "No watchers for " + path);
                    }
                    return null;
                }
                for (Watcher w : watchers) {
                    HashSet<String> paths = watch2Paths.get(w);
                    if (paths != null) {
                        paths.remove(path);
                    }
                }
            }
            for (Watcher w : watchers) {
                if (supress != null && supress.contains(w)) {
                    continue;
                }
                w.process(e);
            }
            return watchers;
        }

    WatchManager invokes the registered watchers to handle the event.
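    Two details of triggerWatch are worth isolating: the registration is removed when the event fires, so each watch is one-shot, and the callbacks run outside the lock. Below is a minimal sketch of that pattern, assuming a hypothetical OneShotWatchTable class in which a plain `Consumer<String>` stands in for ZooKeeper's Watcher.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of one-shot watch delivery; not ZooKeeper's WatchManager.
class OneShotWatchTable {
    private final Map<String, Set<Consumer<String>>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, Consumer<String> watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    /** Removes and notifies every watcher on the path; returns how many fired. */
    int triggerWatch(String path, String eventType) {
        Set<Consumer<String>> watchers;
        synchronized (this) {
            // One-shot semantics: the registration is consumed here.
            watchers = watchTable.remove(path);
        }
        if (watchers == null) {
            return 0;
        }
        // Callbacks run outside the lock so a slow watcher cannot block registration.
        for (Consumer<String> w : watchers) {
            w.accept(eventType + ":" + path);
        }
        return watchers.size();
    }
}
```

    A client that wants to keep observing a path has to register again after each notification, which is why the second trigger on the same path finds nothing to notify.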

      1.3. Next, SyncRequestProcessor, which does its work asynchronously on its own thread

    @Override
        public void run() {
            try {
                int logCount = 0;
    
                // we do this in an attempt to ensure that not all of the servers
                // in the ensemble take a snapshot at the same time
                int randRoll = r.nextInt(snapCount/2);
                while (true) {
                    Request si = null;
                    if (toFlush.isEmpty()) {
                        si = queuedRequests.take();
                    } else {
                        si = queuedRequests.poll();
                        if (si == null) {
                            flush(toFlush);
                            continue;
                        }
                    }
                    if (si == requestOfDeath) {
                        break;
                    }
                    if (si != null) {
                        // track the number of records written to the log
                        if (zks.getZKDatabase().append(si)) {
                            logCount++;
                            if (logCount > (snapCount / 2 + randRoll)) {
                                randRoll = r.nextInt(snapCount/2);
                                // roll the log
                                zks.getZKDatabase().rollLog();
                                // take a snapshot
                                if (snapInProcess != null && snapInProcess.isAlive()) {
                                    LOG.warn("Too busy to snap, skipping");
                                } else {
                                    snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                            public void run() {
                                                try {
                                                    zks.takeSnapshot();
                                                } catch(Exception e) {
                                                    LOG.warn("Unexpected exception", e);
                                                }
                                            }
                                        };
                                    snapInProcess.start();
                                }
                                logCount = 0;
                            }
                        } else if (toFlush.isEmpty()) {
                            // optimization for read heavy workloads
                            // iff this is a read, and there are no pending
                            // flushes (writes), then just pass this to the next
                            // processor
                            if (nextProcessor != null) {
                                nextProcessor.processRequest(si);
                                if (nextProcessor instanceof Flushable) {
                                    ((Flushable)nextProcessor).flush();
                                }
                            }
                            continue;
                        }
                        toFlush.add(si);
                        if (toFlush.size() > 1000) {
                            flush(toFlush);
                        }
                    }
                }
            } catch (Throwable t) {
                handleException(this.getName(), t);
            } finally{
                running = false;
            }
            LOG.info("SyncRequestProcessor exited!");
        }

       The log and snapshots are handled asynchronously; a ZooKeeperThread is started to take the snapshot.
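    The core of the run() loop above is a batching trick: block on the queue only when nothing is pending, otherwise poll, and flush either when the queue runs dry or when the batch grows too large. Here is a minimal sketch of just that control flow, assuming a hypothetical GroupCommitSketch class with string requests. Unlike the original loop, it drains the remaining batch on exit, which is a simplification of mine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the take/poll batching loop; not ZooKeeper code.
class GroupCommitSketch {
    // Sentinel compared by identity, in the spirit of requestOfDeath.
    static final String STOP = new String("STOP");

    final BlockingQueue<String> queued = new LinkedBlockingQueue<>();
    final List<Integer> flushSizes = new ArrayList<>(); // size of each flushed batch
    private final List<String> toFlush = new ArrayList<>();
    private final int maxBatch;

    GroupCommitSketch(int maxBatch) { this.maxBatch = maxBatch; }

    void run() {
        try {
            while (true) {
                // Block only when nothing is pending; otherwise just check for more work.
                String req = toFlush.isEmpty() ? queued.take() : queued.poll();
                if (req == null) {       // queue ran dry while a batch is pending
                    flush();
                    continue;
                }
                if (req == STOP) {
                    break;
                }
                toFlush.add(req);
                if (toFlush.size() > maxBatch) {
                    flush();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flush(); // simplification: drain whatever is left on exit
    }

    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        // A real implementation would force the log to disk here and then
        // hand each request to the next processor.
        flushSizes.add(toFlush.size());
        toFlush.clear();
    }
}
```

    With five requests queued ahead of STOP and maxBatch set to 2, the loop flushes a batch of three and then a batch of two, because the size check uses `>` just as the original compares against 1000.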

        public void takeSnapshot(){
            try {
                txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
            } catch (IOException e) {
                LOG.error("Severe unrecoverable error, exiting", e);
                // This is a severe error that we cannot recover from,
                // so we need to exit
                System.exit(10);
            }
        }
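    When takeSnapshot() gets called is decided back in the run() loop: a snapshot starts once logCount exceeds snapCount / 2 plus a random offset, so that servers in an ensemble do not all snapshot at the same moment. A minimal sketch of that threshold, assuming a hypothetical SnapshotRollSketch class and a seeded Random for repeatability:

```java
import java.util.Random;

// Hypothetical sketch of the randomized snapshot threshold; not ZooKeeper code.
class SnapshotRollSketch {
    private final int snapCount; // must be at least 2, or nextInt(0) would throw
    private final Random r;
    private int randRoll;
    private int logCount = 0;

    SnapshotRollSketch(int snapCount, long seed) {
        this.snapCount = snapCount;
        this.r = new Random(seed);
        this.randRoll = r.nextInt(snapCount / 2);
    }

    /** Call once per appended transaction; returns true when a snapshot should start. */
    boolean onAppend() {
        logCount++;
        if (logCount > snapCount / 2 + randRoll) {
            randRoll = r.nextInt(snapCount / 2); // pick a fresh offset for the next round
            logCount = 0;
            return true;
        }
        return false;
    }
}
```

    With snapCount set to 100, the offset falls in 0..49, so a snapshot is triggered somewhere between the 51st and the 100th appended transaction.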

    FileTxnSnapLog is a helper class for working with the txnlog and snapshots.

     /**
         * save the datatree and the sessions into a snapshot
         * @param dataTree the datatree to be serialized onto disk
         * @param sessionsWithTimeouts the session timeouts to be
         * serialized onto disk
         * @throws IOException
         */
        public void save(DataTree dataTree,
                ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
            throws IOException {
            long lastZxid = dataTree.lastProcessedZxid;
            File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
            LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
                    snapshotFile);
            snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
    
        }

    The snapshot is then persisted to a file:

        /**
         * serialize the datatree and session into the file snapshot
         * @param dt the datatree to be serialized
         * @param sessions the sessions to be serialized
         * @param snapShot the file to store snapshot into
         */
        public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
                throws IOException {
            if (!close) {
                OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
                CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
                //CheckedOutputStream cout = new CheckedOutputStream()
                OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                serialize(dt,sessions,oa, header);
                long val = crcOut.getChecksum().getValue();
                oa.writeLong(val, "val");
                oa.writeString("/", "path");
                sessOS.flush();
                crcOut.close();
                sessOS.close();
            }
        }
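    The serialize method wraps the file stream in a CheckedOutputStream so that an Adler32 checksum of everything written can be appended at the end. Here is a minimal round-trip sketch of that technique using only java.util.zip and java.io. The ChecksummedSnapshotSketch class and its length-prefixed payload layout are assumptions for illustration, not ZooKeeper's snapshot format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

// Hypothetical layout: [int length][payload][long adler32]. Not ZooKeeper's format.
class ChecksummedSnapshotSketch {
    static byte[] write(byte[] payload) throws IOException {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        CheckedOutputStream crcOut = new CheckedOutputStream(raw, new Adler32());
        DataOutputStream out = new DataOutputStream(crcOut);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
        // The checksum covers everything written so far: length plus payload.
        long checksum = crcOut.getChecksum().getValue();
        out.writeLong(checksum);
        out.flush();
        return raw.toByteArray();
    }

    static byte[] read(byte[] file) throws IOException {
        CheckedInputStream crcIn =
                new CheckedInputStream(new ByteArrayInputStream(file), new Adler32());
        DataInputStream in = new DataInputStream(crcIn);
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        // Capture the running checksum before the trailer itself is read.
        long computed = crcIn.getChecksum().getValue();
        long stored = in.readLong();
        if (computed != stored) {
            throw new IOException("checksum mismatch");
        }
        return payload;
    }
}
```

    Reading the checksum back before consuming the trailer matters: once the trailing long passes through the CheckedInputStream, the running value no longer matches what the writer stored.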

    At this point, the whole flow is complete.

      2. Cluster mode

     Cluster mode differs slightly from standalone mode: in a cluster, QuorumPeer starts the ServerCnxnFactory, which binds to the local address:

        @Override
        public void start() {
            LOG.info("binding to port " + localAddress);
            parentChannel = bootstrap.bind(localAddress);
        }

    For reasons of space, the rest of the logic is covered in detail in the next article.

     

    Summary

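      From the code flow above, we can see that the server handles requests through either NIO or the Netty framework. A request is first received by PrepRequestProcessor, which wraps it and sets up the shared data according to the request type. SyncRequestProcessor then writes the transaction log and takes snapshots, and a write is not handed to the next processor until its log entry has been flushed. Finally, FinalRequestProcessor, the last processor in the chain, applies the change to the db according to the command type, sends the response, and triggers the watcher handlers.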
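    The three-stage chain can be pictured with a small sketch. It assumes a hypothetical ProcessorChainSketch class and collapses everything into synchronous calls, whereas the real PrepRequestProcessor and SyncRequestProcessor each run on their own thread with a queue in between. Like setupRequestProcessors, it wires the chain from the last stage backwards.

```java
import java.util.List;

// Hypothetical, synchronous model of the Prep -> Sync -> Final chain.
class ProcessorChainSketch {
    interface Processor {
        void process(String request, List<String> trace);
    }

    /** Builds a stage that records its name and then forwards to the next stage, if any. */
    static Processor stage(String name, Processor next) {
        return (request, trace) -> {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request, trace);
            }
        };
    }

    /** Wires the chain back to front, so each stage can be given its successor. */
    static Processor build() {
        Processor finalStage = stage("final", null);
        Processor syncStage = stage("sync", finalStage);
        return stage("prep", syncStage);
    }
}
```

    Building from the end is what lets each constructor receive its successor, which is the same reason the real code creates FinalRequestProcessor first.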

  • Original article: https://www.cnblogs.com/davidwang456/p/5001244.html