  • Mina Source Code Study

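    Contents

    1. NioSocketAcceptor initialization

    1.1 Class diagram

    1.2 Method call sequence diagram

    1.3 Initializing NioSocketAcceptor

    1.4 SimpleIoProcessorPool initialization

    1.5 NioProcessor source code

    1.6 Summary

    2. NioSocketAcceptor bind method

    2.1 Creating the ServerSocket listener

    2.1.1 Sequence diagram

    2.1.2 The bind method

    2.1.3 The startupAcceptor method

    2.1.4 Creating the ServerSocket listener

    2.2 Accepting client connection requests

    2.3 Read/write monitoring and handling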

    The Mina version studied here is 2.0.9.

    1. NioSocketAcceptor initialization

    The server-side acceptor is initialized with the following code:

    IoAcceptor acceptor = new NioSocketAcceptor();

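    So what does this call actually do? Let's read the source code.

    First, the class diagram and the call sequence diagram give an overview:

    1.1 Class diagram

    [Figure: class diagram]

    1.2 Method call sequence diagram

    [Figure: method call sequence diagram]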

    1.3 Initializing NioSocketAcceptor

    The constructor that gets called is shown below:

    NioSocketAcceptor class

    public NioSocketAcceptor() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

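    Note the NioProcessor.class argument: it becomes the concrete type of the objects in the processor pool later on.

    The call continues into the constructor of the parent class AbstractPollingIoAcceptor.

    Code 1.2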

    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
            this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
    }

    Note the argument new SimpleIoProcessorPool<S>(processorClass); processorClass is actually NioProcessor.class.

    The call then continues:

    private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
                boolean createdProcessor, SelectorProvider selectorProvider) {
            super(sessionConfig, executor);
    
            if (processor == null) {
                throw new IllegalArgumentException("processor");
            }
            
            // Code 1.3: assign the processor
            this.processor = processor;
            this.createdProcessor = createdProcessor;
    
            try {
                // Initialize the selector
                // Code 1.4: initialize the Selector
                init(selectorProvider);
    
                // The selector is now ready, we can switch the
                // flag to true so that incoming connection can be accepted
                selectable = true;
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeIoException("Failed to initialize.", e);
            } finally {
                if (!selectable) {
                    try {
                        destroy();
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            }
        }

    Notes:

    1. Code 1.3: the value is assigned to the processor field of AbstractPollingIoAcceptor; its actual type is SimpleIoProcessorPool.

    Next, look at Code 1.4:

    @Override
        protected void init(SelectorProvider selectorProvider) throws Exception {
            this.selectorProvider = selectorProvider;
    
            if (selectorProvider == null) {
                selector = Selector.open();
            } else {
                selector = selectorProvider.openSelector();
            }
        }

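    This initializes the selector, which is used to register client connection events.

    Let's draw another class diagram to see where processor and selector sit:

    [Figure: class diagram showing where processor and selector sit]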
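
The null check in init() can be reproduced with plain java.nio. The following is only a minimal sketch: SelectorInitSketch and openSelector are hypothetical names used for illustration and are not part of Mina.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

class SelectorInitSketch {
    // Mirrors the null check in init(): use the platform default
    // when no SelectorProvider was supplied.
    static Selector openSelector(SelectorProvider provider) throws IOException {
        return (provider == null) ? Selector.open() : provider.openSelector();
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = openSelector(null)) {
            System.out.println("selector open: " + selector.isOpen());
        }
    }
}
```

Passing SelectorProvider.provider() explicitly should behave the same as passing null, since Selector.open() delegates to the system-wide default provider.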

    1.4 SimpleIoProcessorPool initialization

    Step into new SimpleIoProcessorPool<S>(processorClass) from Code 1.2:

    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
            this(processorType, null, DEFAULT_SIZE, null);
    }

    Step further in:

    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, SelectorProvider selectorProvider) {
            if (processorType == null) {
                throw new IllegalArgumentException("processorType");
            }
    
            if (size <= 0) {
                throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
            }
    
            // Create the executor if none is provided
            createdExecutor = (executor == null);
    
            if (createdExecutor) {
                this.executor = Executors.newCachedThreadPool();
                // Set a default reject handler
                ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            } else {
                this.executor = executor;
            }
            
            // Code 1.2.0: the IoProcessor pool. Note that size here is the number of CPU cores + 1, which later is the maximum number of threads
            pool = new IoProcessor[size];
    
            boolean success = false;
            Constructor<? extends IoProcessor<S>> processorConstructor = null;
            boolean usesExecutorArg = true;
    
            try {
                // We create at least one processor
                // Code 1.2.1: create at least one processor; processorType here is NioProcessor
                try {
                    try {
                        processorConstructor = processorType.getConstructor(ExecutorService.class);
                        pool[0] = processorConstructor.newInstance(this.executor);
                    } catch (NoSuchMethodException e1) {
                        // To the next step...
                        try {
                            if(selectorProvider==null) {
                                processorConstructor = processorType.getConstructor(Executor.class);
                                // Code 1.2.2: the Executor is the thread pool
                                pool[0] = processorConstructor.newInstance(this.executor);
                            } else {
                                processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class);
                                pool[0] = processorConstructor.newInstance(this.executor,selectorProvider);
                            }
                        } catch (NoSuchMethodException e2) {
                            // To the next step...
                            try {
                                processorConstructor = processorType.getConstructor();
                                usesExecutorArg = false;
                                pool[0] = processorConstructor.newInstance();
                            } catch (NoSuchMethodException e3) {
                                // To the next step...
                            }
                        }
                    }
                } catch (RuntimeException re) {
                    LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
                    throw re;
                } catch (Exception e) {
                    String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
                    LOGGER.error(msg, e);
                    throw new RuntimeIoException(msg, e);
                }
    
                if (processorConstructor == null) {
                    // Raise an exception if no proper constructor is found.
                    String msg = String.valueOf(processorType) + " must have a public constructor with one "
                            + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
                            + Executor.class.getSimpleName() + " parameter or a public default constructor.";
                    LOGGER.error(msg);
                    throw new IllegalArgumentException(msg);
                }
    
                // Constructor found now use it for all subsequent instantiations
                for (int i = 1; i < pool.length; i++) {
                    try {
                        if (usesExecutorArg) {
                            if(selectorProvider==null) {
                                pool[i] = processorConstructor.newInstance(this.executor);
                            } else {
                                pool[i] = processorConstructor.newInstance(this.executor, selectorProvider);
                            }
                        } else {
                            pool[i] = processorConstructor.newInstance();
                        }
                    } catch (Exception e) {
                        // Won't happen because it has been done previously
                    }
                }
    
                success = true;
            } finally {
                if (!success) {
                    dispose();
                }
            }
        }

    Pay attention to Code 1.2.0 through 1.2.2.
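
The constructor lookup in the pool follows a fallback chain: try a constructor taking an ExecutorService, then one taking an Executor, then the default constructor. The sketch below shows that chain in isolation. ConstructorFallbackSketch, ExecutorWorker and PlainWorker are hypothetical names invented for this illustration; the SelectorProvider branch is left out for brevity.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ConstructorFallbackSketch {
    // Hypothetical worker type that only offers an Executor constructor.
    public static class ExecutorWorker {
        final Executor executor;
        public ExecutorWorker(Executor executor) { this.executor = executor; }
    }

    // Hypothetical worker type that only offers a default constructor.
    public static class PlainWorker {
        public PlainWorker() { }
    }

    // Try (ExecutorService), then (Executor), then the no-arg constructor.
    static <T> T create(Class<T> type, ExecutorService executor) {
        try {
            try {
                Constructor<T> c = type.getConstructor(ExecutorService.class);
                return c.newInstance(executor);
            } catch (NoSuchMethodException e1) {
                try {
                    Constructor<T> c = type.getConstructor(Executor.class);
                    return c.newInstance(executor);
                } catch (NoSuchMethodException e2) {
                    return type.getConstructor().newInstance();
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(type + " has no usable public constructor", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        System.out.println(create(ExecutorWorker.class, pool).getClass().getSimpleName());
        System.out.println(create(PlainWorker.class, pool).getClass().getSimpleName());
        pool.shutdown();
    }
}
```

Note that getConstructor matches declared parameter types exactly, so a class that declares only an Executor constructor is not found by the ExecutorService lookup and falls through to the second attempt.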

    1.5 NioProcessor source code

    public NioProcessor(Executor executor) {
            super(executor);
    
            try {
                // Open a new selector
                selector = Selector.open();
            } catch (IOException e) {
                throw new RuntimeIoException("Failed to open a selector.", e);
            }
        }

    NioProcessor also maintains its own selector, which is used to listen for read/write events.

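    1.6 Summary

    As the analysis shows, initializing NioSocketAcceptor does the following:

    1) Builds the processor pool SimpleIoProcessorPool and instantiates the NioProcessor objects in it

    2) Initializes the Selector of NioSocketAcceptor, which listens for client connection events

    3) Initializes the Selector inside each NioProcessor, which listens for read/write events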

    2. NioSocketAcceptor bind method

    2.1 Creating the ServerSocket listener

    2.1.1 Sequence diagram

    [Figure: bind sequence diagram]

    2.1.2 The bind method

    The bind API called by the application:

    public final void bind(SocketAddress localAddress) throws IOException {
       if (localAddress == null) {
           throw new IllegalArgumentException("localAddress");
       }
    
       List<SocketAddress> localAddresses = new ArrayList<SocketAddress>(1);
       localAddresses.add(localAddress);
       // Code 2.1.2.1
       bind(localAddresses);
    }

    Step into Code 2.1.2.1 (AbstractIoAcceptor class):

    public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {
            if (isDisposing()) {
                throw new IllegalStateException("Already disposed.");
            }
    
            if (localAddresses == null) {
                throw new IllegalArgumentException("localAddresses");
            }
    
            List<SocketAddress> localAddressesCopy = new ArrayList<SocketAddress>();
    
            for (SocketAddress a : localAddresses) {
                checkAddressType(a);
                localAddressesCopy.add(a);
            }
    
            if (localAddressesCopy.isEmpty()) {
                throw new IllegalArgumentException("localAddresses is empty.");
            }
    
            boolean activate = false;
            synchronized (bindLock) {
                synchronized (boundAddresses) {
                    if (boundAddresses.isEmpty()) {
                        activate = true;
                    }
                }
    
                if (getHandler() == null) {
                    throw new IllegalStateException("handler is not set.");
                }
    
                try {
                    // Code 2.1.2.2
                    Set<SocketAddress> addresses = bindInternal(localAddressesCopy);
                    
                    synchronized (boundAddresses) {
                        boundAddresses.addAll(addresses);
                    }
                } catch (IOException e) {
                    throw e;
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e);
                }
            }
    
            if (activate) {
                getListeners().fireServiceActivated();
            }
        }

    Step into Code 2.1.2.2 (AbstractPollingIoAcceptor class):

    protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
            // Create a bind request as a Future operation. When the selector
            // have handled the registration, it will signal this future.
            AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
    
            // adds the Registration request to the queue for the Workers
            // to handle
            registerQueue.add(request);
    
            // creates the Acceptor instance and has the local
            // executor kick it off.
            // Code 2.1.2.3
            startupAcceptor();
            
            // As we just started the acceptor, we have to unblock the select()
            // in order to process the bind request we just have added to the
            // registerQueue.
            try {
                lock.acquire();
    
                // Wait a bit to give a chance to the Acceptor thread to do the select()
                Thread.sleep(10);
                wakeup();
            } finally {
                lock.release();
            }
    
            // Now, we wait until this request is completed.
            request.awaitUninterruptibly();
    
            if (request.getException() != null) {
                throw request.getException();
            }
    
            // Update the local addresses.
            // setLocalAddresses() shouldn't be called from the worker thread
            // because of deadlock.
            Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
    
            for (H handle : boundHandles.values()) {
                newLocalAddresses.add(localAddress(handle));
            }
    
            return newLocalAddresses;
        }
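
The handoff in bindInternal (queue the request, wake the selector, wait on the future) can be sketched with plain java.nio. This is only an illustration under stated assumptions: WakeupHandoffSketch and submitAndWait are hypothetical names, and a CountDownLatch stands in for Mina's AcceptorOperationFuture.

```java
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class WakeupHandoffSketch {
    // Returns true if the worker thread picked up the queued request.
    static boolean submitAndWait() throws IOException, InterruptedException {
        Selector selector = Selector.open();
        Queue<CountDownLatch> registerQueue = new ConcurrentLinkedQueue<>();

        Thread worker = new Thread(() -> {
            try {
                for (;;) {
                    selector.select();           // blocks until wakeup() or an I/O event
                    CountDownLatch request = registerQueue.poll();
                    if (request != null) {
                        request.countDown();     // stand-in for future.setDone()
                        return;
                    }
                }
            } catch (IOException | ClosedSelectorException e) {
                // Selector failed or was closed: the request stays pending
                // and the caller's await() times out.
            }
        });
        worker.start();

        CountDownLatch request = new CountDownLatch(1);
        registerQueue.add(request);              // queue the request first ...
        selector.wakeup();                       // ... then unblock select()
        boolean done = request.await(5, TimeUnit.SECONDS);

        selector.close();
        worker.join(1000);
        return done;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("request handled: " + submitAndWait());
    }
}
```

Because a wakeup() issued before select() makes the next select() return immediately, the order "add to queue, then wakeup" avoids losing the request even if the worker has not reached select() yet.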

    2.1.3 The startupAcceptor method

    Step into Code 2.1.2.3 (AbstractPollingIoAcceptor):

    private void startupAcceptor() throws InterruptedException {
            // If the acceptor is not ready, clear the queues
            // TODO : they should already be clean : do we have to do that ?
            if (!selectable) {
                registerQueue.clear();
                cancelQueue.clear();
            }
    
            // start the acceptor if not already started
            // Fetched in the calling thread; the access is thread-safe
            Acceptor acceptor = acceptorRef.get();
    
            if (acceptor == null) {
                lock.acquire();
                // Code 2.1.3.1: create an Acceptor, i.e. start the server-side service; Acceptor is a Runnable
                acceptor = new Acceptor();
    
                if (acceptorRef.compareAndSet(null, acceptor)) {
                    // Code 2.1.3.2: start the thread
                    executeWorker(acceptor);
                } else {
                    lock.release();
                }
            }
        }
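
The get / compareAndSet pair in startupAcceptor ensures that at most one worker is started even when several threads call bind concurrently. A minimal sketch of that pattern follows; SingleWorkerStartupSketch and its members are hypothetical names, and a counter stands in for executeWorker(acceptor).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class SingleWorkerStartupSketch {
    private final AtomicReference<Runnable> workerRef = new AtomicReference<>();
    final AtomicInteger started = new AtomicInteger();

    // Returns true only for the caller that actually installed the worker.
    boolean startupWorker() {
        if (workerRef.get() != null) {
            return false;                  // a worker is already registered
        }
        Runnable worker = () -> { };
        if (workerRef.compareAndSet(null, worker)) {
            started.incrementAndGet();     // stand-in for executeWorker(worker)
            return true;
        }
        return false;                      // another thread won the race
    }

    public static void main(String[] args) {
        SingleWorkerStartupSketch sketch = new SingleWorkerStartupSketch();
        System.out.println(sketch.startupWorker());
        System.out.println(sketch.startupWorker());
    }
}
```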

    Step into Code 2.1.3.1 (AbstractPollingIoAcceptor.Acceptor):

    private class Acceptor implements Runnable {
            public void run() {
                assert (acceptorRef.get() == this);
    
                int nHandles = 0;
    
                // Release the lock
                lock.release();
    
                while (selectable) {
                    try {
                        // Detect if we have some keys ready to be processed
                        // The select() will be woke up if some new connection
                        // have occurred, or if the selector has been explicitly
                        // woke up
                    // Code 2.1.3.2: listen (select)
                        int selected = select();
    
                        // this actually sets the selector to OP_ACCEPT,
                        // and binds to the port on which this class will
                        // listen on
                    // Code 2.1.3.3: register the OP_ACCEPT event with the Selector
                        nHandles += registerHandles();
    
                        // Now, if the number of registred handles is 0, we can
                        // quit the loop: we don't have any socket listening
                        // for incoming connection.
                        if (nHandles == 0) {
                            acceptorRef.set(null);
    
                            if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                                assert (acceptorRef.get() != this);
                                break;
                            }
    
                            if (!acceptorRef.compareAndSet(null, this)) {
                                assert (acceptorRef.get() != this);
                                break;
                            }
    
                            assert (acceptorRef.get() == this);
                        }
    
                        if (selected > 0) {
                            // We have some connection request, let's process
                            // them here.
                        // Code 2.1.3.4: handle OP_ACCEPT events, i.e. handle client connection requests
                            processHandles(selectedHandles());
                        }
    
                        // check to see if any cancellation request has been made.
                        nHandles -= unregisterHandles();
                    } catch (ClosedSelectorException cse) {
                        // If the selector has been closed, we can exit the loop
                        ExceptionMonitor.getInstance().exceptionCaught(cse);
                        break;
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
    
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            ExceptionMonitor.getInstance().exceptionCaught(e1);
                        }
                    }
                }
    
                // Cleanup all the processors, and shutdown the acceptor.
                if (selectable && isDisposing()) {
                    selectable = false;
                    try {
                        if (createdProcessor) {
                            processor.dispose();
                        }
                    } finally {
                        try {
                            synchronized (disposalLock) {
                                if (isDisposing()) {
                                    destroy();
                                }
                            }
                        } catch (Exception e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        } finally {
                            disposalFuture.setDone();
                        }
                    }
                }
            }
        }

    2.1.4 Creating the ServerSocket listener

    Step into Code 2.1.3.3 (AbstractPollingIoAcceptor class), which registers the OP_ACCEPT event with the Selector:

    private int registerHandles() {
            for (;;) {
                // The register queue contains the list of services to manage
                // in this acceptor.
                AcceptorOperationFuture future = registerQueue.poll();
    
                if (future == null) {
                    return 0;
                }
    
                // We create a temporary map to store the bound handles,
                // as we may have to remove them all if there is an exception
                // during the sockets opening.
                Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
                List<SocketAddress> localAddresses = future.getLocalAddresses();
    
                try {
                    // Process all the addresses
                    for (SocketAddress a : localAddresses) {
                    // Code 2.1.4.1: open the channel
                        H handle = open(a);
                        newHandles.put(localAddress(handle), handle);
                    }
    
                    // Everything went ok, we can now update the map storing
                    // all the bound sockets.
                    boundHandles.putAll(newHandles);
    
                    // and notify.
                    future.setDone();
                    return newHandles.size();
                } catch (Exception e) {
                    // We store the exception in the future
                    future.setException(e);
                } finally {
                    // Roll back if failed to bind all addresses.
                    if (future.getException() != null) {
                        for (H handle : newHandles.values()) {
                            try {
                                close(handle);
                            } catch (Exception e) {
                                ExceptionMonitor.getInstance().exceptionCaught(e);
                            }
                        }
    
                        // TODO : add some comment : what is the wakeup() waking up ?
                        wakeup();
                    }
                }
            }
        }

    Step into Code 2.1.4.1 (NioSocketAcceptor class, which implements the abstract open method of AbstractPollingIoAcceptor):

    protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
            // Creates the listening ServerSocket
            ServerSocketChannel channel = null;
    
            if (selectorProvider != null) {
                channel = selectorProvider.openServerSocketChannel();
            } else {
                // open the channel
                channel = ServerSocketChannel.open();
            }
    
            boolean success = false;
    
            try {
                // This is a non blocking socket channel
                // set non-blocking mode
                channel.configureBlocking(false);
    
                // Configure the server socket,
                ServerSocket socket = channel.socket();
    
                // Set the reuseAddress flag accordingly with the setting
                socket.setReuseAddress(isReuseAddress());
    
                // and bind.
                try {
                    socket.bind(localAddress, getBacklog());
                } catch (IOException ioe) {
                    // Add some info regarding the address we try to bind to the
                    // message
                    String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                            + ioe.getMessage();
                    Exception e = new IOException(newMessage);
                    e.initCause(ioe.getCause());
    
                    // And close the channel
                    channel.close();
    
                    throw e;
                }
    
                // Register the channel within the selector for ACCEPT event
                // register for OP_ACCEPT events
                channel.register(selector, SelectionKey.OP_ACCEPT);
                success = true;
            } finally {
                if (!success) {
                    close(channel);
                }
            }
            return channel;
        }
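
Stripped of Mina's configuration hooks, the open method boils down to four java.nio steps: open the channel, switch it to non-blocking mode, bind, and register for OP_ACCEPT. The sketch below shows those steps on their own; ListenSketch is a hypothetical name, and the reuse-address flag and backlog value are placeholders rather than Mina's defaults.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class ListenSketch {
    // Opens a non-blocking server channel, binds it and registers it for OP_ACCEPT.
    static ServerSocketChannel open(Selector selector, InetSocketAddress address, int backlog)
            throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        boolean success = false;
        try {
            channel.configureBlocking(false);            // a selector requires non-blocking channels
            channel.socket().setReuseAddress(true);      // placeholder for isReuseAddress()
            channel.socket().bind(address, backlog);
            channel.register(selector, SelectionKey.OP_ACCEPT);
            success = true;
            return channel;
        } finally {
            if (!success) {
                channel.close();                         // roll back on any failure
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = open(selector, new InetSocketAddress("127.0.0.1", 0), 50)) {
            System.out.println("listening on port " + channel.socket().getLocalPort());
        }
    }
}
```

Port 0 asks the operating system for any free port, which keeps the example from colliding with a service that is already running.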

    2.2 Accepting client connection requests

    Sequence diagram:

    [Figure: accept sequence diagram]

    Step into Code 2.1.3.4 (AbstractPollingIoAcceptor.Acceptor class):

    private void processHandles(Iterator<H> handles) throws Exception {
                while (handles.hasNext()) {
                    H handle = handles.next();
                    handles.remove();
    
                    // Associates a new created connection to a processor,
                    // and get back a session
                    // Code 2.4.0: accept the client connection and create the Session
                    S session = accept(processor, handle);
    
                    if (session == null) {
                        continue;
                    }
                    // Code 2.4.1: initialize the Session
                    initSession(session, null, null);
    
                    // add the session to the SocketIoProcessor
                    // Code 2.4.2
                    session.getProcessor().add(session);
                }
            }

    Step into Code 2.4.0 (NioSocketAcceptor class):

    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
    
            SelectionKey key = null;
    
            if (handle != null) {
                key = handle.keyFor(selector);
            }
    
            if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
                return null;
            }
    
            // accept the connection from the client
            // Code 2.4.3: accept the client connection
            SocketChannel ch = handle.accept();
    
            if (ch == null) {
                return null;
            }
            // Code 2.4.4: create the Session and return it
            return new NioSocketSession(this, processor, ch);
        }

    Back in processHandles, step into Code 2.4.2, SimpleIoProcessorPool.add(S session):

    public final void add(S session) {
            // Code 2.4.5: getProcessor returns a NioProcessor, and the session is added to that NioProcessor
            getProcessor(session).add(session);
        }

    From Code 2.4.5, step into the getProcessor method (SimpleIoProcessorPool):

    private IoProcessor<S> getProcessor(S session) {
            IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
    
            if (processor == null) {
                if (disposed || disposing) {
                    throw new IllegalStateException("A disposed processor cannot be accessed.");
                }
    
                processor = pool[Math.abs((int) session.getId()) % pool.length];
    
                if (processor == null) {
                    throw new IllegalStateException("A disposed processor cannot be accessed.");
                }
    
                session.setAttributeIfAbsent(PROCESSOR, processor);
            }
    
            return processor;
        }

    What this code does:

    It picks a processor from the processor pool based on the session.

    Different sessions may map to the same processor.
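
The slot arithmetic in getProcessor is easy to try out in isolation. The sketch below only copies the expression from the method above; ProcessorSelectionSketch and slotFor are hypothetical names.

```java
class ProcessorSelectionSketch {
    // Same arithmetic as getProcessor(): narrow the long id to int,
    // take the absolute value, then wrap around the pool size.
    static int slotFor(long sessionId, int poolSize) {
        return Math.abs((int) sessionId) % poolSize;
    }

    public static void main(String[] args) {
        int poolSize = 5; // e.g. 4 CPU cores + 1
        for (long id = 1; id <= 6; id++) {
            System.out.println("session " + id + " -> slot " + slotFor(id, poolSize));
        }
    }
}
```

With a pool of 5, session ids 1 and 6 both land in slot 1, which is how different sessions end up sharing one processor.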

    Step into Code 2.4.5: AbstractPollingIoProcessor.add(S session)

    public final void add(S session) {
            if (disposed || disposing) {
                throw new IllegalStateException("Already disposed.");
            }
    
            // Adds the session to the newSession queue and starts the worker
            // add the session to the newSessions queue
            newSessions.add(session);
            // Code 2.4.6: start the processor thread
            startupProcessor();
        }

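    Now let's look at the class structure diagram of NioProcessor again:

    [Figure: NioProcessor class structure diagram]

    Starting the processor thread

    Step into Code 2.4.6 (AbstractPollingIoProcessor class):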

    private void startupProcessor() {
            // Code 2.4.7: thread-safe read
            Processor processor = processorRef.get();
    
            if (processor == null) {
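                // Code 2.4.8: if no Processor is bound to this object yet, create one
                processor = new Processor();
                // Thread-safe update: only one caller can install the Processor
                if (processorRef.compareAndSet(null, processor)) {
                    // Code 2.4.9: start the Processor thread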
                    executor.execute(new NamePreservingRunnable(processor, threadName));
                }
            }
    
            // Just stop the select() and start it again, so that the processor
            // can be activated immediately.
            wakeup();
        }

    2.3 Read/write monitoring and handling

    Sequence diagram:

    [Figure: read/write handling sequence diagram]

    Step into Code 2.4.8 (AbstractPollingIoProcessor.Processor):

    private class Processor implements Runnable {
            public void run() {
                assert (processorRef.get() == this);
    
                int nSessions = 0;
                lastIdleCheckTime = System.currentTimeMillis();
    
                for (;;) {
                    try {
                        // This select has a timeout so that we can manage
                        // idle session when we get out of the select every
                        // second. (note : this is a hack to avoid creating
                        // a dedicated thread).
                        long t0 = System.currentTimeMillis();
                    // Code 2.3.0: listen for read/write events
                    int selected = select(SELECT_TIMEOUT);
                        long t1 = System.currentTimeMillis();
                        long delta = (t1 - t0);
    
                    // Code 2.3.1: this block was unclear at first; per the comments below, it works around the epoll spinning problem
                        if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                            // Last chance : the select() may have been
                            // interrupted because we have had an closed channel.
                            if (isBrokenConnection()) {
                                LOG.warn("Broken connection");
    
                                // we can reselect immediately
                                // set back the flag to false
                                wakeupCalled.getAndSet(false);
    
                                continue;
                            } else {
                                LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                                // Ok, we are hit by the nasty epoll
                                // spinning.
                                // Basically, there is a race condition
                                // which causes a closing file descriptor not to be
                                // considered as available as a selected channel, but
                                // it stopped the select. The next time we will
                                // call select(), it will exit immediately for the same
                                // reason, and do so forever, consuming 100%
                                // CPU.
                                // We have to destroy the selector, and
                                // register all the socket on a new one.
                                registerNewSelector();
                            }
    
                            // Set back the flag to false
                            wakeupCalled.getAndSet(false);
    
                            // and continue the loop
                            continue;
                        }
    
                        // Manage newly created session first
                        nSessions += handleNewSessions();
    
                        updateTrafficMask();
    
                        // Now, if we have had some incoming or outgoing events,
                        // deal with them
                        if (selected > 0) {
                            //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                        // Code 2.3.2: handle reads and writes
                            process();
                        }
    
                        // Write the pending requests
                        long currentTime = System.currentTimeMillis();
                        flush(currentTime);
    
                        // And manage removed sessions
                        nSessions -= removeSessions();
    
                        // Last, not least, send Idle events to the idle sessions
                        notifyIdleSessions(currentTime);
    
                        // Get a chance to exit the infinite loop if there are no
                        // more sessions on this Processor
                        if (nSessions == 0) {
                            processorRef.set(null);
    
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                // newSessions.add() precedes startupProcessor
                                assert (processorRef.get() != this);
                                break;
                            }
    
                            assert (processorRef.get() != this);
    
                            if (!processorRef.compareAndSet(null, this)) {
                                // startupProcessor won race, so must exit processor
                                assert (processorRef.get() != this);
                                break;
                            }
    
                            assert (processorRef.get() == this);
                        }
    
                        // Disconnect all sessions immediately if disposal has been
                        // requested so that we exit this loop eventually.
                        if (isDisposing()) {
                            for (Iterator<S> i = allSessions(); i.hasNext();) {
                                scheduleRemove(i.next());
                            }
    
                            wakeup();
                        }
                    } catch (ClosedSelectorException cse) {
                        // If the selector has been closed, we can exit the loop
                        // But first, dump a stack trace
                        ExceptionMonitor.getInstance().exceptionCaught(cse);
                        break;
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
    
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            ExceptionMonitor.getInstance().exceptionCaught(e1);
                        }
                    }
                }
    
                try {
                    synchronized (disposalLock) {
                        if (disposing) {
                            doDispose();
                        }
                    }
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                } finally {
                    disposalFuture.setValue(true);
                }
            }
        }

    Step into Code 2.3.2 (AbstractPollingIoProcessor class):

    private void process() throws Exception {
            for (Iterator<S> i = selectedSessions(); i.hasNext();) {
                S session = i.next();
                // Code 2.3.3
                process(session);
                i.remove();
            }
    }

    Step into Code 2.3.3 (AbstractPollingIoProcessor class).

    This code handles reads and writes:

    private void process(S session) {
            // Process Reads
            // Code 2.3.4: handle reads
            if (isReadable(session) && !session.isReadSuspended()) {
                read(session);
            }
    
            // Process writes
            // Code 2.3.5: handle writes
            if (isWritable(session) && !session.isWriteSuspended()) {
                // add the session to the queue, if it's not already there
                if (session.setScheduledForFlush(true)) {
                    flushingSessions.add(session);
                }
            }
        }

    2.4 Read processing

  • Original article: https://www.cnblogs.com/tangyanbo/p/4297361.html