zoukankan      html  css  js  c++  java
  • NIO框架之MINA源代码解析(二):mina核心引擎

    NIO框架之MINA源代码解析(一):背景



    MINA的底层还是利用了jdk提供了nio功能,mina仅仅是对nio进行封装。包含MINA用的线程池都是jdk直接提供的。


    MINA的server端主要有accept、processor、session三部分组成的。当中accept主要负责在指定的port监听。若有新连接则建立一个新的session。processor则负责处理session相应的发送数据和接收数据并调用上层处理;session则缓存当前连接数据

    MINA採用了线程懒启动的技术,即最少启动线程。在MINA server启动的时候,仅仅有一个线程-accept,而且accept线程仅仅有一个,在指定的port进行监听(能够同一时候监听多个port,mina能够绑定多port)。


    1、acceptor


    先看下acceptor的主要类图吧。



    mina server的启动入口是在NioSocketAcceptor.bind(InetSocketAddress)或者NioSocketAcceptor.bind(SocketAddress...)方法, acceptor.bind(new InetSocketAddress(1234));  
    然后会调用AbstractPollingIoAcceptor.bindInternal(List<? extends SocketAddress>)方法,在bindInternal方法里面会调用startupAcceptor()方法提交一个accept线程到线程池里面(仅仅提交一次),并初始化acceptor端的Selector。就这样一个acceptor线程启动了。

    acceptor端业务相对简单,相当于在当前Selector里面监听acceptor事件,处理新连接并新建一个session放到相应的processor里面。




    acceptor 代码。非常easy。

     private class Acceptor implements Runnable {
            public void run() {
                assert (acceptorRef.get() == this);
    
                int nHandles = 0;
    
                // Release the lock
                lock.release();
    
                while (selectable) {
                    try {
                        // Detect if we have some keys ready to be processed
                        // The select() will be woke up if some new connection
                        // have occurred, or if the selector has been explicitly
                        // woke up
                        int selected = select();
    
                        // this actually sets the selector to OP_ACCEPT,
                        // and binds to the port on which this class will
                        // listen on  在通道里面注冊连接事件
                        nHandles += registerHandles();
    
                        // Now, if the number of registred handles is 0, we can
                        // quit the loop: we don't have any socket listening
                        // for incoming connection.
                        if (nHandles == 0) {
                            acceptorRef.set(null);
    
                            if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                                assert (acceptorRef.get() != this);
                                break;
                            }
    
                            if (!acceptorRef.compareAndSet(null, this)) {
                                assert (acceptorRef.get() != this);
                                break;
                            }
    
                            assert (acceptorRef.get() == this);
                        }
    
                        if (selected > 0) {
                            // We have some connection request, let's process
                            // them here.处理连接
                            processHandles(selectedHandles());
                        }
    
                        // check to see if any cancellation request has been made.
                        nHandles -= unregisterHandles();
                    } catch (ClosedSelectorException cse) {
                        // If the selector has been closed, we can exit the loop
                        break;
                    } catch (Throwable e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
    
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            ExceptionMonitor.getInstance().exceptionCaught(e1);
                        }
                    }
                }
    
                // Cleanup all the processors, and shutdown the acceptor.
                if (selectable && isDisposing()) {
                    selectable = false;
                    try {
                        if (createdProcessor) {
                            processor.dispose();
                        }
                    } finally {
                        try {
                            synchronized (disposalLock) {
                                if (isDisposing()) {
                                    destroy();
                                }
                            }
                        } catch (Exception e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        } finally {
                            disposalFuture.setDone();
                        }
                    }
                }
            }
    

    2、processor



    processor顾名思义,就是进行IO处理,处理当前session的数据读写,并进行业务处理。


    在mina server初始化的时候,会初始化一个processor池,通过NioSocketAcceptor的构造器传入池的大小,默认是当前处理器的个数+1。


    processor池里面有一个jdk提供的 线程池 - Executors.newCachedThreadPool()。各个processor线程会引用此线程池,即每一个processor线程都在这个线程池里面运行。


    在mina server实际处理时,每一个processor相当于一个线程,轮流处理当前的session队列里面的数据(每一个processor里面的session相当于顺序处理,共享一个线程)。


    每一个processor有一个Selector对象


    processor类图



    processor端的处理逻辑相对有点复杂。看以下的流程图。




    1、把新加入进来的session注冊到当前processor的Selector里面的read事件,并初始化session;
    2、推断当前Selector是否有读写事件。
    3、若第2步有读事件时。进入步骤4,若没有的话,直接到第6步。
    4、处理当前读事件,并把处理后的数据放入到flush队列里面。
    5、把第4步运行的结果flush到client;
    6、处理session。比方session idle时间等。


    7、又一次运行第1步,循环运行。


    processor端代码。


     private class Processor implements Runnable {
            public void run() {
                assert (processorRef.get() == this);
    
                int nSessions = 0;
                lastIdleCheckTime = System.currentTimeMillis();
    
                for (;;) {
                    try {
                        // This select has a timeout so that we can manage
                        // idle session when we get out of the select every
                        // second. (note : this is a hack to avoid creating
                        // a dedicated thread).
                        long t0 = System.currentTimeMillis();
                        int selected = select(SELECT_TIMEOUT);
                        long t1 = System.currentTimeMillis();
                        long delta = (t1 - t0);
    
                        if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                            // Last chance : the select() may have been
                            // interrupted because we have had an closed channel.
                            if (isBrokenConnection()) {
                                LOG.warn("Broken connection");
    
                                // we can reselect immediately
                                // set back the flag to false
                                wakeupCalled.getAndSet(false);
    
                                continue;
                            } else {
                                LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                                // Ok, we are hit by the nasty epoll
                                // spinning.
                                // Basically, there is a race condition
                                // which causes a closing file descriptor not to be
                                // considered as available as a selected channel, but
                                // it stopped the select. The next time we will
                                // call select(), it will exit immediately for the same
                                // reason, and do so forever, consuming 100%
                                // CPU.
                                // We have to destroy the selector, and
                                // register all the socket on a new one.
                                registerNewSelector();
                            }
    
                            // Set back the flag to false
                            wakeupCalled.getAndSet(false);
    
                            // and continue the loop
                            continue;
                        }
    
                        // Manage newly created session first  处理新加入进来的session,并注冊到当前processor的Selector读事件
                        nSessions += handleNewSessions();
    
                        updateTrafficMask();
    
                        // Now, if we have had some incoming or outgoing events,
                        // deal with them
                        if (selected > 0) {
                            //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...处理读事件。并把结果放入flush队列里面
                            process();
                        }
    
                        // Write the pending requests  把flush队列里面的session的处理完的数据发送给client
                        long currentTime = System.currentTimeMillis();
                        flush(currentTime);
    
                        // And manage removed sessions
                        nSessions -= removeSessions();
    
                        // Last, not least, send Idle events to the idle sessions
                        notifyIdleSessions(currentTime);
    
                        // Get a chance to exit the infinite loop if there are no
                        // more sessions on this Processor
                        if (nSessions == 0) {
                            processorRef.set(null);
    
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                // newSessions.add() precedes startupProcessor
                                assert (processorRef.get() != this);
                                break;
                            }
    
                            assert (processorRef.get() != this);
    
                            if (!processorRef.compareAndSet(null, this)) {
                                // startupProcessor won race, so must exit processor
                                assert (processorRef.get() != this);
                                break;
                            }
    
                            assert (processorRef.get() == this);
                        }
    
                        // Disconnect all sessions immediately if disposal has been
                        // requested so that we exit this loop eventually.
                        if (isDisposing()) {
                            for (Iterator<S> i = allSessions(); i.hasNext();) {
                                scheduleRemove(i.next());
                            }
    
                            wakeup();
                        }
                    } catch (ClosedSelectorException cse) {
                        // If the selector has been closed, we can exit the loop
                        break;
                    } catch (Throwable t) {
                        ExceptionMonitor.getInstance().exceptionCaught(t);
    
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            ExceptionMonitor.getInstance().exceptionCaught(e1);
                        }
                    }
                }
    
                try {
                    synchronized (disposalLock) {
                        if (disposing) {
                            doDispose();
                        }
                    }
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);
                } finally {
                    disposalFuture.setValue(true);
                }
            }
        }

    3、session


    session做为一个连接的详细对象,缓存当前连接用户的一些信息。

    session类图



    session对象是绑定在SelectableChannel的一个attach。


    class  NioProcessor
     @Override
        protected void init(NioSession session) throws Exception {
            SelectableChannel ch = (SelectableChannel) session.getChannel();
            ch.configureBlocking(false);
            session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));//chanel 注冊读事件,并把session当做一个attach辅导SelectableChannel里面。
        }



  • 相关阅读:
    安装MySQLdb
    树莓派及其他硬件平台国内外Linux镜像站全汇总
    rpc使用举例
    SAE上安装第三方模块
    【Java】Map
    【Java】判断字符串是否含字母
    【Android Studio】提示代码忽略大小写
    【iOS】Xcode 离线文档
    【iOS】iOS main() 简介
    【eclipse】No enclosing instance of type A is accessible. Must qualify the allocation with an enclosing instance of type A
  • 原文地址:https://www.cnblogs.com/lytwajue/p/7124456.html
Copyright © 2011-2022 走看看