zoukankan      html  css  js  c++  java
  • zookeeper客户端访问服务端时,基于NIO的线程池绑定

     SelectorThread可以通过JVM参数指定,表示多路复用器。

    每一个客户端生成一个SocketChannel,多个或一个SocketChannel与SelectorThread绑定。

    详细描述如下:

    1. 会有若干个客户端由AcceptThread线程去接收socket连接,客户端与ZKServer取得连接。每存在一个连接就有一个SocketChannel

      1.   同时也会生成一个NIOServerCnxn,这个上下文对象在下文有“大用处”
    2. SocketChannel与SelectorThread(多路复用选择器)绑定到一起,作为请求的传输通道

      1. 每产生一个SocketChannel对象就会作为SelectorThread对象的acceptedQueue属性与SelectorThread绑定。
    3. SelectorThread监听接收就绪事件

      1. SelectorThread会不断的从acceptedQueue中读取SocketChannel对象,作为注册事件读到SelectorThread的selector中。同时SelectorThread也会不断地从selector中读取就绪事件
    4. 处理请求的方式

      1.   每获取到一个就绪事件就会封装成一个IOWorkRequest对象
      2.        然后会将IOWorkRequest对象封装成一个ScheduleWorkRequest对象(线程)
      3.        将Schedule对象放到线程池中执行
        1.   线程池执行时调用IOWorkRequest对象的doWork()方法。
                  public void doWork() throws InterruptedException {
          
          
                      if (!key.isValid()) {
                          selectorThread.cleanupSelectionKey(key);
                          return;
                      }
          
                      // 读就绪或写就绪
                      if (key.isReadable() || key.isWritable()) {
          
                          // 处理key
                          // 到这里,多个客户端请求还是并发处理的
          
                          cnxn.doIO(key); // 顺序
          
                          // Check if we shutdown or doIO() closed this connection
                          if (stopped) {
                              cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                              return;
                          }
                          if (!key.isValid()) {
                              selectorThread.cleanupSelectionKey(key);
                              return;
                          }
                          touchCnxn(cnxn);
                      }
          
                      // Mark this connection as once again ready for selection
                      cnxn.enableSelectable();
                      // Push an update request on the queue to resume selecting
                      // on the current set of interest ops, which may have changed
                      // as a result of the I/O operations we just performed.
                      if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                          cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
                      }
                  }
        2. 可以看到,doWork方法内部调用的就是NIOServerCnxn的doIO方法。所以doIO()方法是zkServer真正处理客户端请求的方法(处理CRUD等请求)
          void doIO(SelectionKey k) throws InterruptedException {
                  try {
                      if (!isSocketOpen()) {
                          LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
          
                          return;
                      }
                      if (k.isReadable()) {
                          // 读就绪,把数据读到incomingBuffer中,
                          int rc = sock.read(incomingBuffer); // 45 一开始读4个字节数据,也就是读数据包的长度
          
                          //
                          if (rc < 0) {
                              // 没有读到数据则报错
                              handleFailedRead();
                          }
          
                          // 表示还有没有剩余空间可以读数据
                          if (incomingBuffer.remaining() == 0) {
                              boolean isPayload;
          
                              // 读到的是长度
                              if (incomingBuffer == lenBuffer) { // start of next request
                                  incomingBuffer.flip();
                                  isPayload = readLength(k);
                                  incomingBuffer.clear(); // 54byte
                              } else {
                                  // 读到的是真正的packet数据(也就是命令)
                                  // continuation
                                  isPayload = true;
                              }
          
                              if (isPayload) { // not the case for 4letterword
                                  // 处理命令
                                  readPayload();//读数据
                              } else {
                                  // four letter words take care
                                  // need not do anything else
                                  return;
                              }
                          }
                      }
                      if (k.isWritable()) {
                          // 从outgoingBuffers中获取数据进行写入(返回给客户端)
                          handleWrite(k);
          
                          if (!initialized && !getReadInterest() && !getWriteInterest()) {
                              throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
                          }
                      }
                  } catch (CancelledKeyException e) {
                      LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));
          
                      LOG.debug("CancelledKeyException stack trace", e);
          
                      close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
                  } catch (CloseRequestException e) {
                      // expecting close to log session closure
                      // 移除watcher,并关闭socket
                      close();
                  } catch (EndOfStreamException e) {
                      LOG.warn("Unexpected exception", e);
                      // expecting close to log session closure
                      close(e.getReason());
                  } catch (ClientCnxnLimitException e) {
                      // Common case exception, print at debug level
                      ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                      LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
                      close(DisconnectReason.CLIENT_CNX_LIMIT);
                  } catch (IOException e) {
                      LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
                      close(DisconnectReason.IO_EXCEPTION);
                  }
              }

  • 相关阅读:
    Jenkins配置钉钉通知
    Jenkins 学习笔记
    2020年10月26日Britain suggests it may overturn parts of the EU withdrawal agreement
    【火爆抢答中】HarmonyOS有奖问答,更多惊喜等你来拿!
    三七互娱《斗罗大陆:魂师对决》上线,Network Kit助力玩家即刻畅玩
    运动健康南向设备接入服务传输数据解析举例
    华为商品管理系统批量更新商品时提示:请至少输入一组国家码和价格
    云空间服务,助力用户数据存储与协同
    Input组件无点击效果
    华为视频编辑服务(Video Editor Kit),助力开发者高效构建应用视频编辑能力
  • 原文地址:https://www.cnblogs.com/yibao/p/14044769.html
Copyright © 2011-2022 走看看