zoukankan      html  css  js  c++  java
  • zookeeper之服务端接收请求处理流程

    服务端有一个 NIOServerCnxn 类,用来处理客户端发送过来的请求
    NIOServerCnxn
    ZookeeperServer-zks.processPacket(this, bb);
    处理客户端传送过来的数据包
     
    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    		// We have the request, now process and setup for next
    		InputStream bais = new ByteBufferInputStream(incomingBuffer);
    		BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    		RequestHeader h = new RequestHeader();
    		h.deserialize(bia, "header"); // 反序列化客户端 header 头信息
    		// Through the magic of byte buffers, txn will not be
    		// pointing
    		// to the start of the txn
    		incomingBuffer = incomingBuffer.slice();
    		if (h.getType() == OpCode.auth) { // 判断当前操作类型,如果是 auth操作,则执行下面的代码
    			LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
    			AuthPacket authPacket = new AuthPacket();
    			ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
    			String scheme = authPacket.getScheme();
    			ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
    			Code authReturn = KeeperException.Code.AUTHFAILED;
    			if (ap != null) {
    				try {
    					authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn),
    							authPacket.getAuth());
    				} catch (RuntimeException e) {
    					LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
    					authReturn = KeeperException.Code.AUTHFAILED;
    				}
    			}
    			if (authReturn == KeeperException.Code.OK) {
    				if (LOG.isDebugEnabled()) {
    					LOG.debug("Authentication succeeded for scheme: " + scheme);
    				}
    				LOG.info("auth success " + cnxn.getRemoteSocketAddress());
    				ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
    				cnxn.sendResponse(rh, null, null);
    			} else {
    				if (ap == null) {
    					LOG.warn("No authentication provider for scheme: " + scheme + " has "
    							+ ProviderRegistry.listProviders());
    				} else {
    					LOG.warn("Authentication failed for scheme: " + scheme);
    				}
    				// send a response...
    				ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
    				cnxn.sendResponse(rh, null, null);
    				// ... and close connection
    				cnxn.sendBuffer(ServerCnxnFactory.closeConn);
    				cnxn.disableRecv();
    			}
    			return;
    		} else { // 如果不是授权操作,再判断是否为 sasl 操作
    			if (h.getType() == OpCode.sasl) {
    				Record rsp = processSasl(incomingBuffer, cnxn);
    				ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
    				cnxn.sendResponse(rh, rsp, "response"); // not sure about 3rd
    														// arg..what is it?
    				return;
    			} else {// 最终进入这个代码块进行处理
    				// 封装请求对象
    				Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer,
    						cnxn.getAuthInfo());
    				si.setOwner(ServerCnxn.me);
    				// Always treat packet from the client as a possible
    				// local request.
    				setLocalSessionFlag(si);
    				submitRequest(si); // 提交请求
    			}
    		}
    		cnxn.incrOutstandingRequests(h);
    	}
    
    submitRequest
    负责在服务端提交当前请求
    public void submitRequest(Request si) {
    		if (firstProcessor == null) { // processor 处理器,request
    										// 过来以后会经历一系列处理器的处理过程
    			synchronized (this) {
    				try {
    					// Since all requests are passed to the request
    					// processor it should wait for setting up the request
    					// processor chain. The state will be updated to RUNNING
    					// after the setup.
    					while (state == State.INITIAL) {
    						wait(1000);
    					}
    				} catch (InterruptedException e) {
    					LOG.warn("Unexpected interruption", e);
    				}
    				if (firstProcessor == null || state != State.RUNNING) {
    					throw new RuntimeException("Not started");
    				}
    			}
    		}
    		try {
    			touch(si.cnxn);
    			boolean validpacket = Request.isValid(si.type); // 判断是否合法
    			if (validpacket) {
    				firstProcessor.processRequest(si); // 调用 firstProcessor发起请求,而这个
    													// firstProcess
    													// 是一个接口,有多个实现类,具体的调用链是怎么样的?往下看吧
    				if (si.cnxn != null) {
    					incInProcess();
    				}
    			} else {
    				LOG.warn("Received packet at server of unknown type " + si.type);
    				new UnimplementedRequestProcessor().processRequest(si);
    			}
    		} catch (MissingSessionException e) {
    			if (LOG.isDebugEnabled()) {
    				LOG.debug("Dropping request: " + e.getMessage());
    			}
    		} catch (RequestProcessorException e) {
    			LOG.error("Unable to process request:" + e.getMessage(), e);
    		}
    	}
    
    firstProcessor 的请求链组成
      1. firstProcessor 的初始化是在 ZookeeperServer 的 setupRequestProcessor 中完成的,代码如下:
    protected void setupRequestProcessors() {
    	RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    	RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    	((SyncRequestProcessor) syncProcessor).start();
    	firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    	// 需要注意的是,PrepRequestProcessor 中传递的是一个 syncProcessor
    	((PrepRequestProcessor) firstProcessor).start();
    } 
    从上面我们可以看到 firstProcessor 的实例是一个 PrepRequestProcessor,而这个
    构造方法中又传递了一个 Processor 构成了一个调用链。
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    
    而 syncProcessor 的构造方法传递的又是一个 Processor,对应的是FinalRequestProcessor
      1. 所以整个调用链是 PrepRequestProcessor -> SyncRequestProcessor ->FinalRequestProcessor
     
    PredRequestProcessor.processRequest(si);
    通过上面了解到调用链关系以后,我们继续再看 firstProcessor.processRequest(si);会调用到 PrepRequestProcessor
    public void processRequest(Request request) {
         submittedRequests.add(request);
    }
    
    唉,很奇怪,processRequest 只是把 request 添加到 submittedRequests 中,根据前面的经验,很自然的想到这里又是一个异步操作。而 subittedRequests 又是一个阻塞队列
    LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue();
    
    而 PrepRequestProcessor 这个类又继承了线程类,因此我们直接找到当前类中的run 方法如下
    public void run() {
    		try {
    			while (true) {
    				Request request = submittedRequests.take(); // ok,从队列中拿到请求进行处理
    				long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
    				if (request.type == OpCode.ping) {
    					traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
    				}
    				if (LOG.isTraceEnabled()) {
    					ZooTrace.logRequest(LOG, traceMask, 'P', request, " ");
    				}
    				if (Request.requestOfDeath == request) {
    					break;
    				}
    				pRequest(request); // 调用 pRequest 进行预处理
    			}
    		} catch (RequestProcessorException e) {
    			if (e.getCause() instanceof XidRolloverException) {
    				LOG.info(e.getCause().getMessage());
    			}
    			handleException(this.getName(), e);
    		} catch (Exception e) {
    			handleException(this.getName(), e);
    		}
    		LOG.info("PrepRequestProcessor exited loop!");
    	}
    
     
    pRequest
    预处理这块的代码太长,就不好贴了。前面的 N 行代码都是根据当前的 OP 类型进行判断和做相应的处理,在这个方法中的最后一行中,我们会看到如下代码
    nextProcessor.processRequest(request);
    SyncRequestProcessor. processRequest
    public void processRequest(Request request) {
     // request.addRQRec(">sync");
         queuedRequests.add(request);
    }
    
    这个方法的代码也是一样,基于异步化的操作,把请求添加到 queuedRequets 中,那么我们继续在当前类找到 run 方法
    public void run() {
    		try {
    			int logCount = 0;
    			// we do this in an attempt to ensure that not all of the servers
    			// in the ensemble take a snapshot at the same time
    			int randRoll = r.nextInt(snapCount / 2);
    			while (true) {
    				Request si = null;
    				// 从阻塞队列中获取请求
    				if (toFlush.isEmpty()) {
    					si = queuedRequests.take();
    				} else {
    					si = queuedRequests.poll();
    					if (si == null) {
    						flush(toFlush);
    						continue;
    					}
    				}
    				if (si == requestOfDeath) {
    					break;
    				}
    				if (si != null) {
    					// track the number of records written to the log
    					// 下面这块代码,粗略看来是触发快照操作,启动一个处理快照的线程
    					if (zks.getZKDatabase().append(si)) {
    						logCount++;
    						if (logCount > (snapCount / 2 + randRoll)) {
    							randRoll = r.nextInt(snapCount / 2);
    							// roll the log
    							zks.getZKDatabase().rollLog();
    							// take a snapshot
    							if (snapInProcess != null && snapInProcess.isAlive()) {
    								LOG.warn("Too busy to snap, skipping");
    							} else {
    								snapInProcess = new ZooKeeperThread("Snapshot Thread") {
    									public void run() {
    										try {
    											zks.takeSnapshot();
    										} catch (Exception e) {
    											LOG.warn("Unexpected exception", e);
    										}
    									}
    								};
    								snapInProcess.start();
    							}
    							logCount = 0;
    						}
    					} else if (toFlush.isEmpty()) {
    						// optimization for read heavy workloads
    						// iff this is a read, and there are no pending
    						// flushes (writes), then just pass this to the next
    						// processor
    						if (nextProcessor != null) {
    							nextProcessor.processRequest(si); // 继续调用下一个处理器来处理请求
    							if (nextProcessor instanceof Flushable) {
    								((Flushable) nextProcessor).flush();
    							}
    						}
    						continue;
    					}
    					toFlush.add(si);
    					if (toFlush.size() > 1000) {
    						flush(toFlush);
    					}
    				}
    			}
    		} catch (Throwable t) {
    			handleException(this.getName(), t);
    		} finally {
    			running = false;
    		}
    		LOG.info("SyncRequestProcessor exited!");
    	}
    
    FinalRequestProcessor. processRequest
    这个方法就是我们在课堂上分析到的方法了,FinalRequestProcessor.processRequest 方法并根据 Request 对象中的操作更新内存中 Session 信息或者 znode 数据。这块代码有小 300 多行,就不全部贴出来了,我们直接定位到关键代码,根据客户端的 OP 类型找到如下的代码:
    case OpCode.exists:
    	{
    	     lastOp = "EXIS";
    	     // TODO we need to figure out the security requirement for this!
    		 ExistsRequest existsRequest = new ExistsRequest();
    		 //反序列化 (将 ByteBuffer 反序列化成为 ExitsRequest.这个就是我们在客户端发起请求的时候传递过来的 Request 对象
    		 ByteBufferInputStream.byteBuffer2Record(request.request,existsRequest);
    		 String path = existsRequest.getPath(); //得到请求的路径
    		 if (path.indexOf('') != -1) {
    			 throw new KeeperException.BadArgumentsException();
    		 }
    	      //终于找到一个很关键的代码,判断请求的 getWatch 是否存在,如果存在,则传递 cnxn(servercnxn)
    	      //对于 exists 请求,需要监听 data 变化事件,添加 watcher 
    	 	 Stat stat = zks.getZKDatabase().statNode(path, existsRe
    		 quest.getWatch() ? cnxn : null);
    	 	 rsp = new ExistsResponse(stat); //在服务端内存数据库中根据路径得到结果进行组装,设置为 ExistsResponse
    	 	 break;
    	 }
    

     

  • 相关阅读:
    JS 可选链操作符?. 空值合并运算符?? 详解,更精简的安全取值与默认值设置小技巧
    手写一个 Promise
    Leetcode 403 青蛙过河 DP
    Leeetcode 221 最大正方形 DP
    Leetcode 139 单词拆分
    Unity周记: 2021.07.26-08.15
    Unity周记: 2021.07.19-07.25
    Unity周记: 2020.07.12-07.18
    Unity周记: 2020.07.05-07.11
    线性规划
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13568094.html
Copyright © 2011-2022 走看看