zoukankan      html  css  js  c++  java
  • 从连接器组件看Tomcat的线程模型——NIO模式(转载)

    从连接器组件看Tomcat的线程模型——NIO模式

     

    Tomcat8之后,针对Http协议默认使用org.apache.coyote.http11.Http11NioProtocol,也就是NIO模式。通过之前的博客分析,我们知道Connector组件在初始化和start的时候会触发它子组件(Http11NioProtocol、NIOEndpoint的初始化和start)。

    NIO模式工作时序图#

    还是像之前那样,我们先整理出NIO模式启动时的时序图。

    从上面的时序图可以看出,整个流程的重点时在NioEndpoint这个类中。下面我们通过源代码看下这几个重点方法。

    Copy
    //NIO模式绑定端口
    public void bind() throws Exception {
            //初始化套接字服务,需要注意的是在NIO模式下,这个ServerSocketChannel还是阻塞模式的
            initServerSocket();
            //设置默认的acceptor线程数,默认是1个,这个参数暂时好像没法修改(??)
            //注意这个参数和acceptCount(接收请求连接的数量)之间的区别
            if (acceptorThreadCount == 0) {
                acceptorThreadCount = 1;
            }
            //设置pollerThreadCount,根据CPU的核数来,CPU大于2个设置为2,否则为1
            if (pollerThreadCount <= 0) 
                pollerThreadCount = 1;
            }
            //设置CountDownLatch
            setStopLatch(new CountDownLatch(pollerThreadCount));
            initialiseSsl();
            selectorPool.open();
        }
    

    这个代码主要做了些初始化工作,初始化套接字服务,初始化acceptorThreadCount和pollerThreadCount等。

    再看看startInternal代码:

    Copy
    @Override
    public void startInternal() throws Exception {
    
        if (!running) {
            running = true;
            paused = false;
            //创建3个缓存
            //频繁创建SocketProcessor成本高
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                                     socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                                 socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                                  socketProperties.getBufferPool());
            //一般情况下,我们自己不配置线程池,所以会进入这个方法,也可以自己在server.xml中配置这个线程池。
            if ( getExecutor() == null ) {
                //创建一个核心线程数是10,最大线程数是200,队列长度是Integer.MaxValue的线程池
                //注意下,这边线程池的逻辑和JDK中线程池的逻辑不一样,默认创建10个线程,当请求数
                //超过10个的话会继续创建,最大创建200个线程,超过200个后,任务就会进入阻塞队列
    
                //值得注意的是Tomcat的线程池继承了JDK的ThreadPoolExecutor,但是重写了线程池的默认
                //机制。Tomcat的线程池会默认创建corePoolSize个线程,此时线程池中的线程都是空闲的。
                //随着不断向线程池中添加任务,空闲线程逐渐减少,当线程池中的空闲线程耗尽之前,任务
                //都会直接被提交到线程池的队列中(这些任务会立即被空闲线程消费),当线程池中没有空闲
                //线程而且线程池中的线程总数没达到MaximumPoolSize,会创建一个新的线程来执行新的任务;
                //当线程池的大小达到MaximumPoolSize时,直接将任务放进队列,等到有线程空闲下来后再处理
                //这个任务。(参考TaskQueue的offer方法)
                createExecutor();
            }
    
            initializeConnectionLatch();
            // Start poller threads
            //开启poller线程,如果CPU是多核就开启2个,否则开启一个
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }
            //开启acceptor线程,默认开启一个acceptor线程
            startAcceptorThreads();
        }
    }
    

    Acceptor线程分析#

    acceptor线程的作用是接收客户端请求,启动之后一个loop线程一直在监听用户请求。值得注意的是,如果用户一直没法请求过来,这个线程也是会一直阻塞的,直到有请求过来。

    Copy
    //Acceptor这个类是NIOEndpoint的一个内部类
    public void run() {
        int errorDelay = 0;
        // 一直会监听,直到关闭tomcat
        while (endpoint.isRunning()) {
            // Loop if endpoint is paused
            while (endpoint.isPaused() && endpoint.isRunning()) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            if (!endpoint.isRunning()) {
                break;
            }
            state = AcceptorState.RUNNING;
            try {
                //如果已经接受的请求超过maxAcceptCount,那么accept线程进入wait状态
                endpoint.countUpOrAwaitConnection();
                if (endpoint.isPaused()) {
                    continue;
                }
                U socket = null;
                try {
                    //接受socket,这个方法会阻塞,因为NIOEndpoint在初始化的时候
                    //将ServerSocketChannel设置成了阻塞模式
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                errorDelay = 0;
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    //这边委托给NioEndpoint的setSocketOptions方法处理
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                String msg = sm.getString("endpoint.accept.fail");
                if (t instanceof Error) {
                    Error e = (Error) t;
                    if (e.getError() == 233) {
                        log.warn(msg, t);
                    } else {
                        log.error(msg, t);
                    }
                } else {
                    log.error(msg, t);
                }
            }
        }
        state = AcceptorState.ENDED;
    }
    

    下面看下NioEndpoint的setSocketOptions(SocketChannel socket)方法:

    Copy
    protected boolean setSocketOptions(SocketChannel socket) {
            // Process the connection
            try {
                //disable blocking, APR style, we are gonna be polling it
                socket.configureBlocking(false);
                Socket sock = socket.socket();
                socketProperties.setProperties(sock);
                //使用缓存的NioChannel,没有缓存的则新建
                NioChannel channel = nioChannels.pop();
                if (channel == null) {
                    SocketBufferHandler bufhandler = new SocketBufferHandler(
                            socketProperties.getAppReadBufSize(),
                            socketProperties.getAppWriteBufSize(),
                            socketProperties.getDirectBuffer());
                    if (isSSLEnabled()) {
                        channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                    } else {
                        channel = new NioChannel(socket, bufhandler);
                    }
                } else {
                    channel.setIOChannel(socket);
                    //使用缓存的channel,但是需要重新reset这个信道
                    channel.reset();
                }
                //将socket注册到poller队列中
                getPoller0().register(channel);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                try {
                    log.error("",t);
                } catch (Throwable tt) {
                    ExceptionUtils.handleThrowable(tt);
                }
                // Tell to close the socket
                return false;
            }
            return true;
        }
    

    Tomcat以NIO模式启动时NioEndpoint组件将启动某个端口的监听,一个连接到来后将被注册到NioChannel队列中,由Poller(轮询器)负责检测通道的读写事件,并在创建任务后扔进线程池中,线程池进行任务处理。处理过程中将通过协议解析器Http11NioProcessor组件对HTTP协议解析,同时通过适配器(Adapter)匹配到指定的容器进行处理并响应客户端。

    LimitLatch组件负责对连接数的控制,Acceptor组件负责接收套接字连接并注册到通道队列里面,Poller组件负责轮询检查事件列表,Poller池包含了若干Poller组件,SocketProcessor组件是任务定义器,Executor组件是负责处理套接字的线程池。下面将对每个组件的结构与作用进行解析。

    连接数控制器LimitLatch

    NIO模式中的LimitLatch组件和BIO模式中的LimitLatch组件功能一致,作用也是对最大连接数的限制。

    与BIO中的控制器不同的是,控制阀门的大小不相同,BIO模式受本身模式的限制,它的连接数与线程数比例是1:1的关系,所以当连接数太多时将导致线程数也很多,JVM线程数过多将导致线程间切换成本很高。默认情况下,Tomcat处理连接池的线程数为200,所以BIO流量控制阀门大小也默认设置为200。但NIO模式能克服BIO连接数的不足,它能基于事件同时维护大量的连接,对于事件的遍历只须交给同一个或少量的线程,再把具体的事件执行逻辑交给线程池。例如,Tomcat把套接字接收工作交给一个线程,而把套接字读写及处理工作交给N个线程,N一般为CPU核数。对于NIO模式,Tomcat默认把流量阀门大小设置为10 000,如果你想更改大小,可以通过server.xml中节点的maxConnections属性修改,同时要注意,连接数到达最大值后,操作系统仍然会接收客户端连接,直到操作系统接收队列被塞满。队列默认长度为100,可通过server.xml中节点的acceptCount属性配置。

    Acceptor组件

    Acceptor的主要职责也是监听是否有客户端连接进来并接收连接,这里需要注意的是,accept操作是阻塞的。假如用户一直没有请求发送过来,acceptor线程将一直阻塞。

    Acceptor接收SocketChannel对象后要把它设置为非阻塞,这是因为后面对客户端所有的连接都采取非阻塞模式处理。接着设置套接字的一些属性,再封装成非阻塞通道对象。非阻塞通道可能是NioChannel也可能是SecureNioChannel,这取决于使用HTTP通信还是使用HTTPS通信。最后将非阻塞通道对象注册到通道队列中并由Poller负责检测事件。

    任务定义器SocketProcessor

    与JIoEndpoint组件相似,将任务放到线程池中处理前需要定义好任务的执行逻辑。根据线程池的约定,它必须扩展Runnable接口:

    Copy
    protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
        //NIO方式读取套接字处理,并返回
        //连接数减一
        //关闭连接
    }
    

    因为NIO与BIO模式有很大不同,其中一个很大不同在于BIO每次返回都肯定能获取若干字节,而NIO无法保证每次读取的字节量,可多可少甚至可能没有,所以对于NIO模式,只能“尝试”处理请求报文。例如,第一次只读取了请求头部的一部分,不足以开始处理,但并不会阻塞,而是继续往下执行,直到下次循环到来,此时可能请求头部的另外一部分已经被读取,则可以开始处理请求头部。

    连接轮询器Poller

    NIO模型需要同时对很多连接进行管理,管理的方式则是不断遍历事件列表,对相应连接的相应事件做出处理,而遍历的工作正是交给Poller负责。Poller负责的工作可以用下图简单表示出来,在Java层面上看,它不断轮询事件列表,一旦发现相应的事件则封装成任务定义器SocketProcessor,进而扔进线程池中执行任务。当然,由于NioEndpoint组件内有一个Poller池,因此如果不存在线程池,任务将由Poller直接执行。

    Poller内部依赖JDK的Selector对象进行轮询,Selector会选择出待处理的事件,每轮询一次就选出若干需要处理的通道,例如从通道中读取字节、将字节写入Channel等。在NIO模式下,因为每次读取的数据是不确定的,对于HTTP协议来说,每次读取的数据可能既包含了请求行也包含了请求头部,也可能不包含请求头部,所以每次只能尝试去解析报文。若解析不成功则等待下次轮询读取更多的数据后再尝试解析,若解析报文成功则做一些逻辑处理后对客户端响应,而这些报文解析、逻辑处理、响应等都是在任务定义器中定义的。

    Poller池子

    在NIO模式下,对于客户端连接的管理都是基于事件驱动的,上一节提到NioEndpoint组件包含了Poller组件,Poller负责的工作就是检测事件并处理事件。但假如整个Tomcat的所有客户端连接都交给一个线程来处理,那么即使这个线程是不阻塞的,整体处理性能也可能无法达到最佳或较佳的状态。为了提升处理性能,Tomcat设计成由多个Poller共同处理所有客户端连接,所有连接均摊给每个Poller处理,而这些Poller便组成了Poller池。

    整个结构如图6.40所示,客户端连接由Acceptor组件接收后按照一定的算法放到通道队列上。这里使用的是轮询调度算法,从第1个队列到第N个队列循环分配,假如这里有3个Poller,则第1个连接分配给第1个Poller对应的通道列表,第2个连接分配给第2个Poller对应的通道列表,以此类推,到第4个连接又分配到第1个Poller对应的通道列表上。这种算法基本保证了每个Poller所对应处理的连接数均匀,每个Poller各自轮询检测自己对应的事件列表,一旦发现需要处理的连接则对其进行处理。这时如果NioEndpoint组件包含任务执行器(Executor)则会将任务处理交给它,但假如没有Executor组件,Poller则自己处理任务。

    Poller池的大小多少比较合适呢?Tomcat使用了一个经典的算法Math.min(2, Runtime. getRuntime().availableProcessors()),即会根据Tomcat运行环境决定Poller组件的数量。所以在Tomcat中最少会有两个Poller组件,而如果运行在更多处理器的机器上,则JVM可用处理器个数等于Poller组件的个数。

    参考#

    http://server.51cto.com/sOS-595052.html

    https://nod0620.iteye.com/blog/998215

    https://www.jianshu.com/p/370af4895545

    https://www.jianshu.com/p/901a6e35b3d9

    http://m.elecfans.com/article/632834.html

    原文链接:https://www.cnblogs.com/54chensongxia/p/13289174.html

  • 相关阅读:
    HDU2059(龟兔赛跑)
    pat 1012 The Best Rank
    pat 1010 Radix
    pat 1007 Maximum Subsequence Sum
    pat 1005 Sign In and Sign Out
    pat 1005 Spell It Right
    pat 1004 Counting Leaves
    1003 Emergency
    第7章 输入/输出系统
    第六章 总线
  • 原文地址:https://www.cnblogs.com/cmd-/p/13330658.html
Copyright © 2011-2022 走看看