zoukankan      html  css  js  c++  java
  • zookeeper server处理客户端命令的流程

    zk server处理命令涉及到3个类,2个线程:一个命令请求先后经过PrepRequestProcessor,SyncRequestProcessor,FinalRequestProcessor。

    PrepRequestProcessor类对应线程ProcessThread,SyncRequestProcessor类对应线程SyncThread。

    在命令到达PrepRequestProcessor之前,还有一段路程:

    //ZooKeeperServer
    /**
     * Decodes one incoming client packet and routes it according to its request type.
     *
     * <p>A {@code RequestHeader} is deserialized from the front of {@code incomingBuffer};
     * the remaining bytes are kept as the request payload. Three paths follow:
     * <ul>
     *   <li>{@code OpCode.auth}: handled inline (body elided in this excerpt), then returns.</li>
     *   <li>{@code OpCode.sasl}: handled by {@code processSasl} and answered immediately
     *       with an OK reply header.</li>
     *   <li>anything else: wrapped in a {@code Request} and handed to
     *       {@link #submitRequest(Request)}.</li>
     * </ul>
     * Both non-auth paths end by calling {@code cnxn.incrOutstandingRequests(h)}.
     *
     * @param cnxn the client connection the packet arrived on
     * @param incomingBuffer the raw packet bytes, header first
     * @throws IOException declared by the signature; presumably raised while decoding the
     *         header or sending the sasl response (not visible in this excerpt)
     */
    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");
        // slice() returns a view that begins at the buffer's current position.
        // Presumably reading the header above advanced that position, so the
        // sliced buffer holds only the request body (header excluded) -- confirm
        // against ByteBufferInputStream.
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) {
            ...
            return;
        } else {
            if (h.getType() == OpCode.sasl) {
                Record rsp = processSasl(incomingBuffer,cnxn);
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
                // NOTE(review): no return here, so the sasl path also reaches
                // incrOutstandingRequests(h) below.
            }
            else { // ordinary requests take this branch by default
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                  h.getType(), incomingBuffer, cnxn.getAuthInfo());
                si.setOwner(ServerCnxn.me);
                submitRequest(si);
            }
        }
        cnxn.incrOutstandingRequests(h);
    }
    
    /**
     * Hands a request to the head of the processor chain.
     *
     * <p>After touching the request's connection, the request type is validated with
     * {@code Request.isValid}. Valid requests go to {@code firstProcessor}; when the request
     * carries a connection, {@code incInProcess()} is called afterwards. Requests of an
     * unknown type are logged and passed to a fresh {@code UnimplementedRequestProcessor}.
     *
     * @param si the request to submit
     */
    public void submitRequest(Request si) {
        // other code omitted from this excerpt
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    }

    请求进入firstProcessor(即PrepRequestProcessor)后:

    //PrepRequestProcessor类片段
    /**
     * First stage of the request pipeline shown in this excerpt.
     *
     * <p>Callers enqueue requests through {@link #processRequest(Request)}; this class's own
     * thread drains the queue in {@link #run()}, prepares each request in
     * {@link #pRequest(Request)}, and forwards it to {@code nextProcessor}.
     */
    public class PrepRequestProcessor extends Thread implements RequestProcessor {
        // Hand-off queue between the submitting caller and this processor's thread.
        LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
        // Next stage in the chain; see the note in pRequest().
        RequestProcessor nextProcessor;
    
        /**
         * Enqueues the request for asynchronous handling by {@link #run()}.
         *
         * @param request the request to enqueue
         */
        public void processRequest(Request request) {
           // this method only adds the request to the blocking queue
           submittedRequests.add(request);
        }
    
        /**
         * Consumer loop: blocks on the queue, optionally traces each request, and passes it
         * to {@link #pRequest(Request)} until the {@code Request.requestOfDeath} sentinel is
         * dequeued or an exception ends the loop.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    // take the next request off the queue (blocks while it is empty)
                    Request request = submittedRequests.take();
                    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                    if (request.type == OpCode.ping) {
                        traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                    }
                    // identity comparison against the shutdown sentinel
                    if (Request.requestOfDeath == request) {
                        break;
                    }
                    // process the request: per the article, write requests get a hdr set,
                    // while read requests keep hdr == null
                    pRequest(request);
                }
            } catch (InterruptedException e) {
                // NOTE(review): the thread's interrupt status is not restored here.
                LOG.error("Unexpected interruption", e);
            } catch (RequestProcessorException e) {
                if (e.getCause() instanceof XidRolloverException) {
                    LOG.info(e.getCause().getMessage());
                }
                LOG.error("Unexpected exception", e);
            } catch (Exception e) {
                LOG.error("Unexpected exception", e);
            }
            LOG.info("PrepRequestProcessor exited loop!");
        }

        /**
         * Prepares a single request and forwards it to {@code nextProcessor}.
         *
         * <p>In the visible portion, {@code hdr} and {@code txn} are reset to {@code null},
         * the request's {@code zxid} is taken from {@code zks.getZxid()}, and the request is
         * passed on. The per-operation logic between those steps is elided in this excerpt.
         *
         * @param request the request to prepare
         * @throws RequestProcessorException declared by the signature; the throwing code is
         *         not visible in this excerpt
         */
        protected void pRequest(Request request) throws RequestProcessorException {
            // the real method is long; only the key logic is shown here
            request.hdr = null;
            request.txn = null;
            ……
            request.zxid = zks.getZxid();
            // per the article, nextProcessor here is the SyncRequestProcessor, so at this
            // point the request lands in SyncRequestProcessor's queue
            nextProcessor.processRequest(request);
        }
    }

    进入SyncRequestProcessor后:

    //SyncRequestProcessor代码片段
    /**
     * Second stage of the request pipeline shown in this excerpt.
     *
     * <p>Requests are enqueued through {@link #processRequest(Request)} and consumed by this
     * class's own thread in {@link #run()}, which appends them to the database log, batches
     * them in {@code toFlush}, and periodically rolls the log and starts a snapshot.
     * Several members used below ({@code toFlush}, {@code r}, {@code snapCount},
     * {@code randRoll}, {@code snapInProcess}, {@code zks}, {@code running},
     * {@code requestOfDeath}, {@code flush}, {@code setRandRoll}) are declared outside
     * this excerpt.
     */
    public class SyncRequestProcessor extends Thread implements RequestProcessor {
        // Hand-off queue between the previous stage and this processor's thread.
        private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
        // Next stage in the chain; may be null (checked before use in run()).
        private final RequestProcessor nextProcessor;
        
        /**
         * Enqueues the request for asynchronous handling by {@link #run()}.
         *
         * @param request the request to enqueue
         */
        public void processRequest(Request request) {
            // request.addRQRec(">sync");
            queuedRequests.add(request);
        }
        
        // The article author's rough reading of the loop below:
        //  - requests waiting in queuedRequests are handled first
        //  - a write request is appended to the log first, then added to the toFlush list
        //  - when queuedRequests has no more data, the toFlush list is flushed
        //  - the toFlush list is also flushed once toFlush.size() > 1000
        /**
         * Consumer loop. Exits normally when {@code requestOfDeath} is dequeued; any
         * {@code Throwable} is logged and terminates the JVM via {@code System.exit(11)}.
         */
        @Override
        public void run() {
            try {
                int logCount = 0;
    
                // we do this in an attempt to ensure that not all of the servers
                // in the ensemble take a snapshot at the same time
                setRandRoll(r.nextInt(snapCount/2));
                while (true) {
                    Request si = null;
                    if (toFlush.isEmpty()) {
                        // nothing pending: block until a request arrives
                        si = queuedRequests.take();
                    } else {
                        // something is pending: do not block; if the queue is
                        // empty, flush the pending batch instead
                        si = queuedRequests.poll();
                        if (si == null) {
                            flush(toFlush);
                            continue;
                        }
                    }
                    if (si == requestOfDeath) {
                        break;
                    }
                    if (si != null) {
                        // track the number of records written to the log
                        // per the article: zks.getZKDatabase().append(si) returns true for
                        // a write request and false for a read request
                        // (write requests carry a hdr; read requests have hdr == null)
                        if (zks.getZKDatabase().append(si)) {
                            logCount++;
                            if (logCount > (snapCount / 2 + randRoll)) {
                                // pick a fresh random offset for the next roll threshold
                                randRoll = r.nextInt(snapCount/2);
                                // roll the log
                                zks.getZKDatabase().rollLog();
                                // take a snapshot
                                if (snapInProcess != null && snapInProcess.isAlive()) {
                                    LOG.warn("Too busy to snap, skipping");
                                } else {
                                    // snapshot runs on its own thread so this loop is not blocked
                                    snapInProcess = new Thread("Snapshot Thread") {
                                            public void run() {
                                                try {
                                                    zks.takeSnapshot();
                                                } catch(Exception e) {
                                                    LOG.warn("Unexpected exception", e);
                                                }
                                            }
                                        };
                                    snapInProcess.start();
                                }
                                logCount = 0;
                            }
                        } else if (toFlush.isEmpty()) {
                            // per the article: read requests enter this branch when the
                            // toFlush list is empty
                            // optimization for read heavy workloads
                            // iff this is a read, and there are no pending
                            // flushes (writes), then just pass this to the next
                            // processor
                            if (nextProcessor != null) {
                                nextProcessor.processRequest(si);
                                if (nextProcessor instanceof Flushable) {
                                    ((Flushable)nextProcessor).flush();
                                }
                            }
                            continue;
                        }
                        // reached for every appended request, and for non-appended
                        // requests that arrive while toFlush is non-empty
                        toFlush.add(si);
                        if (toFlush.size() > 1000) {
                            flush(toFlush);
                        }
                    }
                }
            } catch (Throwable t) {
                LOG.error("Severe unrecoverable error, exiting", t);
                running = false;
                System.exit(11);
            }
            LOG.info("SyncRequestProcessor exited!");
        }
    }
  • 相关阅读:
    mysql 查询优化 ~ select count 知多少
    mongodb 案例 ~ 经典故障案例
    printk 驱动调试
    21天学通C++学习笔记(七):函数
    OPC UA
    MQTT
    分库分表
    水平、垂直权限问题(横向越权与纵向越权)
    数据库中的行转列和列转行
    面试知识点
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8042809.html
Copyright © 2011-2022 走看看