zoukankan      html  css  js  c++  java
  • ZooKeeper源码分析(二)

      上一节分析了ZooKeeper的部分代码,下面我们看看客户端网络连接器的部分代码

    /**
       这个类管理客户端的socket I/O。ClientCnxn维护一个可用服务器列表可以根据需要透明地切换服务器
     *
     */
    public class ClientCnxn {
        private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);
    
        private static final String ZK_SASL_CLIENT_USERNAME =
            "zookeeper.sasl.client.username";
    
        /** 客户端在会话重连接时自动复位监视器,这个操作允许客户端通过设置环境变量zookeeper.disableAutoWatchReset=true来关闭这个行为
    	 */
        private static boolean disableAutoWatchReset;
        static {
            disableAutoWatchReset =
                Boolean.getBoolean("zookeeper.disableAutoWatchReset");
            if (LOG.isDebugEnabled()) {
                LOG.debug("zookeeper.disableAutoWatchReset is "
                        + disableAutoWatchReset);
            }
        }
    
        static class AuthData {
            AuthData(String scheme, byte data[]) {
                this.scheme = scheme;
                this.data = data;
            }
    
            String scheme;
    
            byte data[];
        }
    	
        private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
    
        /**
    	 *哪些已经发送出去的目前正在等待响应的包
         */
        private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
    
        /**
         * 那些需要发送的包
         */
        private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
    
    	// 超时时间
        private int connectTimeout;
    
        /**
    	 *客户端与服务器协商的超时时间,以毫秒为单位。这是真正的超时时间,而不是客户端的超时请求
         */
        private volatile int negotiatedSessionTimeout;
    	
    	// 读取超时时间
        private int readTimeout;
    
    	// 会话超时时间
        private final int sessionTimeout;
    
    	// ZooKeeper
        private final ZooKeeper zooKeeper;
    
    	//客户端监视器管理器
        private final ClientWatchManager watcher;
    
    	//会话ID
        private long sessionId;
    
    	//会话密钥
        private byte sessionPasswd[] = new byte[16];
    
        // 是否只读
        private boolean readOnly;
    
        final String chrootPath;
       // 发送线程
        final SendThread sendThread;
    	// 事件回调线程
        final EventThread eventThread;
    
        /**
         * Set to true when close is called. Latches the connection such that we
         * don't attempt to re-connect to the server if in the middle of closing the
         * connection (client sends session disconnect to server as part of close
         * operation)
         */
        private volatile boolean closing = false;
        
        /**
    	  一组客户端可以连接的Zk主机
         */
        private final HostProvider hostProvider;
    
        /**
    	 * 第一次和读写服务器建立连接时设置为true,之后不再改变。
    	   这个值用来处理客户端没有sessionId连接只读模式服务器的场景.
    	   客户端从只读服务器收到一个假的sessionId,这个sessionId对于其他服务器是无效的。所以
    	   当客户端寻找一个读写服务器时,它在连接握手时发送0代替假的sessionId,建立一个新的,有效的会话
    	   如果这个属性是false(这就意味着之前没有找到过读写服务器)则表示非0的sessionId是假的否则就是有效的 
         */
        volatile boolean seenRwServerBefore = false;
    
    
        public ZooKeeperSaslClient zooKeeperSaslClient;
    
        public long getSessionId() {
            return sessionId;
        }
    
        public byte[] getSessionPasswd() {
            return sessionPasswd;
        }
    
        public int getSessionTimeout() {
            return negotiatedSessionTimeout;
        }
    
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
    
            SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
            SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
            sb
                .append("sessionid:0x").append(Long.toHexString(getSessionId()))
                .append(" local:").append(local)
                .append(" remoteserver:").append(remote)
                .append(" lastZxid:").append(lastZxid)
                .append(" xid:").append(xid)
                .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
                .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
                .append(" queuedpkts:").append(outgoingQueue.size())
                .append(" pendingresp:").append(pendingQueue.size())
                .append(" queuedevents:").append(eventThread.waitingEvents.size());
    
            return sb.toString();
        }
    
       
    
        /**
         * 创建一个连接对象。真正的网路连接直到需要的时候才建立。start()方法在执行构造方法后一定要调用
         * 这个构造方法在ZooKeeper的初始化时调用,用于初始化一个客户端网路管理器
         */
        public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
                ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
                throws IOException {
            this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
                 clientCnxnSocket, 0, new byte[16], canBeReadOnly);
        }
    
        
        public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
                ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
                long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
            //客户端实例
            this.zooKeeper = zooKeeper;
            // 客户端Watcher管理器
            this.watcher = watcher;
            //sessionId
            this.sessionId = sessionId;
            //会话密钥
            this.sessionPasswd = sessionPasswd;
            //会话超时时间
            this.sessionTimeout = sessionTimeout;
            //服务器地址列表管理器
            this.hostProvider = hostProvider;
             //根路径
            this.chrootPath = chrootPath;
           // 连接超时时间是会话超时时间和服务器数量的比值
            connectTimeout = sessionTimeout / hostProvider.size();
           // 读超时时间是会话超时时间的2/3
            readTimeout = sessionTimeout * 2 / 3;
    		
            readOnly = canBeReadOnly;
            //创建发送和事件处理线程
            sendThread = new SendThread(clientCnxnSocket);
            eventThread = new EventThread();
    
        }
    
       
        public static boolean getDisableAutoResetWatch() {
            return disableAutoWatchReset;
        }
       
        public static void setDisableAutoResetWatch(boolean b) {
            disableAutoWatchReset = b;
        }
    	//启动发送和事件处理线程
        public void start() {
            sendThread.start();
            eventThread.start();
        }
    
    // 事件处理线程
    class EventThread extends Thread {
    	    //等待处理的事件
            private final LinkedBlockingQueue<Object> waitingEvents =
                new LinkedBlockingQueue<Object>();
    
    		 /**
    		  *这个是真正的排队会话的状态,知道事件处理线程真正处理事件并将其返回给监视器。
    		  **/
            private volatile KeeperState sessionState = KeeperState.Disconnected;
    
           private volatile boolean wasKilled = false;
           private volatile boolean isRunning = false;
    
            EventThread() {
    		   // 构造一个线程名
                super(makeThreadName("-EventThread"));
                setUncaughtExceptionHandler(uncaughtExceptionHandler);
    			// 设置为守护线程
                setDaemon(true);
            }
            // 
            public void queueEvent(WatchedEvent event) {
    			// 如果WatchedEvent的类型是None状态是sessionStat的值则不处理
                if (event.getType() == EventType.None
                        && sessionState == event.getState()) {
                    return;
                }
    			// 获取事件的状态
                sessionState = event.getState();
    
                // 构建一个基于事件的监视器
                WatcherSetEventPair pair = new WatcherSetEventPair(
                        watcher.materialize(event.getState(), event.getType(),
                                event.getPath()),
                                event);
                // 排队pair,稍后处理
                waitingEvents.add(pair);
            }
    		
    		// 排队Packet
           public void queuePacket(Packet packet) {
              if (wasKilled) {
                 synchronized (waitingEvents) {
                    if (isRunning) waitingEvents.add(packet);
                    else processEvent(packet);
                 }
              } else {
                 waitingEvents.add(packet);
              }
           }
    
            public void queueEventOfDeath() {
                waitingEvents.add(eventOfDeath);
            }
    
            @Override
            public void run() {
               try {
                  isRunning = true;
                  while (true) {
    			     //从等待处理的事件队列中获取事件
                     Object event = waitingEvents.take();
    				 
                     if (event == eventOfDeath) {
                        wasKilled = true;
                     } else {
                        processEvent(event);
                     }
                     if (wasKilled)
                        synchronized (waitingEvents) {
                           if (waitingEvents.isEmpty()) {
                              isRunning = false;
                              break;
                           }
                        }
                  }
               } catch (InterruptedException e) {
                  LOG.error("Event thread exiting due to interruption", e);
               }
    
                LOG.info("EventThread shut down");
            }
    
    		// 真正处理事件的入口,主要是回调处理
           private void processEvent(Object event) {
              try {
    			// 如果事件是WatcherSetEventPair
                  if (event instanceof WatcherSetEventPair) {
                      //每个监视器都会处理这个事件
                      WatcherSetEventPair pair = (WatcherSetEventPair) event;
                      for (Watcher watcher : pair.watchers) {
                          try {
                              watcher.process(pair.event);
                          } catch (Throwable t) {
                              LOG.error("Error while calling watcher ", t);
                          }
                      }
                  } else {
                      Packet p = (Packet) event;
                      int rc = 0;
    				  // 获取客户端路径
                      String clientPath = p.clientPath;
                      if (p.replyHeader.getErr() != 0) {
                          rc = p.replyHeader.getErr();
                      }
                      if (p.cb == null) {
                          LOG.warn("Somehow a null cb got to EventThread!");
                      } else if (p.response instanceof ExistsResponse
                              || p.response instanceof SetDataResponse
                              || p.response instanceof SetACLResponse) {
    						 // 获取回调对象
                          StatCallback cb = (StatCallback) p.cb;
    					  // 如果处理成功回调方法会传入响应状态,否则响应状态为null
                          if (rc == 0) {
                              if (p.response instanceof ExistsResponse) {
                                  cb.processResult(rc, clientPath, p.ctx,
                                          ((ExistsResponse) p.response)
                                                  .getStat());
                              } else if (p.response instanceof SetDataResponse) {
                                  cb.processResult(rc, clientPath, p.ctx,
                                          ((SetDataResponse) p.response)
                                                  .getStat());
                              } else if (p.response instanceof SetACLResponse) {
                                  cb.processResult(rc, clientPath, p.ctx,
                                          ((SetACLResponse) p.response)
                                                  .getStat());
                              }
                          } else {
                              cb.processResult(rc, clientPath, p.ctx, null);
                          }
                      } else if (p.response instanceof GetDataResponse) {
                          DataCallback cb = (DataCallback) p.cb;
                          GetDataResponse rsp = (GetDataResponse) p.response;
                          if (rc == 0) {
                              cb.processResult(rc, clientPath, p.ctx, rsp
                                      .getData(), rsp.getStat());
                          } else {
                              cb.processResult(rc, clientPath, p.ctx, null,
                                      null);
                          }
                      } else if (p.response instanceof GetACLResponse) {
                          ACLCallback cb = (ACLCallback) p.cb;
                          GetACLResponse rsp = (GetACLResponse) p.response;
                          if (rc == 0) {
                              cb.processResult(rc, clientPath, p.ctx, rsp
                                      .getAcl(), rsp.getStat());
                          } else {
                              cb.processResult(rc, clientPath, p.ctx, null,
                                      null);
                          }
                      } else if (p.response instanceof GetChildrenResponse) {
                          ChildrenCallback cb = (ChildrenCallback) p.cb;
                          GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                          if (rc == 0) {
                              cb.processResult(rc, clientPath, p.ctx, rsp
                                      .getChildren());
                          } else {
                              cb.processResult(rc, clientPath, p.ctx, null);
                          }
                      } else if (p.response instanceof GetChildren2Response) {
                          Children2Callback cb = (Children2Callback) p.cb;
                          GetChildren2Response rsp = (GetChildren2Response) p.response;
                          if (rc == 0) {
                              cb.processResult(rc, clientPath, p.ctx, rsp
                                      .getChildren(), rsp.getStat());
                          } else {
                              cb.processResult(rc, clientPath, p.ctx, null, null);
                          }
                      } else if (p.response instanceof CreateResponse) {
                          StringCallback cb = (StringCallback) p.cb;
                          CreateResponse rsp = (CreateResponse) p.response;
                          if (rc == 0) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      (chrootPath == null
                                              ? rsp.getPath()
                                              : rsp.getPath()
                                        .substring(chrootPath.length())));
                          } else {
                              cb.processResult(rc, clientPath, p.ctx, null);
                          }
                      } else if (p.cb instanceof VoidCallback) {
                          VoidCallback cb = (VoidCallback) p.cb;
                          cb.processResult(rc, clientPath, p.ctx);
                      }
                  }
              } catch (Throwable t) {
                  LOG.error("Caught unexpected throwable", t);
              }
           }
        }
  • 相关阅读:
    sqlserver和Oracle内部的错误数据修复(DBCC、DBMS_REPAIR)
    通过Oracle补充日志,找到锁阻塞源头的SQL
    禁用sqlserver的锁升级
    [转]SQLServer2008日志文件无法收缩处理方法
    Oracle警告、跟踪文件(10046、死锁等跟踪)
    dbms_stats包更新、导出、导入、锁定统计信息
    BulkCopy频繁执行产生的性能问题
    Oracle表空间不足
    组合索引字段顺序引发的死锁问题
    如何清除某条SQL的执行计划
  • 原文地址:https://www.cnblogs.com/wei-zw/p/8797762.html
Copyright © 2011-2022 走看看