zoukankan      html  css  js  c++  java
  • mina2的processor

    processor顾名思义,就是进行IO处理,处理当前session的数据读写,并进行业务处理。

    在mina server初始化的时候,会初始化一个processor池,通过NioSocketAcceptor的构造器传入池的大小,默认是当前处理器的个数+1。

    processor池里面有一个jdk提供的 线程池 - Executors.newCachedThreadPool()。各个processor线程会引用此线程池,即每个processor线程都在这个线程池里面运行。

    在mina server实际处理时,每个processor相当于一个线程,轮流处理当前的session队列里面的数据(每个processor里面的session相当于顺序处理,共享一个线程)。

    每个processor有一个Selector对象

    processor类图

    processor端的处理逻辑相对有点复杂,看下面的流程图。

    1、把新添加进来的session注册到当前processor的Selector里面的read事件,并初始化session;
    2、判断当前Selector是否有读写事件;
    3、若第2步有读事件时,进入步骤4,若没有的话,直接到第6步;
    4、处理当前读事件,并把处理后的数据放入到flush队列里面;
    5、把第4步执行的结果flush到客户端;
    6、处理session,比如session idle时间等。
    7、重新执行第1步,循环执行。

    processor类源码:

        private class Processor implements Runnable {
            public void run() {
                assert (processorRef.get() == this);
    
                int nSessions = 0;
                lastIdleCheckTime = System.currentTimeMillis();
    
                for (;;) {
                    try {
                        // This select has a timeout so that we can manage
                        // idle session when we get out of the select every
                        // second. (note : this is a hack to avoid creating
                        // a dedicated thread).
                        long t0 = System.currentTimeMillis();
                        int selected = select(SELECT_TIMEOUT);
                        long t1 = System.currentTimeMillis();
                        long delta = (t1 - t0);
    
                        if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                            // Last chance : the select() may have been
                            // interrupted because we have had an closed channel.
                            if (isBrokenConnection()) {
                                LOG.warn("Broken connection");
    
                                // we can reselect immediately
                                // set back the flag to false
                                wakeupCalled.getAndSet(false);
    
                                continue;
                            } else {
                                LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                                // Ok, we are hit by the nasty epoll
                                // spinning.
                                // Basically, there is a race condition
                                // which causes a closing file descriptor not to be
                                // considered as available as a selected channel, but
                                // it stopped the select. The next time we will
                                // call select(), it will exit immediately for the same
                                // reason, and do so forever, consuming 100%
                                // CPU.
                                // We have to destroy the selector, and
                                // register all the socket on a new one.
                                registerNewSelector();
                            }
    
                            // Set back the flag to false
                            wakeupCalled.getAndSet(false);
    
                            // and continue the loop
                            continue;
                        }
    
                        // Manage newly created session first
                        nSessions += handleNewSessions();
    
                        updateTrafficMask();
    
                        // Now, if we have had some incoming or outgoing events,
                        // deal with them
                        if (selected > 0) {
                            //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                            process();
                        }
    
                        // Write the pending requests
                        long currentTime = System.currentTimeMillis();
                        flush(currentTime);
    
                        // And manage removed sessions
                        nSessions -= removeSessions();
    
                        // Last, not least, send Idle events to the idle sessions
                        notifyIdleSessions(currentTime);
    
                        // Get a chance to exit the infinite loop if there are no
                        // more sessions on this Processor
                        if (nSessions == 0) {
                            processorRef.set(null);
    
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                // newSessions.add() precedes startupProcessor
                                assert (processorRef.get() != this);
                                break;
                            }
    
                            assert (processorRef.get() != this);
    
                            if (!processorRef.compareAndSet(null, this)) {
                                // startupProcessor won race, so must exit processor
                                assert (processorRef.get() != this);
                                break;
                            }
    
                            assert (processorRef.get() == this);
                        }
    
                        // Disconnect all sessions immediately if disposal has been
                        // requested so that we exit this loop eventually.
                        if (isDisposing()) {
                            for (Iterator<S> i = allSessions(); i.hasNext();) {
                                scheduleRemove(i.next());
                            }
    
                            wakeup();
                        }
                    } catch (ClosedSelectorException cse) {
                        // If the selector has been closed, we can exit the loop
                        break;
                    } catch (Throwable t) {
                        ExceptionMonitor.getInstance().exceptionCaught(t);
    
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            ExceptionMonitor.getInstance().exceptionCaught(e1);
                        }
                    }
                }
    
                try {
                    synchronized (disposalLock) {
                        if (disposing) {
                            doDispose();
                        }
                    }
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);
                } finally {
                    disposalFuture.setValue(true);
                }
            }
        }
  • 相关阅读:
    评估您的网站/博客的价值
    Jquery从入门到精通:二、选择器 1、准备篇 (2)$()工厂方法
    JQuery核心:1.jQuery( expression, context )
    VS2008引用webservice的奇怪BUG解决方案
    Jquery从入门到精通:二、选择器 1、准备篇 1)基础的基础:DOM模型
    js实现html页面显示时间的定时刷新
    分页显示批量数据
    JSP与Access2010结合,实现数据的交互使用(re)
    通过datasource与数据库交互的jsp范例
    js练习V1
  • 原文地址:https://www.cnblogs.com/duanxz/p/6754593.html
Copyright © 2011-2022 走看看