processor顾名思义,就是进行IO处理,处理当前session的数据读写,并进行业务处理。
在mina server初始化的时候,会初始化一个processor池,通过NioSocketAcceptor的构造器传入池的大小,默认是当前处理器的个数+1。
processor池里面有一个jdk提供的 线程池 - Executors.newCachedThreadPool()。各个processor线程会引用此线程池,即每个processor线程都在这个线程池里面运行。
在mina server实际处理时,每个processor相当于一个线程,轮流处理当前的session队列里面的数据(每个processor里面的session相当于顺序处理,共享一个线程)。
每个processor有一个Selector对象。
processor类图
processor端的处理逻辑相对有点复杂,看下面的流程图。
1、把新添加进来的session注册到当前processor的Selector里面的read事件,并初始化session;
2、判断当前Selector是否有读写事件;
3、若第2步有读事件时,进入步骤4,若没有的话,直接到第6步;
4、处理当前读事件,并把处理后的数据放入到flush队列里面;
5、把第4步执行的结果flush到客户端;
6、处理session,比如session idle时间等。
7、重新执行第1步,循环执行。
processor类源码:
private class Processor implements Runnable { public void run() { assert (processorRef.get() == this); int nSessions = 0; lastIdleCheckTime = System.currentTimeMillis(); for (;;) { try { // This select has a timeout so that we can manage // idle session when we get out of the select every // second. (note : this is a hack to avoid creating // a dedicated thread). long t0 = System.currentTimeMillis(); int selected = select(SELECT_TIMEOUT); long t1 = System.currentTimeMillis(); long delta = (t1 - t0); if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) { // Last chance : the select() may have been // interrupted because we have had an closed channel. if (isBrokenConnection()) { LOG.warn("Broken connection"); // we can reselect immediately // set back the flag to false wakeupCalled.getAndSet(false); continue; } else { LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0)); // Ok, we are hit by the nasty epoll // spinning. // Basically, there is a race condition // which causes a closing file descriptor not to be // considered as available as a selected channel, but // it stopped the select. The next time we will // call select(), it will exit immediately for the same // reason, and do so forever, consuming 100% // CPU. // We have to destroy the selector, and // register all the socket on a new one. registerNewSelector(); } // Set back the flag to false wakeupCalled.getAndSet(false); // and continue the loop continue; } // Manage newly created session first nSessions += handleNewSessions(); updateTrafficMask(); // Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test... process(); } // Write the pending requests long currentTime = System.currentTimeMillis(); flush(currentTime); // And manage removed sessions nSessions -= removeSessions(); // Last, not least, send Idle events to the idle sessions notifyIdleSessions(currentTime); // Get a chance to exit the infinite loop if there are no // more sessions on this Processor if (nSessions == 0) { processorRef.set(null); if (newSessions.isEmpty() && isSelectorEmpty()) { // newSessions.add() precedes startupProcessor assert (processorRef.get() != this); break; } assert (processorRef.get() != this); if (!processorRef.compareAndSet(null, this)) { // startupProcessor won race, so must exit processor assert (processorRef.get() != this); break; } assert (processorRef.get() == this); } // Disconnect all sessions immediately if disposal has been // requested so that we exit this loop eventually. if (isDisposing()) { for (Iterator<S> i = allSessions(); i.hasNext();) { scheduleRemove(i.next()); } wakeup(); } } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop break; } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } try { synchronized (disposalLock) { if (disposing) { doDispose(); } } } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); } finally { disposalFuture.setValue(true); } } }