zoukankan      html  css  js  c++  java
  • Tomcat如何实现AIO

    异步最大的特点 是,应用程序不需要自己去触发数据从内核空间到用户空间的拷贝。为什么是应用程序去“触发”数据的拷贝,而不是直接从内核拷贝数据呢?这是因为应用程 序是不能访问内核空间的,因此数据拷贝肯定是由内核来做,关键是谁来触发这个动作。是内核主动将数据拷贝到用户空间并通知应用程序。还是等待应用程序通过 Selector 来查 询,当数据就绪后,应用程序再发起一个 read 调用,这时内核再把数据从内核空间拷贝到用户空间。

    首先,应用程序在调用 read API 的同时告诉内核两件事情:数据准备好了以后拷贝到哪个 Buffer,以及调用哪个回调函数去处理这些数据。之后,内核接到这个 read 指令后,等待网卡数据到达,数据到了后,产生硬件中断,内核 在中断程序里把数据从网卡拷贝到内核空间,接着做 TCP/IP 协议层面的数据解包和重组, 再把数据拷贝到应用程序指定的 Buffer,最后调用应用程序指定的回调函数。Windows 的 IOCP 和 Linux 内核 2.6 的 AIO 都提供了异步 I/O 的 支持,Java 的 NIO.2 API 就是对操作系统异步 I/O API 的封装。

    下面是 Java 的 NIO.2 API 编写的一个服务端程序:

    public class Nio2Server {
     
       void listen(){
          //1. 创建一个线程池
          ExecutorService es = Executors.newCachedThreadPool();
     
          //2. 创建异步通道群组
          AsynchronousChannelGroup tg = AsynchronousChannelGroup.withCachedThreadPool(es, 1);
          
          //3. 创建服务端异步通道
          AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(tg);
     
          //4. 绑定监听端口
          assc.bind(new InetSocketAddress(8080));
     
          //5. 监听连接,传入回调类处理连接请求
          assc.accept(this, new AcceptHandler()); 
       }
    }
    

    1. 创建一个线程池,这个线程池用来执行来自内核的回调请求。

    2. 创建一个 AsynchronousChannelGroup,并绑定一个线程池。

    3. 创建 AsynchronousServerSocketChannel,并绑定到 AsynchronousChannelGroup。

    4. 绑定一个监听端口。

    5. 调用 accept 方法开始监听连接请求,同时传入一个回调类去处理连接请求。

    accept 方法的第一个参数是 this 对象,就是 Nio2Server 对象本身。

    实在异步 I/O 模型里,应用程序不知道数 据在什么时候到达,因此向内核注册回调函数,当数据到达时,内核就会调用这个回调函 数。同时为了提高处理速度,会提供一个线程池给内核使用,这样不会耽误内核线程的工 作,内核只需要把工作交给线程池就立即返回了。

    处理连接的回调类 AcceptHandler如下:

    //AcceptHandler 类实现了 CompletionHandler 接口的 completed 方法。它还有两个模板参数,第一个是异步通道,第二个就是 Nio2Server 本身
    public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Nio2Server> {
     
       // 具体处理连接请求的就是 completed 方法,它有两个参数:第一个是异步通道,第二个就是上面传入的 NioServer 对象
       @Override
       public void completed(AsynchronousSocketChannel asc, Nio2Server attachment) {      
          // 调用 accept 方法继续接收其他客户端的请求
          attachment.assc.accept(attachment, this);
          
          //1. 先分配好 Buffer,告诉内核,数据拷贝到哪里去
          ByteBuffer buf = ByteBuffer.allocate(1024);
          
          //2. 调用 read 函数读取数据,除了把 buf 作为参数传入,还传入读回调类
          channel.read(buf, buf, new ReadHandler(asc)); 
     
    }
    

      CompletionHandler 接 口的定义:

    public interface CompletionHandler<V,A> {
     
        void completed(V result, A attachment);
     
        void failed(Throwable exc, A attachment);
    }
    

      CompletionHandler 接口有两个模板参数 V 和 A,分别表示 I/O 调用的返回值和附件 类。比如 accept 的返回值就是 AsynchronousSocketChannel,而附件类由用户自己决 定,在 accept 的调用中,我们传入了一个 Nio2Server。因此 AcceptHandler 带有了两个 模板参数:AsynchronousSocketChannel 和 Nio2Server。

    CompletionHandler 有两个方法:completed 和 failed,分别在 I/O 操作成功和失败时 调用。completed 方法有两个参数,其实就是前面说的两个模板参数。也就是说,Java 的 NIO.2 在调用回调方法时,会把返回值和附件类当作参数传给 NIO.2 的使用者。

    下面是处理读的回调类 ReadHandler:

    public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {   
        // 读取到消息后的处理  
        @Override  
        public void completed(Integer result, ByteBuffer attachment) {  
            //attachment 就是数据,调用 flip 操作,其实就是把读的位置移动最前面
            attachment.flip();  
            // 读取数据
            ... 
        }  
     
        void failed(Throwable exc, A attachment){
            ...
        }
    }
    

      read 调用的返回值是一个整型数,所以我们回调方法里的第一个参数就是一个整型,表示 有多少数据被读取到了 Buffer 中。第二个参数是一个 ByteBuffer,这是因为我们在调用 read 方法时,把用来存放数据的 ByteBuffer 当作附件类传进去了,所以在回调方法里, 有 ByteBuffer 类型的参数,我们直接从这个 ByteBuffer 里获取数据。

    Nio2Endpoint总体工作流程跟 NioEndpoint 是相似的。

    LimitLatch 是连接控制器,它负责控制最大连接数。

    Nio2Acceptor 扩展了 Acceptor,用异步 I/O 的方式来接收连接,跑在一个单独的线程里,也是一个线程组。Nio2Acceptor 接收新的连接后,得到一个 AsynchronousSocketChannel,Nio2Acceptor 把 AsynchronousSocketChannel 封装成一个 Nio2SocketWrapper,并创建一个 SocketProcessor 任务类交给线程池处理,并且 SocketProcessor 持有 Nio2SocketWrapper 对象。

    Executor 在执行 SocketProcessor 时,SocketProcessor 的 run 方法会调用 Http11Processor 来处理请求,Http11Processor 会通过 Nio2SocketWrapper 读取和解析请求数据,请求经过容器处理后,再把响应通过 Nio2SocketWrapper 写出。

    Nio2Endpoint 中没有 Poller 组件,也就是没有 Selector。这是为什么呢?因为在异步 I/O 模式下,Selector 的工作交给内核来做了。

    和 NioEndpint 一样,Nio2Endpoint 的基本思路是用 LimitLatch 组件来控制连接数,但是 Nio2Acceptor 的监听连接的过程不是在一个死循环里不断的调 accept 方法,而是通过回调函数来完成的。以下是它的连接监听方法:

    serverSock.accept(null, this);
    

      其实就是调用了 accept 方法,注意它的第二个参数是 this,表明 Nio2Acceptor 自己就是 处理连接的回调类,因此 Nio2Acceptor 实现了 CompletionHandler 接口。

    protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
        implements CompletionHandler<AsynchronousSocketChannel, Void> {
        
    @Override
    public void completed(AsynchronousSocketChannel socket,
            Void attachment) {
            
        if (isRunning() && !isPaused()) {
            if (getMaxConnections() == -1) {
                // 如果没有连接限制,继续接收新的连接
                serverSock.accept(null, this);
            } else {
                // 如果有连接限制,就在线程池里跑 Run 方法,Run 方法会检查连接数
                getExecutor().execute(this);
            }
            // 处理请求
            if (!setSocketOptions(socket)) {
                closeSocket(socket);
            }
        } 
    }
    

      CompletionHandler 的两个模板参数分别是 AsynchronousServerSocketChannel 和 Void,第一个参数就是 accept 方 法的返回值,第二个参数是附件类,由用户自己决定,这里为 Void。

    completed 方法的处 理逻辑:

    如果没有连接限制,继续在本线程中调用 accept 方法接收新的连接。

    如果有连接限制,就在线程池里跑 run 方法去接收新的连接。那为什么要跑 run 方法 呢,因为在 run 方法里会检查连接数,当连接达到最大数时,线程可能会被 LimitLatch 阻塞。为什么要放在线程池里跑呢?这是因为如果放在当前线程里执行,completed 方 法可能被阻塞,会导致这个回调方法一直不返回。

    接着 completed 方法会调用 setSocketOptions 方法,在这个方法里,会创建 Nio2SocketWrapper 和 SocketProcessor,并交给线程池处理。

    Nio2SocketWrapper 的主要作用是封装 Channel,并提供接口给 Http11Processor 读写数据。但是Http11Processor 是不能阻塞等待数据的,按照异步 I/O 的套路,Http11Processor 在调用 Nio2SocketWrapper 的 read 方法时需要注册回调类,read 调用会立即返回,问题是立即返回后 Http11Processor 还没有读到数据, 怎么办呢?这个请求的处理不就失败了吗?

    为了解决这个问题,Http11Processor 是通过 2 次 read 调用来完成数据读取操作的。

    第一次 read 调用:连接刚刚建立好后,Acceptor 创建 SocketProcessor 任务类交给线 程池去处理,Http11Processor 在处理请求的过程中,会调用 Nio2SocketWrapper 的 read 方法发出第一次读请求,同时注册了回调类 readCompletionHandler,因为数据 没读到,Http11Processor 把当前的 Nio2SocketWrapper 标记为数据不完整。接着 SocketProcessor 线程被回收,Http11Processor 并没有阻塞等待数据。Http11Processor 维护了一个 Nio2SocketWrapper 列表,也就是维护了连接的状 态。

    第二次 read 调用:当数据到达后,内核已经把数据拷贝到 Http11Processor 指定的 Buffer 里,同时回调类 readCompletionHandler 被调用,在这个回调处理方法里会重 新创建一个新的 SocketProcessor 任务来继续处理这个连接,而这个新的 SocketProcessor 任务类持有原来那个 Nio2SocketWrapper,这一次 Http11Processor 可以通过 Nio2SocketWrapper 读取数据了,因为数据已经到了应用 层的 Buffer。

    这个回调类 readCompletionHandler 的源码如下,最关键的一点是, Nio2SocketWrapper 是作为附件类来传递的,这样在回调函数里能拿到所有的上下文。

    this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
        public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
            ...
            // 通过附件类 SocketWrapper 拿到所有的上下文
            Nio2SocketWrapper.this.getEndpoint().processSocket(attachment, SocketEvent.OPEN_READ, false);
        }
     
        public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
            ...
        }
    }
    

      一个系统中如果已经支持同步 I/O,要再支持异步 I/O,改动是比较大的,很有可能不得不重新设计组件之间的接口。但是 Tomcat 通过充分的抽象,比如 SocketWrapper 对 Channel 的封装,再加上 Http11Processor 的两次 read 调用,巧妙地解决了这个问题,使得协议处理器 Http11Processor 和 I/O 通信处理器 Endpoint 之间的接口保持不变

  • 相关阅读:
    POJ3159 Candies —— 差分约束 spfa
    POJ1511 Invitation Cards —— 最短路spfa
    POJ1860 Currency Exchange —— spfa求正环
    POJ3259 Wormholes —— spfa求负环
    POJ3660 Cow Contest —— Floyd 传递闭包
    POJ3268 Silver Cow Party —— 最短路
    POJ1797 Heavy Transportation —— 最短路变形
    POJ2253 Frogger —— 最短路变形
    POJ1759 Garland —— 二分
    POJ3685 Matrix —— 二分
  • 原文地址:https://www.cnblogs.com/moonyaoo/p/13047382.html
Copyright © 2011-2022 走看看