zoukankan      html  css  js  c++  java
  • MINA2 源代码学习--源代码结构梳理

    一、mina总体框架与案例:

    1.总体结构图:

    简述:以上是一张来自网上比較经典的图,总体上揭示了mina的结构,当中IoService包括clientIoConnector和服务端IoAcceptor两部分。即不管是client还是服务端都是这个结构。IoService封装了网络传输层(TCP和UDP),而IoFilterChain中mina自带的filter做了一些主要的操作之外,支持扩展。经过FilterChain之后终于调用IoHandler,IoHandler是详细实现业务逻辑的处理接口,详细的业务实现可扩展。


    2.一个可执行的案例(案例来自网上,转载后试验):
    Client.java:

    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;
    import java.util.Random;
    
    import org.apache.mina.core.future.ConnectFuture;
    import org.apache.mina.core.future.IoFutureListener;
    import org.apache.mina.core.service.IoConnector;
    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;
    
    public class Client extends IoHandlerAdapter {
    
        private Random random = new Random(System.currentTimeMillis());
        public Client() {
            IoConnector connector = new NioSocketConnector();
            connector.getFilterChain().addLast(
                    "text",
                    new ProtocolCodecFilter(new TextLineCodecFactory(Charset
                            .forName(Server.ENCODE))));
            connector.setHandler(this);
            ConnectFuture future = connector.connect(new InetSocketAddress(
                    "127.0.0.1", Server.PORT));
            future.awaitUninterruptibly();
            future.addListener(new IoFutureListener<ConnectFuture>() {
                @Override
                public void operationComplete(ConnectFuture future) {
                    IoSession session = future.getSession();
                    while (!session.isClosing()) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        String message = "你好。我roll了" + random.nextInt(100) + "点.";
                        session.write(message);
                    }
                }
            });
            connector.dispose();
        }
        @Override
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            System.out.println("批复:" + message.toString());
        }
        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            System.out.println("报告:" + message.toString());
        }
        @Override
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            session.close(true);
        }
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Client();
            }
        }
    }
    

    ServerHandler.java:

    import java.net.InetSocketAddress;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    
    public class ServerHandler extends IoHandlerAdapter {
        @Override
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            session.close(false);
        }
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            String s = message.toString();
            System.out.println("收到请求:" + s);
            if (s != null) {
                int i = getPoint(s);
                if (session.isConnected()) {
                    if (i >= 95) {
                        session.write("运气不错,你能够出去了.");
                        session.close(false);
                        return;
                    }
                    Integer count = (Integer) session.getAttribute(Server.KEY);
                    count++;
                    session.setAttribute(Server.KEY, count);
                    session.write("抱歉。你运气太差了,第" + count + "次请求未被通过。继续在小黑屋呆着吧.");
                } else {
                    session.close(true);
                }
            }
        }
        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            System.out.println("发给client:" + message.toString());
        }
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            long l = session.getCreationTime();
            System.out.println("来自" + getInfo(session) + "的会话已经关闭,它已经存活了"
                    + (System.currentTimeMillis() - 1) + "毫秒");
        }
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            System.out.println("给" + getInfo(session) + "创建了一个会话");
        }
        @Override
        public void sessionIdle(IoSession session, IdleStatus status)
                throws Exception {
            System.out.println("来自" + getInfo(session) + "的会话闲置,状态为"
                    + status.toString());
        }
        public void sessionOpened(IoSession session) throws Exception {
            session.setAttribute(Server.KEY, 0);
            System.out.println("和" + getInfo(session) + "的会话已经打开.");
        }
        public String getInfo(IoSession session) {
    
            if (session == null) {
                return null;
            }
            InetSocketAddress address = (InetSocketAddress) session
                    .getRemoteAddress();
            int port = address.getPort();
            String ip = address.getAddress().getHostAddress();
            return ip + ":" + port;
        }
        public int getPoint(String s) {
    
            if (s == null) {
                return -1;
            }
            Pattern p = Pattern.compile("^[u0041-uFFFF,]*(\d+).*$");
            Matcher m = p.matcher(s);
            if (m.matches()) {
                return Integer.valueOf(m.group(1));
            }
            return 0;
        }
    }
    

    Server.java:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;
    
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.transport.socket.SocketAcceptor;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    
    
    public class Server {
    
        public static final int PORT = 2534;
        public static String ENCODE = "UTF-8";
        public static final String KEY = "roll";
    
        public static void main(String[] args){ 
            SocketAcceptor acceptor = new NioSocketAcceptor();
            acceptor.getFilterChain().addLast(
                    "text",
                    new ProtocolCodecFilter(new TextLineCodecFactory(Charset
                            .forName(ENCODE))));
            acceptor.setHandler(new ServerHandler());
            try {
                acceptor.bind(new InetSocketAddress(PORT));
                System.out.println("游戏開始,你想出去吗,来,碰碰运气吧!");
            } catch (IOException e) {
                e.printStackTrace();
                acceptor.dispose();
            }
        }
    }
    

    本案例依赖的jar例如以下图:


    简述:以上是依赖mina实现的一个可执行的案例,就不多说了,结合总体的结构图和案例实现能够看出mina框架还是非常轻量级的。以下分析一下mina的源代码结构和一些时序流程。

    二、mina 核心源代码分析:

    1.mina的启动时序(结合上面的案例):


    简述:SocketAcceptor作为服务端对外启动接口类,在bind网络地址的时候,会触发服务端一系列服务的启动,从调用链能够清晰找到相应的源代码阅读。

    当中AbstractPollingIoAcceptor是一个核心类,它会调用自身的startupAcceptor方法,来启动一个存放Acceptor的线程池用来处理client传输过来的请求。
    AbstractPollingIoAcceptor 类的 startupAcceptor 方法例如以下:

    /**
     * This method is called by the doBind() and doUnbind()
     * methods.  If the acceptor is null, the acceptor object will
     * be created and kicked off by the executor.  If the acceptor
     * object is null, probably already created and this class
     * is now working, then nothing will happen and the method
     * will just return.
     */
    private void startupAcceptor() throws InterruptedException {
        // If the acceptor is not ready, clear the queues
        // TODO : they should already be clean : do we have to do that ?

    if (!selectable) { registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started Acceptor acceptor = acceptorRef.get(); //这里仅仅会启动一个worker if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }

    上面调用到 AbstractIoService 的 executeWorker方法例如以下:

    protected final void executeWorker(Runnable worker) {
        executeWorker(worker, null);
    }
    
    protected final void executeWorker(Runnable worker, String suffix) {
        String actualThreadName = threadName;
        if (suffix != null) {
            actualThreadName = actualThreadName + '-' + suffix;
        }
        executor.execute(new NamePreservingRunnable(worker, actualThreadName));
    }
    

    简述:有类AbstractPollingIoAcceptor 的 startupAcceptor方法(上文)能够看到,一个SocketAcceptor仅仅启动了一个Worker线程(即代码中的Acceptor对象)而且把他加到线程池中。反过来讲,也能够看出AbstractIoService维护了Worker的线程池。(ps:这个Worker就是服务端处理请求的线程)。


    2.Mina处理client链接的过程(启动后):

    概述:从1中的启动时序能够看到,启动过程通过创建SocketAcceptor将有类AbstractPollingIoAcceptor的内部类Acceptor放到了 AbstractIoService的线程池里面,而这个Acceptor就是处理client网络请求的worker。而以下这个时序就是线程池中每一个worker处理client网络请求的时序流程。

    处理请求时序: 

    简述:worker线程Acceptor的run方法中会调用NioSocketAcceptor或者AprSocketAccetpor的select方法。
    ps:APR(Apache Protable Runtime Library,Apache可移植执行库)是能够提供非常好的可拓展性、性能以及对底层操作系统一致性操作的技术,说白了就是apache实现的一套标准的通讯接口。

    AprSocketAcceptor先不做深入了解,主要了解下NioSocketAcceptor,NioSocketAcceptor顾名思义,它调用了java NIO的API实现了NIO的网络连接处理过程。

    AbstractPolling$Acceptor 的run方法的核心代码例如以下:

     private class Acceptor implements Runnable {
        public void run() {
            assert (acceptorRef.get() == this);
    
            int nHandles = 0;
    
            // Release the lock
            lock.release();
    
            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    // The select() will be woke up if some new connection
                    // have occurred, or if the selector has been explicitly
                    // woke up
                    //调用了NioSocketAcceptor的select方法,获取了selectKey
                    int selected = select();
    
                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port on which this class will
                    // listen on
                    nHandles += registerHandles();
    
                    // Now, if the number of registred handles is 0, we can
                    // quit the loop: we don't have any socket listening
                    // for incoming connection.
                    if (nHandles == 0) {
                        acceptorRef.set(null);
    
                        if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                            assert (acceptorRef.get() != this);
                            break;
                        }
    
                        if (!acceptorRef.compareAndSet(null, this)) {
                            assert (acceptorRef.get() != this);
                            break;
                        }
    
                        assert (acceptorRef.get() == this);
                    }
    
                    if (selected > 0) {
                        // We have some connection request, let's process
                        // them here.
                        processHandles(selectedHandles());
                    }
    
                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    break;
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
    
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }
    
            // Cleanup all the processors, and shutdown the acceptor.
            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        disposalFuture.setDone();
                    }
                }
            }
        }
    

    简述:从上面的代码中能够看出一个典型的网络请求处理的程序,在循环中拿到处理的请求后就调用AbstractPollingIoAcceptor的processHandles()对网络请求做处理。

    代码例如以下:

        /**
         * This method will process new sessions for the Worker class.  All
         * keys that have had their status updates as per the Selector.selectedKeys()
         * method will be processed here.  Only keys that are ready to accept
         * connections are handled here.
         * <p/>
         * Session objects are created by making new instances of SocketSessionImpl
         * and passing the session object to the SocketIoProcessor class.
         */
        @SuppressWarnings("unchecked")
        private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();
    
                // Associates a new created connection to a processor,
                // and get back a session
                //这里调用了NioSocketAcceptor的accept方法
                S session = accept(processor, handle);
    
                if (session == null) {
                    continue;
                }
    
                initSession(session, null, null);
    
                // add the session to the SocketIoProcessor
                // 这步处理add操作,会触发对client请求的异步处理。

    session.getProcessor().add(session); } }

    NioSocketAcceptor的accept方法new了一个包装Process处理线程的session实例:而且在调用session.getProcessor().add(session)的操作的时候触发了对client请求的异步处理。

    /**
     * {@inheritDoc}
     */
    @Override
    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
    
        SelectionKey key = handle.keyFor(selector);
    
        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }
    
        // accept the connection from the client
        SocketChannel ch = handle.accept();
    
        if (ch == null) {
            return null;
        }
    
        return new NioSocketSession(this, processor, ch);
    }
    

    再看上面时序图:有一步是AbstractPollingIoProcessor调用了startupProcessor方法。代码例如以下:

     /**
     * Starts the inner Processor, asking the executor to pick a thread in its
     * pool. The Runnable will be renamed
     */
    private void startupProcessor() {
        Processor processor = processorRef.get();
    
        if (processor == null) {
            processor = new Processor();
    
            if (processorRef.compareAndSet(null, processor)) {
                executor.execute(new NamePreservingRunnable(processor, threadName));
            }
        }
    
        // Just stop the select() and start it again, so that the processor
        // can be activated immediately.
        wakeup();
    }
    

    简述:这个startupProcessor方法在调用 session里包装的processor的add方法是,触发了将处理client请求的processor放入异步处理的线程池中。兴许详细Processor怎么处理client请求的流程,涉及到FilterChain的过滤。以及Adapter的调用。用来处理业务逻辑。详细的异步处理时序看以下的时序图:


    简述:这个时序就是将待处理的client链接,通过NIO的形式接受请求,并将请求包装成Processor的形式放到处理的线程池中异步的处理。

    在异步的处理过程中则调用了Processor的run方法,详细的filterchain的调用和业务Adapter的调用也是在这一步得到处理。

    值得注意的是。Handler的调用是封装在DefaultFilterchain的内部类诶TairFilter中触发调用的。Processor的run方法代码例如以下:

     private class Processor implements Runnable {
        public void run() {
            assert (processorRef.get() == this);
    
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();
    
            for (;;) {
                try {
                    // This select has a timeout so that we can manage
                    // idle session when we get out of the select every
                    // second. (note : this is a hack to avoid creating
                    // a dedicated thread).
                    long t0 = System.currentTimeMillis();
                    //调用了NioProcessor
                    int selected = select(SELECT_TIMEOUT);
                    long t1 = System.currentTimeMillis();
                    long delta = (t1 - t0);
    
                    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                        // Last chance : the select() may have been
                        // interrupted because we have had an closed channel.
                        if (isBrokenConnection()) {
                            LOG.warn("Broken connection");
    
                            // we can reselect immediately
                            // set back the flag to false
                            wakeupCalled.getAndSet(false);
    
                            continue;
                        } else {
                            LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                            // Ok, we are hit by the nasty(讨厌的) epoll
                            // spinning.
                            // Basically, there is a race condition
                            // which causes a closing file descriptor not to be
                            // considered as available as a selected channel, but
                            // it stopped the select. The next time we will
                            // call select(), it will exit immediately for the same
                            // reason, and do so forever, consuming 100%
                            // CPU.
                            // We have to destroy the selector, and
                            // register all the socket on a new one.
                            registerNewSelector();
                        }
    
                        // Set back the flag to false
                        wakeupCalled.getAndSet(false);
    
                        // and continue the loop
                        continue;
                    }
    
                    // Manage newly created session first
                    nSessions += handleNewSessions();
    
                    updateTrafficMask();
    
                    // Now, if we have had some incoming or outgoing events,
                    // deal with them
                    if (selected > 0) {
                        //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                        //触发了详细的调用逻辑
                        process();
                    }
    
                    // Write the pending requests
                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);
    
                    // And manage removed sessions
                    nSessions -= removeSessions();
    
                    // Last, not least, send Idle events to the idle sessions
                    notifyIdleSessions(currentTime);
    
                    // Get a chance to exit the infinite loop if there are no
                    // more sessions on this Processor
                    if (nSessions == 0) {
                        processorRef.set(null);
    
                        if (newSessions.isEmpty() && isSelectorEmpty()) {
                            // newSessions.add() precedes startupProcessor
                            assert (processorRef.get() != this);
                            break;
                        }
    
                        assert (processorRef.get() != this);
    
                        if (!processorRef.compareAndSet(null, this)) {
                            // startupProcessor won race, so must exit processor
                            assert (processorRef.get() != this);
                            break;
                        }
    
                        assert (processorRef.get() == this);
                    }
    
                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<S> i = allSessions(); i.hasNext();) {
                            scheduleRemove(i.next());
                        }
    
                        wakeup();
                    }
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    break;
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);
    
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }
    
            try {
                synchronized (disposalLock) {
                    if (disposing) {
                        doDispose();
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            } finally {
                disposalFuture.setValue(true);
            }
        }
    }
    

    简述:这么一坨代码能够看出,这个处理器也调用了java的Nio API是一个NIO模型。当中select和process方法各自是从session拿到要处理的请求,并进行处理。而详细的Processor实例是NioProcessor。从加入凝视的代码中有一步调用了自身的process方法,这步调用触发了详细业务逻辑的调用。能够结合代码和时序图看下。在Process方法中会调用reader(session)或wirte(session)方法,然后调用fireMessageReceived方法,这种方法又调用了callNextMessageReceived方法致使触发了整个FilterChain和Adapter的调用。read方法的核心代码例如以下:

    private void read(S session) {
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);
    
        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
    
        try {
            int readBytes = 0;
            int ret;
    
            try {
                if (hasFragmentation) {
    
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
    
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
    
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }
    
            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;
    
                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
    
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                if (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                    scheduleRemove(session);
                }
            }
    
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }
    

    从这段代码并结合上面的时序图能够看出来触发整个FilterChain的调用以及IoHandler的调用。

    三、类结构分析

    參考第一部分的总体结构图,画一下每一个部分大致的类结构图:

    简述: 从类继承结构图来看,能够看到在IOService体系下,存在IoConnector和IoAcceptor两个大的分支体系。IoConnector是做为client的时候使用,IoAcceptor是作为服务端的时候使用。实际上在Mina中,有三种worker线程各自是:Acceptor、Connector 和 I/O processor。


    (1) Acceptor Thread 作为server端的链接线程,实现了IoService接口。线程的数量就是创建SocketAcceptor的数量。


    (2) Connector Thread 作为client请求建立的链接线程,实现了IoService接口,维持了一个和服务端Acceptor的一个链接,线程的数量就是创建SocketConnector的数量。


    (3) I/O processorThread 作为I/O真正处理的线程,存在于server端和client。线程的数量是能够配置的,默认是CPU个数+1。

    上面那个图仅仅是表述了IoService类体系,而I/O Processor的类体系并不在当中,见下图:


    简述:IOProcessor主要分为两种。各自是AprIOProcessor和NioProcessor,Apr的解释见上文:ps:APR(Apache Protable Runtime Library,Apache可移植执行库)。

    NioProcessor也是Nio的一种实现,用来处理client连接过来的请求。在Processor中会调用到 FilterChain 和 Handler,见上文代码。先看下FilterChain的类结构图例如以下:


    Filter 和 Handler的类结构例如以下:


    Handler的类结构例如以下:


    Mina的session类结构图例如以下:

    Mina的Buffer的类结构图例如以下:

    版权声明:本文博主原创文章,博客,未经同意不得转载。

  • 相关阅读:
    BestCoder Round #29 1003 (hdu 5172) GTY's gay friends [线段树 判不同 预处理 好题]
    POJ 1182 食物链 [并查集 带权并查集 开拓思路]
    Codeforces Round #288 (Div. 2) E. Arthur and Brackets [dp 贪心]
    Codeforces Round #287 (Div. 2) E. Breaking Good [Dijkstra 最短路 优先队列]
    Codeforces Round #287 (Div. 2) D. The Maths Lecture [数位dp]
    NOJ1203 最多约数问题 [搜索 数论]
    poj1426
    POJ 1502 MPI Maelstrom [最短路 Dijkstra]
    POJ 2785 4 Values whose Sum is 0 [二分]
    浅析group by,having count()
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/4847183.html
Copyright © 2011-2022 走看看