  • Understanding Tomcat in Depth (10): Connector

    Preface

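    We have finally reached the Connector, one of the most complex pieces of Tomcat. As the name suggests, a connector connects things, so what does it connect? The Processor inside the Connector works with request and response objects, and the Connector deals with three kinds of request/response pairs: the Coyote Request and Response (Tomcat's internal representation, unrelated to the Servlet API); the Catalina pair (which implements the Servlet specification; the Coyote pair is adapted into the Request and Response of the Catalina package); and the facade pair (a facade over the Catalina pair that also implements the Servlet interfaces, so that the other public methods of the Catalina classes are not exposed).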

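    The Connector accepts a request, wraps it in a Request and a Response, and hands them to the Container for processing. When the Container has finished, the result goes back through the Connector to the client.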

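    To understand the Connector, we need to ask ourselves four questions.

    • (1) How does the Connector accept a request?
    • (2) How is the request wrapped into a Request and a Response?
    • (3) How are the Request and Response handed to the Container for processing?
    • (4) How does the Container hand the result back to the Connector so that it can be returned to the client?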

    Let us start with a diagram of the overall structure of the Connector.

    [Figure: overall structure of the Connector]

    [Note]: ProtocolHandler has different implementations for different protocols and different I/O models. In Tomcat 8.5 its class hierarchy looks as follows.

    [Figure: ProtocolHandler class hierarchy]

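    A few remarks on this class hierarchy:

    1. ajp and http11 are two different protocols.
    2. nio, nio2 and apr are different I/O models.
    3. Protocols and I/O models can be combined with each other.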

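    A ProtocolHandler consists of three parts: Endpoint, Processor and Adapter.

    1. The Endpoint handles the low-level socket connections, the Processor turns the socket data received by the Endpoint into a Request, and the Adapter passes the Request to the Container for the actual processing.
    2. Because the Endpoint deals with raw socket connections, it is the part that implements TCP/IP; the Processor implements the HTTP protocol; and the Adapter adapts the request to the servlet container for the actual processing.
    3. AbstractEndpoint, the abstract base class of the endpoints, defines two inner classes, Acceptor and AsyncTimeout, and a Handler interface. Acceptor listens for incoming connections, AsyncTimeout checks asynchronous requests for timeouts, and Handler processes the accepted socket by calling a Processor internally. (In the 8.5 source quoted later in this chapter, the AsyncTimeout thread is created in the protocol handler's start() method.)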
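
    The division of labour described above can be sketched in a few lines of plain Java. This is only an illustration under our own assumptions: `PipelineSketch`, `RawRequest` and the three static methods are hypothetical names, not Tomcat classes, and the "container" is reduced to a function from a path to a response body.

```java
import java.util.function.Function;

// Hypothetical stand-ins for the three parts of a ProtocolHandler.
// None of these types are Tomcat classes.
final class PipelineSketch {

    /** Plays the role of the internal (Coyote-style) request. */
    static final class RawRequest {
        final String method;
        final String path;

        RawRequest(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    /** "Processor": parses what the endpoint read into a request object. */
    static RawRequest parseRequestLine(String requestLine) {
        String[] parts = requestLine.trim().split(" ");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Malformed request line: " + requestLine);
        }
        return new RawRequest(parts[0], parts[1]);
    }

    /** "Adapter": hands the parsed request to the container and returns its answer. */
    static String adapt(RawRequest request, Function<String, String> container) {
        return container.apply(request.path);
    }

    /** "Endpoint": a real server would read the request line from a socket here. */
    static String handle(String requestLine, Function<String, String> container) {
        RawRequest request = parseRequestLine(requestLine);
        return adapt(request, container);
    }

    public static void main(String[] args) {
        String body = handle("GET /hello HTTP/1.1", path -> "served " + path);
        System.out.println(body); // prints "served /hello"
    }
}
```

    Each stage only knows about its neighbour, which is the property that lets Tomcat swap the I/O model without touching the HTTP parsing or the container.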

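    At this point we have answered questions (1), (2) and (3). Question (4) answers itself once the Container is understood, and the Container was analysed in detail in earlier chapters.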

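    Entry point for the Connector source analysis

    In StandardService, the standard implementation of Service, the init(), start(), stop() and destroy() methods each call the method of the same name on the Connectors. One Service can have several Connectors. To save space this code is not listed here; readers can look it up in the Tomcat source.

    [Note]: this chapter only analyses the code for the HTTP/1.1 protocol with the NIO I/O model.

    Connector startup logic

    Connector implements the Lifecycle interface, so it is a lifecycle component, and its startup logic begins in init() and start().

    The Connector constructor

    Before the analysis, let us look at server.xml, which already reflects the overall structure of Tomcat's components.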

    <?xml version='1.0' encoding='utf-8'?>
    <Server port="8005" shutdown="SHUTDOWN">
      <Listener className="org.apache.catalina.startup.VersionLoggerListener" />
      <Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />
      <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
      <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
      <Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />
    
      <GlobalNamingResources>
        <Resource name="UserDatabase" auth="Container"
                  type="org.apache.catalina.UserDatabase"
                  description="User database that can be updated and saved"
                  factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
                  pathname="conf/tomcat-users.xml" />
      </GlobalNamingResources>
    
      <Service name="Catalina">
        <Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
        <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />
    
        <Engine name="Catalina" defaultHost="localhost">
          <Realm className="org.apache.catalina.realm.LockOutRealm">
            <Realm className="org.apache.catalina.realm.UserDatabaseRealm"
                   resourceName="UserDatabase"/>
          </Realm>
    
          <Host name="localhost"  appBase="webapps"
                unpackWARs="true" autoDeploy="true">
            <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
                   prefix="localhost_access_log" suffix=".txt"
                   pattern="%h %l %u %t &quot;%r&quot; %s %b" />
          </Host>
        </Engine>
      </Service>
    </Server>
    

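    In this file a Connector has several key attributes, two of which are port and protocol. The default server.xml configures two protocols: HTTP/1.1 and AJP/1.3. HTTP/1.1 serves the HTTP 1.1 protocol, while AJP/1.3 is used for communication with an Apache web server.

    Next, let us look at the constructor.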

    public Connector() {
        this(null); // 1. The no-arg constructor passes a null protocol, which defaults to `HTTP/1.1`
    }
    
    public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        // 5. Instantiate the ProtocolHandler from its class name
        ProtocolHandler p = null;
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            p = (ProtocolHandler) clazz.getConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString(
                    "coyoteConnector.protocolHandlerInstantiationFailed"), e);
        } finally {
            this.protocolHandler = p;
        }
    
        if (Globals.STRICT_SERVLET_COMPLIANCE) {
            uriCharset = StandardCharsets.ISO_8859_1;
        } else {
            uriCharset = StandardCharsets.UTF_8;
        }
    }
    
    @Deprecated
    public void setProtocol(String protocol) {
        boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
                AprLifecycleListener.getUseAprConnector();
    
        // 2. For `HTTP/1.1` or `null`, use `org.apache.coyote.http11.Http11NioProtocol` (leaving the APR branch aside)
        if ("HTTP/1.1".equals(protocol) || protocol == null) {
            if (aprConnector) {
                setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
            } else {
                setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
            }
        }
        // 3. For `AJP/1.3`, use `org.apache.coyote.ajp.AjpNioProtocol` (leaving the APR branch aside)
        else if ("AJP/1.3".equals(protocol)) {
            if (aprConnector) {
                setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
            } else {
                setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
            }
        }
        // 4. Otherwise, treat the given protocol string as the ProtocolHandler class name
        else {
            setProtocolHandlerClassName(protocol);
        }
    }
    

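    From the code above we can see that the constructor does the following:

    1. The no-arg constructor passes a null protocol, which defaults to HTTP/1.1.
    2. For HTTP/1.1 or null, the protocol handler is org.apache.coyote.http11.Http11NioProtocol (leaving the APR branch aside).
    3. For AJP/1.3, the protocol handler is org.apache.coyote.ajp.AjpNioProtocol (leaving the APR branch aside).
    4. In all other cases, the given protocol string is used as the protocol handler class name.
    5. The ProtocolHandler instance is created reflectively from that class name.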
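
    The five steps can be condensed into a small stand-alone sketch. It mirrors the branching of setProtocol() shown above, but it is not Tomcat code: `ProtocolResolutionSketch` is a hypothetical class, and the APR flag is passed in as a parameter instead of being detected through AprLifecycleListener.

```java
final class ProtocolResolutionSketch {

    /** Mirrors the branching in setProtocol(); aprConnector is supplied by the caller. */
    static String resolveHandlerClassName(String protocol, boolean aprConnector) {
        if (protocol == null || "HTTP/1.1".equals(protocol)) {
            return aprConnector
                    ? "org.apache.coyote.http11.Http11AprProtocol"
                    : "org.apache.coyote.http11.Http11NioProtocol";
        }
        if ("AJP/1.3".equals(protocol)) {
            return aprConnector
                    ? "org.apache.coyote.ajp.AjpAprProtocol"
                    : "org.apache.coyote.ajp.AjpNioProtocol";
        }
        // Anything else is treated as a fully qualified class name.
        return protocol;
    }

    /** Reflective instantiation as in the constructor; returns null on failure. */
    static Object instantiate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            return clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return null;
        }
    }
}
```

    Note that, like the constructor above, this sketch swallows the instantiation failure and leaves a null behind, so a misspelled class name only shows up later, when the handler is first used.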

    Connector.init()

    @Override
    protected void initInternal() throws LifecycleException {
        super.initInternal();
    
        // Initialize adapter
        // 1. Initialize the adapter
        adapter = new CoyoteAdapter(this);
        protocolHandler.setAdapter(adapter);
    
        // Make sure parseBodyMethodsSet has a default
        // 2. Set the list of HTTP methods whose request body is parsed; the default is POST
        if (null == parseBodyMethodsSet) {
            setParseBodyMethods(getParseBodyMethods());
        }
    
        if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                    getProtocolHandlerClassName()));
        }
        if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
                protocolHandler instanceof AbstractHttp11JsseProtocol) {
            AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                    (AbstractHttp11JsseProtocol<?>) protocolHandler;
            if (jsseProtocolHandler.isSSLEnabled() &&
                    jsseProtocolHandler.getSslImplementationName() == null) {
                // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
                jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
            }
        }
    
        // 3. Initialize the protocolHandler
        try {
            protocolHandler.init();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
        }
    }
    

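    The init() method does three things:

    1. It initializes the adapter.
    2. It sets the list of HTTP methods whose request body is parsed; the default is POST.
    3. It initializes the protocolHandler.

    From the ProtocolHandler class hierarchy we know that every ProtocolHandler implementation extends the abstract class AbstractProtocol, and the logic behind protocolHandler.init() lives in that abstract class. Let us analyse it.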

    @Override
    public void init() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
        }
    
        if (oname == null) {
            // Component not pre-registered so register it
            oname = createObjectName();
            if (oname != null) {
                Registry.getRegistry(null, null).registerComponent(this, oname, null);
            }
        }
    
        if (this.domain != null) {
            rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
            Registry.getRegistry(null, null).registerComponent(
                    getHandler().getGlobal(), rgOname, null);
        }
    
        // 1. Set the endpoint name; by default it is http-nio-{port}
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));
        endpoint.setDomain(domain);
        
        // 2. Initialize the endpoint
        endpoint.init();
    }
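
    The call `endpointName.substring(1, endpointName.length()-1)` looks odd at first. Our reading, which is an assumption about code not quoted here, is that getName() returns the name wrapped in JMX quotes (for use in an ObjectName), and that the substring simply strips the first and last character again. The sketch below shows that effect with the JDK's own `ObjectName.quote`; `EndpointNameSketch` is a hypothetical helper.

```java
import javax.management.ObjectName;

final class EndpointNameSketch {

    /** Removes the first and last character, as init() does with the quoted name. */
    static String stripQuotes(String quoted) {
        return quoted.substring(1, quoted.length() - 1);
    }

    public static void main(String[] args) {
        String quoted = ObjectName.quote("http-nio-8080");
        System.out.println(quoted);              // the name surrounded by double quotes
        System.out.println(stripQuotes(quoted)); // http-nio-8080
    }
}
```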
    

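    Next we look at what Endpoint.init() does. The method is located in the abstract class AbstractEndpoint, which is built on the template method pattern; its main job here is to call the bind() method of the subclass.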

    public abstract void bind() throws Exception;
    public abstract void unbind() throws Exception;
    public abstract void startInternal() throws Exception;
    public abstract void stopInternal() throws Exception;
    
    public void init() throws Exception {
        // Call bind()
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
        if (this.domain != null) {
            // Register endpoint (as ThreadPool - historical name)
            oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
            Registry.getRegistry(null, null).registerComponent(this, oname, null);
    
            ObjectName socketPropertiesOname = new ObjectName(domain +
                    ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
            socketProperties.setObjectName(socketPropertiesOname);
            Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);
    
            for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
                registerJmx(sslHostConfig);
            }
        }
    }
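
    The template method pattern mentioned above, together with the bindOnInit switch, can be reduced to the following sketch. The class names are hypothetical and the checked exceptions of the original are left out for brevity; only the BindState values are taken from the quoted code.

```java
abstract class EndpointTemplateSketch {

    enum BindState { UNBOUND, BOUND_ON_INIT, BOUND_ON_START }

    private final boolean bindOnInit;
    private BindState bindState = BindState.UNBOUND;

    EndpointTemplateSketch(boolean bindOnInit) {
        this.bindOnInit = bindOnInit;
    }

    // Hook methods supplied by the concrete endpoint.
    protected abstract void bind();
    protected abstract void startInternal();

    // Template method: the skeleton is fixed here, bind() is delegated to the subclass.
    public final void init() {
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }

    // Binds only if init() has not already done so, then starts the subclass.
    public final void start() {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

    BindState getBindState() {
        return bindState;
    }
}

/** A concrete endpoint that only counts how often bind() is called. */
final class CountingEndpointSketch extends EndpointTemplateSketch {
    int bindCalls;
    boolean started;

    CountingEndpointSketch(boolean bindOnInit) {
        super(bindOnInit);
    }

    @Override
    protected void bind() {
        bindCalls++;
    }

    @Override
    protected void startInternal() {
        started = true;
    }
}
```

    Whichever value bindOnInit has, bind() runs exactly once; the flag only decides whether the port is claimed during init() or during start().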
    

    Moving on to bind(), we finally reach what we were looking for. The key line is serverSock.socket().bind(addr,getAcceptCount());, which binds the ServerSocket to the configured port.

    @Override
    public void bind() throws Exception {
    
        if (!getUseInheritedChannel()) {
            serverSock = ServerSocketChannel.open();
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
            serverSock.socket().bind(addr,getAcceptCount());
        } else {
            // Retrieve the channel provided by the OS
            Channel ic = System.inheritedChannel();
            if (ic instanceof ServerSocketChannel) {
                serverSock = (ServerSocketChannel) ic;
            }
            if (serverSock == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
            }
        }
        serverSock.configureBlocking(true); //mimic APR behavior
    
        // Initialize thread count defaults for acceptor, poller
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        setStopLatch(new CountDownLatch(pollerThreadCount));
    
        // Initialize SSL if needed
        initialiseSsl();
    
        selectorPool.open();
    }
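
    The core of bind() uses nothing but the JDK's NIO API, so it can be tried in isolation. The sketch below is a minimal version under our own assumptions: `BindSketch` is a hypothetical class, it binds to the loopback address only, and it uses port 0 so that the operating system picks a free port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class BindSketch {

    /** Opens a listening channel on the loopback interface and returns the bound port. */
    static int bindAndReportPort(int port, int backlog) throws IOException {
        try (ServerSocketChannel serverSock = ServerSocketChannel.open()) {
            InetSocketAddress addr =
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
            // The second argument is the backlog; the code above takes it from getAcceptCount().
            serverSock.socket().bind(addr, backlog);
            // Blocking mode: accept() waits until a connection arrives.
            serverSock.configureBlocking(true);
            return serverSock.socket().getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port, so the sketch cannot collide with a running server.
        System.out.println("bound to port " + bindAndReportPort(0, 100));
    }
}
```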
    

    That completes the analysis of init(). Next comes start(). The key code is a single line: the call to ProtocolHandler.start().

    Connector.start()

    @Override
    protected void startInternal() throws LifecycleException {
    
        // Validate settings before starting
        if (getPort() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
        }
    
        setState(LifecycleState.STARTING);
    
        try {
            protocolHandler.start();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
        }
    }
    

    Let us step into ProtocolHandler.start(). It does two things:

    1. It calls Endpoint.start().
    2. It starts the async timeout thread, whose Runnable is an AsyncTimeout.

    @Override
    public void start() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
        }
    
        // 1. Call `Endpoint.start()`
        endpoint.start();
    
        // Start async timeout thread
        // 2. Start the async timeout thread; its Runnable is an `AsyncTimeout`
        asyncTimeout = new AsyncTimeout();
        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
        int priority = endpoint.getThreadPriority();
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            priority = Thread.NORM_PRIORITY;
        }
        timeoutThread.setPriority(priority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }
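
    The thread set-up at the end of start() follows a pattern that recurs throughout the endpoint code: fall back to the normal priority when the configured one is out of range, and mark the thread as a daemon so that it never keeps the JVM alive. A small sketch with hypothetical names:

```java
final class ThreadPrioritySketch {

    /** Falls back to NORM_PRIORITY when the configured value is out of range. */
    static int effectivePriority(int configured) {
        if (configured < Thread.MIN_PRIORITY || configured > Thread.MAX_PRIORITY) {
            return Thread.NORM_PRIORITY;
        }
        return configured;
    }

    /** Creates (but does not start) a daemon thread, configured like the timeout thread. */
    static Thread newDaemon(Runnable task, String name, int configuredPriority) {
        Thread t = new Thread(task, name);
        t.setPriority(effectivePriority(configuredPriority));
        t.setDaemon(true);
        return t;
    }
}
```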
    

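    Here we focus on Endpoint.start(), which does the following:

    1. It calls bind() if necessary; bind() was already analysed as part of init().
    2. It creates the worker thread pool.
    3. It initializes the connection latch, which limits the number of concurrent connections.
    4. It starts the poller threads. A poller processes the messages (or events) produced by the acceptor threads and eventually calls the Handler code.
    5. It starts the acceptor threads.

    public final void start() throws Exception {
        // 1. `bind()` was already analysed as part of `init()`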
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }
    
    @Override
    public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;
    
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());
    
            // Create worker collection
            // 2. Create the worker thread pool
            if ( getExecutor() == null ) {
                createExecutor();
            }
            
            // 3. Initialize the connection latch, which limits the number of concurrent connections
            initializeConnectionLatch();
    
            // Start poller threads
            // 4. Start the poller threads. A poller processes the events produced by the acceptor threads and eventually calls the Handler code
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }
            // 5. Start the acceptor threads
            startAcceptorThreads();
        }
    }
    
    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];
    
        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }
    

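    Connector request handling

    With the startup logic covered, we now need to look at how an HTTP request is handled: once a client has sent a request, which steps does it pass through before it is actually executed? Tomcat's design is elaborate and complex, and without an overall call diagram it is hard to see the whole picture in the code.

    Warning: too much detail tends to hide the big picture!

    Here is the call-chain diagram first. It comes from the official Tomcat site, under "Apache Tomcat 8 Architecture".

    [Figure: request call chain]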

    Acceptor

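    In the overall structure diagram of the Connector, the entry point for requests is the Acceptor. Endpoint.start() starts the Acceptor threads that accept incoming connections, so next we analyse what an Acceptor thread does.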

    protected class Acceptor extends AbstractEndpoint.Acceptor {
        @Override
        public void run() {
            int errorDelay = 0;
    
            // Loop until we receive a shutdown command
            while (running) {
    
                // Loop if endpoint is paused
                // 1. While the `Endpoint` is paused, the `Acceptor` waits in a loop, sleeping 50 ms per iteration
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                // 2. If the `Endpoint` has stopped, the `Acceptor` stops as well
                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;
    
                try {
                    //if we have reached max connections, wait
                    // 3. If the maximum number of connections has been reached, wait until the count drops
                    countUpOrAwaitConnection();
    
                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        // 4. Accept the socket of the next incoming connection
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // We didn't get a socket
                        countDownConnection();
                        if (running) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;
    
                    // Configure the socket
                    if (running && !paused) {
                        // setSocketOptions() will hand the socket off to
                        // an appropriate processor if successful
                        // 5. `setSocketOptions()` is the key step: it passes the socket to a poller as an event
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
    }
    

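    Acceptor.run() does the following:

    1. While the Endpoint is paused, the Acceptor waits in a loop, sleeping 50 ms per iteration.
    2. If the Endpoint has stopped, the Acceptor stops as well.
    3. If the maximum number of connections has been reached, it waits until the count drops.
    4. It accepts the socket of the next incoming connection.
    5. It calls setSocketOptions(), the key step, which passes the socket to a poller as an event.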
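
    Step 3 is what enforces the connection limit. The body of countUpOrAwaitConnection() is not quoted in this chapter, so the sketch below does not reproduce it; it swaps in a plain `java.util.concurrent.Semaphore` to show the same count-up, wait and count-down behaviour. `ConnectionLimitSketch` and its method names are hypothetical.

```java
import java.util.concurrent.Semaphore;

final class ConnectionLimitSketch {
    private final Semaphore permits;

    ConnectionLimitSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** Blocks while the limit is reached, like step 3 of the Acceptor loop. */
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant, convenient for experiments and tests. */
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    /** Called when a connection is closed or when accept() failed. */
    void countDown() {
        permits.release();
    }
}
```

    The quoted Acceptor code shows the release side of this contract: when accept() throws an IOException, countDownConnection() is called straight away so that the failed attempt does not permanently consume a slot.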

    Let us analyse the key method, setSocketOptions().

    protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
    
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            // Register the channel with a poller; note the two key methods, `getPoller0()` and `Poller.register()`
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }
    

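    The channel is registered with a poller; note the two key methods, getPoller0() and Poller.register(). We start with getPoller0(). Its key point is that it picks the next poller in round-robin fashion by taking a counter modulo the number of pollers.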

    /**
     * The socket poller.
     */
    private Poller[] pollers = null;
    private AtomicInteger pollerRotater = new AtomicInteger(0);
    /**
     * Return an available poller in true round robin fashion.
     *
     * @return The next poller in sequence
     */
    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }
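
    The round-robin selection can be tried out on its own. The sketch below is a variant, not the code above: it uses `Math.floorMod` instead of `Math.abs(...) % length`. The reason is a JDK detail: `Math.abs(Integer.MIN_VALUE)` is still negative, so if the counter ever wrapped around and the array length were not a power of two, the modulo could yield a negative index. Whether that matters in practice depends on the poller count; treat this as an illustration with hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final AtomicInteger rotater;
    private final int size;

    RoundRobinSketch(int size, int initialCounter) {
        this.size = size;
        this.rotater = new AtomicInteger(initialCounter);
    }

    /** Index of the next slot; floorMod keeps the result in [0, size) even after overflow. */
    int nextIndex() {
        return Math.floorMod(rotater.incrementAndGet(), size);
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(3, 0);
        System.out.println(rr.nextIndex() + " " + rr.nextIndex() + " " + rr.nextIndex()); // 1 2 0
    }
}
```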
    

    Next we analyse Poller.register(). The Poller maintains a synchronized queue named events, so every channel accepted by the Acceptor is put into this queue; the code that does so is events.offer(event);

    public class Poller implements Runnable {
    
        private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
    
        /**
         * Registers a newly created socket with the poller.
         *
         * @param socket    The newly created socket
         */
        public void register(final NioChannel socket) {
            socket.setPoller(this);
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            socket.setSocketWrapper(ka);
            ka.setPoller(this);
            ka.setReadTimeout(getSocketProperties().getSoTimeout());
            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            ka.setReadTimeout(getConnectionTimeout());
            ka.setWriteTimeout(getConnectionTimeout());
            PollerEvent r = eventCache.pop();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }
    
        private void addEvent(PollerEvent event) {
            events.offer(event);
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }
    }
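
    register() also shows an object-recycling idiom that appears several times in the endpoint (eventCache, processorCache, nioChannels): pop an instance from a cache, create a new one only if the cache is empty, otherwise reset the recycled one. The sketch below imitates that idiom with a small bounded stack. It is not Tomcat's SynchronizedStack; `RecyclingPoolSketch` and `EventSketch` are hypothetical.

```java
import java.util.ArrayDeque;

final class RecyclingPoolSketch<T> {
    private final ArrayDeque<T> stack = new ArrayDeque<>();
    private final int limit;

    RecyclingPoolSketch(int limit) {
        this.limit = limit;
    }

    /** Returns a cached instance, or null when the pool is empty. */
    synchronized T pop() {
        return stack.pollFirst();
    }

    /** Keeps the instance for reuse unless the pool is already full. */
    synchronized boolean push(T item) {
        if (stack.size() >= limit) {
            return false;
        }
        stack.addFirst(item);
        return true;
    }
}

/** A hypothetical event object with the same new-or-reset life cycle as PollerEvent. */
final class EventSketch {
    String channelName;

    EventSketch(String channelName) {
        this.channelName = channelName;
    }

    void reset(String channelName) {
        this.channelName = channelName;
    }

    /** Same shape as in register(): pop, then create or reset. */
    static EventSketch obtain(RecyclingPoolSketch<EventSketch> cache, String channelName) {
        EventSketch e = cache.pop();
        if (e == null) {
            e = new EventSketch(channelName);
        } else {
            e.reset(channelName);
        }
        return e;
    }
}
```

    The point of the idiom is to avoid allocating a fresh object per connection on a hot path; the size limit keeps the cache from growing without bound after a traffic spike.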
    

    Poller

    The Acceptor produces PollerEvent objects, so the Poller must be the one that consumes them. Let us analyse Poller.run(). The place where a key is actually handled is processKey(sk, attachment);

    public class Poller implements Runnable {
        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
    
                boolean hasEvents = false;
    
                try {
                    if (!close) {
                        hasEvents = events();
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            //if we are here, means we have other stuff to do
                            //do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error("",x);
                    continue;
                }
                //either we timed out or we woke up, process events first
                if ( keyCount == 0 ) hasEvents = (hasEvents | events());
    
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                // Handle the keys that are ready
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        // This is where the key is actually processed
                        processKey(sk, attachment);
                    }
                }//while
    
                //process timeouts
                timeout(keyCount,hasEvents);
            }//while
    
            getStopLatch().countDown();
        }
    }
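
    The interplay between addEvent() and run() through wakeupCounter is easy to miss. As we read the two listings, the poller sets the counter to -1 right before it selects, so the first event added while it may be blocked brings the counter to exactly 0 and triggers selector.wakeup(); events added while the poller is busy only increase the counter, which later makes the poller use the non-blocking selectNow(). The sketch below models just that counter, without a real Selector; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class WakeupCounterSketch {
    private final AtomicLong wakeupCounter = new AtomicLong(0);

    /** Producer side (addEvent): returns true when selector.wakeup() would be called. */
    boolean onEventAdded() {
        return wakeupCounter.incrementAndGet() == 0;
    }

    /** Poller side, before selecting: true means selectNow(), false means a blocking select(). */
    boolean shouldSelectNow() {
        // -1 marks "the poller is about to select and may block".
        return wakeupCounter.getAndSet(-1) > 0;
    }

    /** Poller side, after the select call has returned. */
    void afterSelect() {
        wakeupCounter.set(0);
    }

    long value() {
        return wakeupCounter.get();
    }
}
```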
    

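    Next we analyse processKey(), which handles reads and writes separately depending on the state of the key.

    1. It handles read events, for example by producing the Request object.
    2. It handles write events, for example by writing the generated Response back to the client through the socket.

    protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {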
        try {
            if ( close ) {
                cancelledKey(sk);
            } else if ( sk.isValid() && attachment != null ) {
                if (sk.isReadable() || sk.isWritable() ) {
                    if ( attachment.getSendfileData() != null ) {
                        processSendfile(sk,attachment, false);
                    } else {
                        unreg(sk, attachment, sk.readyOps());
                        boolean closeSocket = false;
                        // 1. Handle read events, for example by producing the Request object
                        // Read goes before write
                        if (sk.isReadable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                closeSocket = true;
                            }
                        }
                        // 2. Handle write events, for example by writing the generated Response back to the client through the socket
                        if (!closeSocket && sk.isWritable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                closeSocket = true;
                            }
                        }
                        if (closeSocket) {
                            cancelledKey(sk);
                        }
                    }
                }
            } else {
                //invalid key
                cancelledKey(sk);
            }
        } catch ( CancelledKeyException ckx ) {
            cancelledKey(sk);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            log.error("",t);
        }
    }
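
    The ordering rule in processKey() (read before write, and stop at the first failure) can be isolated as follows. The sketch only uses the `SelectionKey` constants from the JDK; the event names are plain strings and the `processSocket` callback is a hypothetical stand-in for the real method.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class ReadBeforeWriteSketch {

    /**
     * Dispatches the ready operations in the same order as processKey():
     * read first, then write, skipping the write once a dispatch has failed.
     * Returns the events that were dispatched; "CLOSE" is appended on failure.
     */
    static List<String> dispatch(int readyOps, Predicate<String> processSocket) {
        List<String> log = new ArrayList<>();
        boolean closeSocket = false;
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            log.add("OPEN_READ");
            if (!processSocket.test("OPEN_READ")) {
                closeSocket = true;
            }
        }
        if (!closeSocket && (readyOps & SelectionKey.OP_WRITE) != 0) {
            log.add("OPEN_WRITE");
            if (!processSocket.test("OPEN_WRITE")) {
                closeSocket = true;
            }
        }
        if (closeSocket) {
            log.add("CLOSE");
        }
        return log;
    }
}
```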
    

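    We continue with processSocket().

    1. It takes a Processor from processorCache to handle the socket; the Processor implementation is SocketProcessor.
    2. It submits the Processor to the worker thread pool for execution.

    public boolean processSocket(SocketWrapperBase<S> socketWrapper,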
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            // 1. Take a `Processor` from `processorCache` to handle the socket; the implementation is `SocketProcessor`
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            // 2. Submit the `Processor` to the worker thread pool for execution
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }
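
    The second half of processSocket() is a compact "dispatch or run inline" decision with a rejection path. Stripped of the Tomcat types it looks like this; `DispatchSketch` is a hypothetical class that only relies on `java.util.concurrent.Executor`.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

final class DispatchSketch {

    /**
     * Runs the task on the executor when dispatch is requested and an executor exists,
     * otherwise on the calling thread. Returns false if the pool rejects the task.
     */
    static boolean process(Runnable task, Executor executor, boolean dispatch) {
        try {
            if (dispatch && executor != null) {
                executor.execute(task);
            } else {
                task.run();
            }
        } catch (RejectedExecutionException ree) {
            // The pool and its queue are full; the caller decides what to do with the socket.
            return false;
        }
        return true;
    }
}
```

    In processKey() the returned false is what sets closeSocket, so a saturated worker pool leads to the connection being cancelled rather than queued indefinitely.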
    

    Next we analyse SocketProcessor.doRun(), which SocketProcessor.run() eventually calls. It hands the processing over to the Handler; when event is null, the event is treated as OPEN_READ.

    protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
    
        public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
            super(socketWrapper, event);
        }
    
        @Override
        protected void doRun() {
            NioChannel socket = socketWrapper.getSocket();
            SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
    
            try {
                int handshake = -1;
    
                try {
                    if (key != null) {
                        if (socket.isHandshakeComplete()) {
                            // No TLS handshaking required. Let the handler
                            // process this socket / event combination.
                            handshake = 0;
                        } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                                event == SocketEvent.ERROR) {
                            // Unable to complete the TLS handshake. Treat it as
                            // if the handshake failed.
                            handshake = -1;
                        } else {
                            handshake = socket.handshake(key.isReadable(), key.isWritable());
                            // The handshake process reads/writes from/to the
                            // socket. status may therefore be OPEN_WRITE once
                            // the handshake completes. However, the handshake
                            // happens when the socket is opened so the status
                            // must always be OPEN_READ after it completes. It
                            // is OK to always set this as it is only used if
                            // the handshake completes.
                            event = SocketEvent.OPEN_READ;
                        }
                    }
                } catch (IOException x) {
                    handshake = -1;
                    if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
                } catch (CancelledKeyException ckx) {
                    handshake = -1;
                }
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;
                    // Process the request from this socket
                    // Hand the processing over to the `Handler`; a null event is treated as `OPEN_READ`
                    if (event == null) {
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        close(socket, key);
                    }
                } else if (handshake == -1 ) {
                    close(socket, key);
                } else if (handshake == SelectionKey.OP_READ){
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE){
                    socketWrapper.registerWriteInterest();
                }
            } catch (CancelledKeyException cx) {
                socket.getPoller().cancelledKey(key);
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            } catch (Throwable t) {
                log.error("", t);
                socket.getPoller().cancelledKey(key);
            } finally {
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && !paused) {
                    processorCache.push(this);
                }
            }
        }
    }
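
    The tail of doRun() above branches on the handshake result. As a minimal sketch of that branching (the names HandshakeDispatch and Action are hypothetical and not part of Tomcat; only SelectionKey.OP_READ and SelectionKey.OP_WRITE are real JDK constants), the mapping could be written as:

```java
import java.nio.channels.SelectionKey;

// Hypothetical illustration of the branching at the end of doRun(); not Tomcat code.
final class HandshakeDispatch {

    // Follow-up actions that doRun() takes, named here for illustration only.
    enum Action { PROCESS, CLOSE, REGISTER_READ, REGISTER_WRITE, NONE }

    // Maps the handshake result to the action taken by doRun().
    static Action next(int handshake) {
        if (handshake == 0) {
            return Action.PROCESS;          // handshake complete: call Handler.process()
        } else if (handshake == -1) {
            return Action.CLOSE;            // handshake failed: close the socket
        } else if (handshake == SelectionKey.OP_READ) {
            return Action.REGISTER_READ;    // handshake needs more input
        } else if (handshake == SelectionKey.OP_WRITE) {
            return Action.REGISTER_WRITE;   // handshake needs to write more output
        }
        return Action.NONE;                 // any other value: nothing to do
    }
}
```

    A result of 0 is the only path that reaches the Handler; every other value either closes the socket or hands it back to the poller to wait for more I/O.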
    

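    The Handler implementation: ConnectionHandler

    The key method of Handler is process(). It is very long, more than 200 lines, so brace yourself!
    Although the method has many conditional branches, its logic is clear: its main job is to call Processor.process().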

    @Override
    public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
        if (getLog().isDebugEnabled()) {
            getLog().debug(sm.getString("abstractConnectionHandler.process",
                    wrapper.getSocket(), status));
        }
        if (wrapper == null) {
            // Nothing to do. Socket has been closed.
            return SocketState.CLOSED;
        }
    
        S socket = wrapper.getSocket();
    
        Processor processor = connections.get(socket);
        if (getLog().isDebugEnabled()) {
            getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                    processor, socket));
        }
    
        // Async timeouts are calculated on a dedicated thread and then
        // dispatched. Because of delays in the dispatch process, the
        // timeout may no longer be required. Check here and avoid
        // unnecessary processing.
        if (SocketEvent.TIMEOUT == status && (processor == null ||
                !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {
            // This is effectively a NO-OP
            return SocketState.OPEN;
        }
    
        if (processor != null) {
            // Make sure an async timeout doesn't fire
            getProtocol().removeWaitingProcessor(processor);
        } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
            // Nothing to do. Endpoint requested a close and there is no
            // longer a processor associated with this socket.
            return SocketState.CLOSED;
        }
    
        ContainerThreadMarker.set();
    
        try {
            if (processor == null) {
                String negotiatedProtocol = wrapper.getNegotiatedProtocol();
                if (negotiatedProtocol != null) {
                    UpgradeProtocol upgradeProtocol =
                            getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                    if (upgradeProtocol != null) {
                        processor = upgradeProtocol.getProcessor(
                                wrapper, getProtocol().getAdapter());
                    } else if (negotiatedProtocol.equals("http/1.1")) {
                        // Explicitly negotiated the default protocol.
                        // Obtain a processor below.
                    } else {
                        // TODO:
                        // OpenSSL 1.0.2's ALPN callback doesn't support
                        // failing the handshake with an error if no
                        // protocol can be negotiated. Therefore, we need to
                        // fail the connection here. Once this is fixed,
                        // replace the code below with the commented out
                        // block.
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString(
                                "abstractConnectionHandler.negotiatedProcessor.fail",
                                negotiatedProtocol));
                        }
                        return SocketState.CLOSED;
                        /*
                         * To replace the code above once OpenSSL 1.1.0 is
                         * used.
                        // Failed to create processor. This is a bug.
                        throw new IllegalStateException(sm.getString(
                                "abstractConnectionHandler.negotiatedProcessor.fail",
                                negotiatedProtocol));
                        */
                    }
                }
            }
            if (processor == null) {
                processor = recycledProcessors.pop();
                if (getLog().isDebugEnabled()) {
                    getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
                            processor));
                }
            }
            if (processor == null) {
                processor = getProtocol().createProcessor();
                register(processor);
            }
    
            processor.setSslSupport(
                    wrapper.getSslSupport(getProtocol().getClientCertProvider()));
    
            // Associate the processor with the connection
            connections.put(socket, processor);
    
            SocketState state = SocketState.CLOSED;
            do {
                // The key call: delegate to Processor.process()
                state = processor.process(wrapper, status);
    
                if (state == SocketState.UPGRADING) {
                    // Get the HTTP upgrade handler
                    UpgradeToken upgradeToken = processor.getUpgradeToken();
                    // Retrieve leftover input
                    ByteBuffer leftOverInput = processor.getLeftoverInput();
                    if (upgradeToken == null) {
                        // Assume direct HTTP/2 connection
                        UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
                        if (upgradeProtocol != null) {
                            processor = upgradeProtocol.getProcessor(
                                    wrapper, getProtocol().getAdapter());
                            wrapper.unRead(leftOverInput);
                            // Associate with the processor with the connection
                            connections.put(socket, processor);
                        } else {
                            if (getLog().isDebugEnabled()) {
                                getLog().debug(sm.getString(
                                    "abstractConnectionHandler.negotiatedProcessor.fail",
                                    "h2c"));
                            }
                            return SocketState.CLOSED;
                        }
                    } else {
                        HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                        // Release the Http11 processor to be re-used
                        release(processor);
                        // Create the upgrade processor
                        processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
                                    processor, wrapper));
                        }
                        wrapper.unRead(leftOverInput);
                        // Mark the connection as upgraded
                        wrapper.setUpgraded(true);
                        // Associate with the processor with the connection
                        connections.put(socket, processor);
                        // Initialise the upgrade handler (which may trigger
                        // some IO using the new protocol which is why the lines
                        // above are necessary)
                        // This cast should be safe. If it fails the error
                        // handling for the surrounding try/catch will deal with
                        // it.
                        if (upgradeToken.getInstanceManager() == null) {
                            httpUpgradeHandler.init((WebConnection) processor);
                        } else {
                            ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                            try {
                                httpUpgradeHandler.init((WebConnection) processor);
                            } finally {
                                upgradeToken.getContextBind().unbind(false, oldCL);
                            }
                        }
                    }
                }
            } while ( state == SocketState.UPGRADING);
    
            if (state == SocketState.LONG) {
                // In the middle of processing a request/response. Keep the
                // socket associated with the processor. Exact requirements
                // depend on type of long poll
                longPoll(wrapper, processor);
                if (processor.isAsync()) {
                    getProtocol().addWaitingProcessor(processor);
                }
            } else if (state == SocketState.OPEN) {
                // In keep-alive but between requests. OK to recycle
                // processor. Continue to poll for the next request.
                connections.remove(socket);
                release(processor);
                wrapper.registerReadInterest();
            } else if (state == SocketState.SENDFILE) {
                // Sendfile in progress. If it fails, the socket will be
                // closed. If it works, the socket either be added to the
                // poller (or equivalent) to await more data or processed
                // if there are any pipe-lined requests remaining.
            } else if (state == SocketState.UPGRADED) {
                // Don't add sockets back to the poller if this was a
                // non-blocking write otherwise the poller may trigger
                // multiple read events which may lead to thread starvation
                // in the connector. The write() method will add this socket
                // to the poller if necessary.
                if (status != SocketEvent.OPEN_WRITE) {
                    longPoll(wrapper, processor);
                }
            } else if (state == SocketState.SUSPENDED) {
                // Don't add sockets back to the poller.
                // The resumeProcessing() method will add this socket
                // to the poller.
            } else {
                // Connection closed. OK to recycle the processor. Upgrade
                // processors are not recycled.
                connections.remove(socket);
                if (processor.isUpgrade()) {
                    UpgradeToken upgradeToken = processor.getUpgradeToken();
                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                    InstanceManager instanceManager = upgradeToken.getInstanceManager();
                    if (instanceManager == null) {
                        httpUpgradeHandler.destroy();
                    } else {
                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                        try {
                            httpUpgradeHandler.destroy();
                        } finally {
                            try {
                                instanceManager.destroyInstance(httpUpgradeHandler);
                            } catch (Throwable e) {
                                ExceptionUtils.handleThrowable(e);
                                getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                            }
                            upgradeToken.getContextBind().unbind(false, oldCL);
                        }
                    }
                } else {
                    release(processor);
                }
            }
            return state;
        } catch(java.net.SocketException e) {
            // SocketExceptions are normal
            getLog().debug(sm.getString(
                    "abstractConnectionHandler.socketexception.debug"), e);
        } catch (java.io.IOException e) {
            // IOExceptions are normal
            getLog().debug(sm.getString(
                    "abstractConnectionHandler.ioexception.debug"), e);
        } catch (ProtocolException e) {
            // Protocol exceptions normally mean the client sent invalid or
            // incomplete data.
            getLog().debug(sm.getString(
                    "abstractConnectionHandler.protocolexception.debug"), e);
        }
        // Future developers: if you discover any other
        // rare-but-nonfatal exceptions, catch them here, and log as
        // above.
        catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            // any other exception or error is odd. Here we log it
            // with "ERROR" level, so it will show up even on
            // less-than-verbose logs.
            getLog().error(sm.getString("abstractConnectionHandler.error"), e);
        } finally {
            ContainerThreadMarker.clear();
        }
    
        // Make sure socket/processor is removed from the list of current
        // connections
        connections.remove(socket);
        release(processor);
        return SocketState.CLOSED;
    }
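
    Before it delegates to the Processor, process() looks for a processor in three places in turn: the one already bound to the socket, a recycled one, and finally a newly created one. The sketch below models only that lookup order and the recycling done in the SocketState.OPEN branch. ProcessorLookup and its methods are hypothetical names. It also uses a plain HashMap and ArrayDeque guarded by synchronized methods, which is a simplification and not how Tomcat itself manages concurrency.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified model of the processor lookup in ConnectionHandler.process().
final class ProcessorLookup<S, P> {

    private final Map<S, P> connections = new HashMap<>();  // socket -> bound processor
    private final Deque<P> recycled = new ArrayDeque<>();   // processors ready for reuse
    private final Supplier<P> factory;                       // stands in for createProcessor()

    ProcessorLookup(Supplier<P> factory) {
        this.factory = factory;
    }

    // Lookup order: bound processor, then recycled processor, then a new one.
    synchronized P acquire(S socket) {
        P processor = connections.get(socket);
        if (processor == null) {
            processor = recycled.pollFirst();
        }
        if (processor == null) {
            processor = factory.get();
        }
        // Associate the processor with the connection
        connections.put(socket, processor);
        return processor;
    }

    // Keep-alive between requests: unbind the processor and make it reusable.
    synchronized void release(S socket) {
        P processor = connections.remove(socket);
        if (processor != null) {
            recycled.addFirst(processor);
        }
    }
}
```

    Reusing processors this way avoids allocating a new processor, with its buffers, for every request on a keep-alive connection.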
    

    Processor

    Here we focus on how Processor handles read events, which comes down to a single line of code: the call to the service() method.

    public abstract class AbstractProcessorLight implements Processor {
    
        @Override
        public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
                throws IOException {
    
            SocketState state = SocketState.CLOSED;
            Iterator<DispatchType> dispatches = null;
            do {
                if (dispatches != null) {
                    DispatchType nextDispatch = dispatches.next();
                    state = dispatch(nextDispatch.getSocketStatus());
                } else if (status == SocketEvent.DISCONNECT) {
                    // Do nothing here, just wait for it to get recycled
                } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                    state = dispatch(status);
                    if (state == SocketState.OPEN) {
                        // There may be pipe-lined data to read. If the data isn't
                        // processed now, execution will exit this loop and call
                        // release() which will recycle the processor (and input
                        // buffer) deleting any pipe-lined data. To avoid this,
                        // process it now.
                        state = service(socketWrapper);
                    }
                } else if (status == SocketEvent.OPEN_WRITE) {
                    // Extra write event likely after async, ignore
                    state = SocketState.LONG;
                } else if (status == SocketEvent.OPEN_READ){
                    // Call the `service()` method
                    state = service(socketWrapper);
                } else {
                    // Default to closing the socket if the SocketEvent passed in
                    // is not consistent with the current state of the Processor
                    state = SocketState.CLOSED;
                }
    
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], Status in: [" + status +
                            "], State out: [" + state + "]");
                }
    
                if (state != SocketState.CLOSED && isAsync()) {
                    state = asyncPostProcess();
                    if (getLog().isDebugEnabled()) {
                        getLog().debug("Socket: [" + socketWrapper +
                                "], State after async post processing: [" + state + "]");
                    }
                }
    
                if (dispatches == null || !dispatches.hasNext()) {
                    // Only returns non-null iterator if there are
                    // dispatches to process.
                    dispatches = getIteratorAndClearDispatches();
                }
            } while (state == SocketState.ASYNC_END ||
                    dispatches != null && state != SocketState.CLOSED);
    
            return state;
        }
    }
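
    For a processor that is neither async nor upgraded, and with no pending dispatches, the first pass through the loop above reduces to a small mapping from the socket event to a socket state. The sketch below shows only that reduced case. ProcessDispatchSketch and its enums are hypothetical stand-ins that list just a subset of the event and state values.

```java
import java.util.function.Supplier;

// Hypothetical reduction of AbstractProcessorLight.process() for the
// non-async, non-upgrade case with no pending dispatches.
final class ProcessDispatchSketch {

    enum Event { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }
    enum State { OPEN, LONG, CLOSED }

    // 'service' stands in for the processor's service(socketWrapper) call.
    static State firstState(Event status, Supplier<State> service) {
        switch (status) {
            case DISCONNECT:
                // Nothing is done; the initial CLOSED state is kept
                return State.CLOSED;
            case OPEN_WRITE:
                // Extra write event, ignored
                return State.LONG;
            case OPEN_READ:
                // The read path: delegate to service()
                return service.get();
            default:
                // Event inconsistent with the processor state: close the socket
                return State.CLOSED;
        }
    }
}
```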
    

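    Processor.service() does two important things. The method is also very long, more than 200 lines, so we do not reproduce its code here.

    1. Create the Request and Response objects
    2. Call Adapter.service(), passing in the Request and Response objects it created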
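
    The two steps above can be illustrated with a toy model. This is not the real Processor.service() code: ServiceSketch, RawRequest, RawResponse and the Adapter interface below are all hypothetical, and the parsing is reduced to splitting a request line on spaces.

```java
// Hypothetical toy model of the two steps in Processor.service(); not Tomcat code.
final class ServiceSketch {

    // Stand-ins for the low-level request/response pair.
    static final class RawRequest {
        String method;
        String uri;
    }

    static final class RawResponse {
        int status = 200;
        final StringBuilder body = new StringBuilder();
    }

    // Stand-in for the Adapter that forwards the pair to the container.
    interface Adapter {
        void service(RawRequest req, RawResponse res);
    }

    static RawResponse service(String requestLine, Adapter adapter) {
        // Step 1: build the request/response pair from the raw input
        RawRequest req = new RawRequest();
        RawResponse res = new RawResponse();
        String[] parts = requestLine.trim().split(" ");
        if (parts.length < 2) {
            res.status = 400;   // malformed request line: never reaches the adapter
            return res;
        }
        req.method = parts[0];
        req.uri = parts[1];

        // Step 2: hand the pair to the adapter
        adapter.service(req, res);
        return res;
    }
}
```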

    Adapter

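    Adapter connects the Connector to the Container and acts as the link between the two. Processor calls Adapter.service(). Walking through that method, it mainly does the following:

    1. From the coyote request and response objects, create the connector's request and response objects (wrappers that implement HttpServletRequest and HttpServletResponse)
    2. Add extra headers
    3. Parse the request; this step includes handling proxy-server settings, setting the necessary headers, and similar work
    4. Enter the container proper by invoking the first valve of the Engine container's pipeline
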
    @Override
    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
            throws Exception {
    
        // 1. From the coyote request and response objects, obtain the connector's request and response objects (wrappers that implement HttpServletRequest and HttpServletResponse)
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);
    
        if (request == null) {
            // Create objects
            request = connector.createRequest();
            request.setCoyoteRequest(req);
            response = connector.createResponse();
            response.setCoyoteResponse(res);
    
            // Link objects
            request.setResponse(response);
            response.setRequest(request);
    
            // Set as notes
            req.setNote(ADAPTER_NOTES, request);
            res.setNote(ADAPTER_NOTES, response);
    
            // Set query string encoding
            req.getParameters().setQueryStringCharset(connector.getURICharset());
        }
    
        // 2. Add extra headers
        if (connector.getXpoweredBy()) {
            response.addHeader("X-Powered-By", POWERED_BY);
        }
    
        boolean async = false;
        boolean postParseSuccess = false;
    
        req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
    
        try {
            // Parse and set Catalina and configuration specific
            // request parameters
            // 3. Parse the request; this step includes handling proxy-server settings, setting the necessary headers, and similar work
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(
                        connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                // 4. Enter the container proper: invoke the first valve of the Engine container's pipeline
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                        request, response);
            }
            if (request.isAsync()) {
                async = true;
                ReadListener readListener = req.getReadListener();
                if (readListener != null && request.isFinished()) {
                    // Possible the all data may have been read during service()
                    // method so this needs to be checked here
                    ClassLoader oldCL = null;
                    try {
                        oldCL = request.getContext().bind(false, null);
                        if (req.sendAllDataReadEvent()) {
                            req.getReadListener().onAllDataRead();
                        }
                    } finally {
                        request.getContext().unbind(false, oldCL);
                    }
                }
    
                Throwable throwable =
                        (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
    
                // If an async request was started, is not going to end once
                // this container thread finishes and an error occurred, trigger
                // the async error process
                if (!request.isAsyncCompleting() && throwable != null) {
                    request.getAsyncContextInternal().setErrorState(throwable, true);
                }
            } else {
                request.finishRequest();
                response.finishResponse();
            }
    
        } catch (IOException e) {
            // Ignore
        } finally {
            AtomicBoolean error = new AtomicBoolean(false);
            res.action(ActionCode.IS_ERROR, error);
    
            if (request.isAsyncCompleting() && error.get()) {
                // Connection will be forcibly closed which will prevent
                // completion happening at the usual point. Need to trigger
                // call to onComplete() here.
                res.action(ActionCode.ASYNC_POST_PROCESS,  null);
                async = false;
            }
    
            // Access log
            if (!async && postParseSuccess) {
                // Log only if processing was invoked.
                // If postParseRequest() failed, it has already logged it.
                Context context = request.getContext();
                // If the context is null, it is likely that the endpoint was
                // shutdown, this connection closed and the request recycled in
                // a different thread. That thread will have updated the access
                // log so it is OK not to update the access log here in that
                // case.
                if (context != null) {
                    context.logAccess(request, response,
                            System.currentTimeMillis() - req.getStartTime(), false);
                }
            }
    
            req.getRequestProcessor().setWorkerThreadName(null);
    
            // Recycle the wrapper request and response
            if (!async) {
                request.recycle();
                response.recycle();
            }
        }
    }
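
    Step 1 of service() relies on a small caching trick: the connector-level request is stored as a "note" on the coyote request, so it is created only once per coyote request and reused afterwards. The sketch below isolates that pattern together with the hand-off to the first valve. All class names are hypothetical stand-ins, the notes array size is arbitrary, and the response side is omitted for brevity.

```java
// Hypothetical sketch of the note-caching pattern in Adapter.service(); not Tomcat code.
final class AdapterSketch {

    // Slot index used to cache the wrapper; the value is an assumption of this sketch.
    static final int ADAPTER_NOTES = 1;

    // Stand-in for the coyote request: carries a small array of notes.
    static final class LowLevelRequest {
        private final Object[] notes = new Object[4];

        Object getNote(int pos) {
            return notes[pos];
        }

        void setNote(int pos, Object value) {
            notes[pos] = value;
        }
    }

    // Stand-in for the connector-level request that wraps the coyote request.
    static final class ServletLevelRequest {
        final LowLevelRequest coyote;

        ServletLevelRequest(LowLevelRequest coyote) {
            this.coyote = coyote;
        }
    }

    // Stand-in for the first valve of the Engine pipeline.
    interface Valve {
        void invoke(ServletLevelRequest request);
    }

    static ServletLevelRequest service(LowLevelRequest req, Valve first) {
        // Reuse the wrapper cached on the low-level request, if there is one
        ServletLevelRequest request = (ServletLevelRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            // First use: create the wrapper and cache it as a note
            request = new ServletLevelRequest(req);
            req.setNote(ADAPTER_NOTES, request);
        }
        // Enter the container through the first valve
        first.invoke(request);
        return request;
    }
}
```

    Because the wrapper lives on the low-level request, recycling it at the end of service() (rather than discarding it) is enough to prepare it for the next request handled by the same processor.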
    

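    Summary

    In this article we first posed the four questions that need answering in order to understand Connector. We then presented the overall structure diagram and analyzed its components and how they relate to one another. Finally, following that diagram, we analyzed Connector's startup logic and request-handling logic, whose internals are detailed and complex.

    From the source analysis above, we now understand what problem Connector solves, how it is structured, and how it works internally.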

  • Original article: https://www.cnblogs.com/nizuimeiabc1/p/13060061.html