MINA框架是基于NIO的异步IO框架,上一文已经对MINA的理论及实践做了分析,本文将对于MINA的整体源码实现进行分析。
通过MINA的实际案例可以发现,MINA的IO实现相比于NIO的使用要简单很多,因为不需要关心IO的具体实现,只需要关心具体的IO数据即可。MINA服务端整体步骤一共就四步:
1、创建IoService:初始化IoService,服务端就是创建IoAcceptor对象,客户端就是创建IoConnector对象
2、添加IoFilter:添加IO过滤器,每个IoService内部都有一个IO过滤器链IoFIlterChain,调用addBefore或addLast等方法将IO过滤器添加到过滤器链上
3、设置IoHandler:给IoService设置IO数据处理器IoHandler对象,用来处理器具体的IO业务数据
4、绑定监听端口号:调用IoService的bind方法监听服务端需要监听的端口号,并开启Selector监听客户端连接
一、初始化IoService
服务器创建IoService是直接创建IoService的子接口IoAcceptor,实现类为NioSocketAcceptor,实例化代码如下:
1 /** 创建默认实例*/ 2 IoAcceptor acceptor1 = new NioSocketAcceptor(); 3 4 /** 创建指定数量IoProcessor的实例*/ 5 IoAcceptor acceptor2 = new NioSocketAcceptor(2); 6 7 /** 创建指定线程池的实例*/ 8 Executor executor = Executors.newFixedThreadPool(4); 9 IoAcceptor acceptor3 = new NioSocketAcceptor(executor, new SimpleIoProcessorPool<>(NioProcessor.class));
虽然NioSocketAcceptor是实现了IoAcceptor接口,但是并不是直接实现的,而是继承了抽象的实现了IoAcceptor接口的父类AbstractPollingIoAcceptor,按默认的构造方法为例,初始化代码如下:
1 /** 无参构造函数*/ 2 public NioSocketAcceptor() { 3 /** 调用父类构造函数*/ 4 super(new DefaultSocketSessionConfig(), NioProcessor.class); 5 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 6 } 7 8 /** AbstractPollingIoAcceptor构造函数 9 * @param sessionConfig:IoSession的全局配置 10 * @param processorClass:IoProcessor的Class 11 * */ 12 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) { 13 /** 调用重载构造函数*/ 14 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null); 15 } 16 17 /** AbstractPollingIoAcceptor构造函数 18 * @param sessionConfig:IoSession的全局配置 19 * @param executor:线程池 20 * @param processor:IoProcessor对象 21 * @param createdProcessor : 是否创建Processor 22 * @param selectorProvider : SelectorProvider对象,用于创建Selector 23 * */ 24 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor, 25 boolean createdProcessor, SelectorProvider selectorProvider) { 26 /** 1.调用父亲AbstractIoAcceptor的构造函数*/ 27 super(sessionConfig, executor); 28 29 if (processor == null) { 30 throw new IllegalArgumentException("processor"); 31 } 32 /** 2.设置属性processor、createdProcessor值*/ 33 this.processor = processor; 34 this.createdProcessor = createdProcessor; 35 36 try { 37 /** 3.根据SelectorProvider对象初始化Selector,和NIO的根据SelectorProvider获取Selector对象逻辑一样 */ 38 init(selectorProvider); 39 40 //标记当前IoAcceptor的Selector已经创建完成,可以接收客户端的连接请求了 41 selectable = true; 42 } catch (RuntimeException e) { 43 throw e; 44 } catch (Exception e) { 45 throw new RuntimeIoException("Failed to initialize.", e); 46 } finally { 47 if (!selectable) { 48 try { 49 destroy(); 50 } catch (Exception e) { 51 ExceptionMonitor.getInstance().exceptionCaught(e); 52 } 53 } 54 } 55 } 56 57 /** AbstractIoAcceptor构造函数*/ 58 protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) { 59 /** 调用父类AbstractIoService构造函数 */ 60 super(sessionConfig, executor); 61 defaultLocalAddresses.add(null); 62 } 63 64 /** AbstractIoService构造函数 */ 65 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { 66 /** 参数校验*/ 67 if (sessionConfig == null) { 68 throw new IllegalArgumentException("sessionConfig"); 69 } 70 71 if (getTransportMetadata() == null) { 72 throw new IllegalArgumentException("TransportMetadata"); 73 } 74 75 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) { 76 throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: " 77 + getTransportMetadata().getSessionConfigType() + ")"); 78 } 79 80 /** 创建IoServiceListenerSupport对象,IoServiceListenerSupport内部有一个IoServiceListener的列表 81 * 将当前的IoServerListener对象添加到IoServiceListenerSupport的列表中 82 * */ 83 listeners = new IoServiceListenerSupport(this); 84 listeners.add(serviceActivationListener); 85 86 /** 设置属性 sessionConfig*/ 87 this.sessionConfig = sessionConfig; 88 89 //加载异常监听器ExceptionMonitor对象,提前加载防止在使用的时候还没有初始化 90 ExceptionMonitor.getInstance(); 91 /** 设置线程池,如果没有自定义,就采用默认的newCachedThreadPool*/ 92 if (executor == null) { 93 this.executor = Executors.newCachedThreadPool(); 94 createdExecutor = true; 95 } else { 96 this.executor = executor; 97 createdExecutor = false; 98 } 99 100 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); 101 }
整个的初始化过程都是在给IoAcceptor的一些属性进行初始化,核心属性包括sessionConfig、executor、IoProcessor等
其中SessionConfig表示IoSession的全局属性配置,主要配置如下:
1 public interface IoSessionConfig { 2 3 /** 4 * 获取IoProcessor读取数据的缓冲区大小 5 */ 6 int getReadBufferSize(); 7 8 /** 9 * 设置IoProcessor读取数据的缓冲区大小,通常会由IoProcessor自动动态调整 10 */ 11 void setReadBufferSize(int readBufferSize); 12 13 /** 14 * 获取IoProcessor读取属性的缓冲区大小的最小值 15 */ 16 int getMinReadBufferSize(); 17 18 /** 19 * 设置IoProcessor读取属性的缓冲区大小的最小值 20 */ 21 void setMinReadBufferSize(int minReadBufferSize); 22 23 /** 24 * 获取IoProcessor读取属性的缓冲区大小的最大值 25 */ 26 int getMaxReadBufferSize(); 27 28 /** 29 * 设置IoProcessor读取属性的缓冲区大小的最大值 30 */ 31 void setMaxReadBufferSize(int maxReadBufferSize); 32 33 /** 34 * 获取吞吐量(TPS)的统计间隔,单位为秒,默认是3秒 35 */ 36 int getThroughputCalculationInterval(); 37 38 /** 39 * 获取吞吐量(TPS)的统计间隔,单位为毫秒 40 */ 41 long getThroughputCalculationIntervalInMillis(); 42 43 /** 44 * 设置吞吐量的统计间隔时间 45 */ 46 void setThroughputCalculationInterval(int throughputCalculationInterval); 47 48 /** 49 * 获取指定状态的空闲时间,不同状态时间可能设置的不一样, 类型如下: 50 * READER_IDLE : 读数据空闲 51 * WRITER_IDLE : 写数据空闲 52 * BOTH_IDLE : 读写都空闲 53 */ 54 int getIdleTime(IdleStatus status); 55 56 /** 57 * 获取指定状态的空闲时间,单位为毫秒 58 */ 59 long getIdleTimeInMillis(IdleStatus status); 60 61 /** 62 * 设置指定状态的空闲时间 63 */ 64 void setIdleTime(IdleStatus status, int idleTime); 65 66 /** 67 * 获取读空闲时间,单位秒 68 */ 69 int getReaderIdleTime(); 70 71 /** 72 * 获取读空闲时间,单位毫秒 73 */ 74 long getReaderIdleTimeInMillis(); 75 76 /** 77 * 设置读空闲时间 78 */ 79 void setReaderIdleTime(int idleTime); 80 81 /** 82 * 获取写空闲时间,单位秒 83 */ 84 int getWriterIdleTime(); 85 86 /** 87 * 获取写空闲时间,单位毫秒 88 */ 89 long getWriterIdleTimeInMillis(); 90 91 /** 92 * 设置写空闲时间 93 */ 94 void setWriterIdleTime(int idleTime); 95 96 /** 97 * 获取读写都空闲时间,单位秒 98 */ 99 int getBothIdleTime(); 100 101 /** 102 * 获取读写都空闲时间,单位毫秒 103 */ 104 long getBothIdleTimeInMillis(); 105 106 /** 107 * 设置读写都空闲时间 108 */ 109 void setBothIdleTime(int idleTime); 110 111 /** 112 * 获取写超时时间,单位为秒 113 */ 114 int getWriteTimeout(); 115 116 /** 117 * 获取写超时时间,单位毫秒 118 */ 119 long getWriteTimeoutInMillis(); 120 121 /** 122 * 设置写超时时间 123 */ 124 void setWriteTimeout(int writeTimeout); 125 126 /** 127 * 获取会话读操作是否开启 128 */ 129 boolean isUseReadOperation(); 130 131 /** 132 * 开启或关闭会话读操作,如果开启所有接收到的消息会存储在内存的BlockingQueue中,使客户端应用可以更方便读取接收的消息 133 * 开启这个选项对服务器应用无效,并可能会导致内存泄漏,默认为关闭状态 134 */ 135 void setUseReadOperation(boolean useReadOperation); 136 137 /** 全量设置为IoSessionConfig */ 138 void setAll(IoSessionConfig config); 139 }
另外一个属性是线程池对象executor,如果没有自定义线程池传入的话,那么默认会调用Executors.newCachedThreadPool()创建无线程数上限的线程池,所以推荐使用自定义的线程池,避免默认的线程池没有线程数量限制导致线程过多的问题。
还有一个属性是IoProcessor池,默认类为SimpleIoProcessorPool,SimpleIoProcessorPool内部有一个IoProcessor[] pool和一个Executor executor属性,pool是IoProcessor数组,存储所有的IoProcessor,数量可以自定义通过构造函数传入,默认的个数为当前服务器机器的CPU个数+1,比如4核CPU那么就会创建5个IoProcessor,另外每一个IoProcessor初始化的时候都会设置线程池executor属性,如果executor没有自定义同样也会使用Executors.newCachedThreadPool创建。
总结:
IoService初始化的时候涉及到了两个线程池,首先是IoService本身使用的线程池,IoService本身用于接收客户端的连接请求,而连接请求的处理就交给了线程池处理;
另外IoService初始化时会创建多个IoProcessor用于处理客户端具体的IO操作,每一个IoProcessor内部有一个Selector用于监听客户端IO事件,然后将IO事件交给内部的线程池来处理
二、添加IoFilter
IoService内部都一个过滤器链IoFilterChainBuilder对象,默认实现类为DefaultIoFilterChainBuilder类,添加IoFilter时主要有四个方法,分别是在过滤器链的不同位置插入指定过滤器,用法分别如下:
1 IoAcceptor acceptor = new NioSocketAcceptor(); 2 /** 获取过滤器链 */ 3 DefaultIoFilterChainBuilder ioFilterChain = acceptor.getFilterChain(); 4 5 /** 1.在过滤器链的首部加入过滤器 */ 6 ioFilterChain.addFirst("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); 7 /** 2.在过滤器链的指定过滤器前面加入过滤器 */ 8 ioFilterChain.addBefore("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); 9 /** 3.在过滤器链的指定过滤器后面加入过滤器 */ 10 ioFilterChain.addAfter("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); 11 /** 4.在过滤器链的尾部加入过滤器 */ 12 ioFilterChain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
以addFirst为例,实现逻辑如下:
1 /** 在过滤器链首部插入过滤器 2 * @param name:过滤器的名称 3 * @param filter: 过滤器 4 * */ 5 public synchronized void addFirst(String name, IoFilter filter) { 6 /** 构造过滤器链路节点对象EntryImpl,并调用register方法加入到列表中 */ 7 register(0, new EntryImpl(name, filter)); 8 } 9 10 11 private final List<Entry> entries; 12 13 public DefaultIoFilterChainBuilder() { 14 entries = new CopyOnWriteArrayList<Entry>(); 15 } 16 17 /** 18 * 在指定位置插入Entry节点 19 * */ 20 private void register(int index, Entry e) { 21 if (contains(e.getName())) { 22 throw new IllegalArgumentException("Other filter is using the same name: " + e.getName()); 23 } 24 //调用List的add方法在指定为止插入节点 25 entries.add(index, e); 26 }
从源码可以看出添加IoFilter的逻辑比较简单,过滤器链IoFilterChainBuilder对象内部有一个列表,用于存放所有的IoFilter,添加过滤器就是将IoFilter和名称封装成Entry对象添加到列表中,不同的add方法实际就是调用List的add(int index, Entry e)方法在指定的位置插入元素。addFirst就是在list头部插入元素,addLast就是在list尾部插入元素,addBefore和addAfter先通过遍历查找指定过滤器在List中的位置,然后再将新插入的过滤器插入到指定位置
三、设置IoHandler
1 public final void setHandler(IoHandler handler) { 2 if (handler == null) { 3 throw new IllegalArgumentException("handler cannot be null"); 4 } 5 /** 判断IoService是否活跃,主要是判断IoServiceListenerSupport是否有活跃的IoServiceListener */ 6 if (isActive()) { 7 throw new IllegalStateException("handler cannot be set while the service is active."); 8 } 9 /** 设置IoHandler属性*/ 10 this.handler = handler; 11 }
设置IoHandler的逻辑比较简单,就是给IoService内部的handler属性赋值
四、绑定主机
前面三个步骤都是在初始化服务器并设置各种属性,接下来就是绑定监听的逻辑,源码如下:
1 public final void bind(SocketAddress localAddress) throws IOException { 2 if (localAddress == null) { 3 throw new IllegalArgumentException("localAddress"); 4 } 5 /** 创建地址列表,将传入的SocketAddress添加到列表中*/ 6 List<SocketAddress> localAddresses = new ArrayList<>(1); 7 localAddresses.add(localAddress); 8 /** 内部方法执行绑定逻辑*/ 9 bind(localAddresses); 10 }
调用内部重载的bind方法,源码如下:
1 public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException { 2 /** 参数校验*/ 3 if (isDisposing()) { 4 throw new IllegalStateException("The Accpetor disposed is being disposed."); 5 } 6 7 if (localAddresses == null) { 8 throw new IllegalArgumentException("localAddresses"); 9 } 10 11 List<SocketAddress> localAddressesCopy = new ArrayList<>(); 12 13 for (SocketAddress a : localAddresses) { 14 checkAddressType(a); 15 localAddressesCopy.add(a); 16 } 17 18 if (localAddressesCopy.isEmpty()) { 19 throw new IllegalArgumentException("localAddresses is empty."); 20 } 21 22 boolean activate = false; 23 synchronized (bindLock) { 24 synchronized (boundAddresses) { 25 if (boundAddresses.isEmpty()) { 26 activate = true; 27 } 28 } 29 /** 判断IoHandler是否为空*/ 30 if (getHandler() == null) { 31 throw new IllegalStateException("handler is not set."); 32 } 33 34 try { 35 /** 绑定主机地址 */ 36 Set<SocketAddress> addresses = bindInternal(localAddressesCopy); 37 synchronized (boundAddresses) { 38 boundAddresses.addAll(addresses); 39 } 40 } catch (IOException e) { 41 throw e; 42 } catch (RuntimeException e) { 43 throw e; 44 } catch (Exception e) { 45 throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e); 46 } 47 } 48 /** 激活IoServiceListener */ 49 if (activate) { 50 getListeners().fireServiceActivated(); 51 } 52 }
虽然代码比较多,但是核心代码就只有 bindInternal这一行, bindInternal是真正的绑定主机的方法,代码如下:
1 /** 绑定主机*/ 2 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { 3 /** 创建一个Future对象,当IoAcceptor的Selector处理了该请求后会给Future对象发送一个信号 */ 4 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); 5 6 /** 添加到AcceptorOperationFuture队列中*/ 7 registerQueue.add(request); 8 9 /** 开启Acceptor*/ 10 startupAcceptor(); 11 12 try { 13 /** 信号量设置为1,相当于加锁处理*/ 14 lock.acquire(); 15 16 wakeup(); 17 } finally { 18 /** 信号量释放, 相当于解锁处理*/ 19 lock.release(); 20 } 21 22 // Now, we wait until this request is completed. 23 request.awaitUninterruptibly(); 24 25 if (request.getException() != null) { 26 throw request.getException(); 27 } 28 Set<SocketAddress> newLocalAddresses = new HashSet<>(); 29 30 for (H handle : boundHandles.values()) { 31 newLocalAddresses.add(localAddress(handle)); 32 } 33 34 return newLocalAddresses; 35 }
核心逻辑是调用了startupAcceptor方法,代码如下:
1 private void startupAcceptor() throws InterruptedException { 2 if (!selectable) { 3 registerQueue.clear(); 4 cancelQueue.clear(); 5 } 6 7 /** 创建Acceptor对象, Acceptor实现了Runnable接口*/ 8 Acceptor acceptor = acceptorRef.get(); 9 10 if (acceptor == null) { 11 lock.acquire(); 12 acceptor = new Acceptor(); 13 14 if (acceptorRef.compareAndSet(null, acceptor)) { 15 /** 将实现了Runnable接口的acceptor对象放入线程池中执行*/ 16 executeWorker(acceptor); 17 } else { 18 lock.release(); 19 } 20 } 21 }
1 protected final void executeWorker(Runnable worker) { 2 executeWorker(worker, null); 3 } 4 5 protected final void executeWorker(Runnable worker, String suffix) { 6 String actualThreadName = threadName; 7 if (suffix != null) { 8 actualThreadName = actualThreadName + '-' + suffix; 9 } 10 executor.execute(new NamePreservingRunnable(worker, actualThreadName)); 11 }
该方法的核心逻辑是创建一个Acceptor对象,Acceptor对象是实现了Runnable接口的,所以是一个可执行逻辑。然后调用executeWorker方法将Acceptor对象交给线程池执行,而线程池就是初始化IoService时初始化的线程池,默认是Executors.newCachedThreadPool
既然Acceptor实现了Runnable接口,那么就再看下Acceptor的具体实现逻辑:
1 private class Acceptor implements Runnable { 2 public void run() { 3 assert (acceptorRef.get() == this); 4 5 int nHandles = 0; 6 7 /** 释放锁*/ 8 lock.release(); 9 10 while (selectable) { 11 try { 12 /** 获取注册的监听端口的数量*/ 13 nHandles += registerHandles(); 14 15 /** 调用Selector的select()方法,会阻塞当前线程 */ 16 int selected = select(); 17 18 // Now, if the number of registred handles is 0, we can 19 // quit the loop: we don't have any socket listening 20 // for incoming connection. 21 if (nHandles == 0) { 22 acceptorRef.set(null); 23 24 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { 25 assert (acceptorRef.get() != this); 26 break; 27 } 28 29 if (!acceptorRef.compareAndSet(null, this)) { 30 assert (acceptorRef.get() != this); 31 break; 32 } 33 34 assert (acceptorRef.get() == this); 35 } 36 37 if (selected > 0) { 38 /** 处理Selector返回的所有的SelectionKey */ 39 processHandles(selectedHandles()); 40 } 41 42 // check to see if any cancellation request has been made. 43 nHandles -= unregisterHandles(); 44 } catch (ClosedSelectorException cse) { 45 // If the selector has been closed, we can exit the loop 46 ExceptionMonitor.getInstance().exceptionCaught(cse); 47 break; 48 } catch (Exception e) { 49 ExceptionMonitor.getInstance().exceptionCaught(e); 50 51 try { 52 Thread.sleep(1000); 53 } catch (InterruptedException e1) { 54 ExceptionMonitor.getInstance().exceptionCaught(e1); 55 } 56 } 57 } 58 59 /** 当销毁IoService时, 销毁所有的processor*/ 60 if (selectable && isDisposing()) { 61 selectable = false; 62 try { 63 if (createdProcessor) { 64 processor.dispose(); 65 } 66 } finally { 67 try { 68 synchronized (disposalLock) { 69 if (isDisposing()) { 70 destroy(); 71 } 72 } 73 } catch (Exception e) { 74 ExceptionMonitor.getInstance().exceptionCaught(e); 75 } finally { 76 disposalFuture.setDone(); 77 } 78 } 79 } 80 } 81 82 /** 83 * This method will process new sessions for the Worker class. All 84 * keys that have had their status updates as per the Selector.selectedKeys() 85 * method will be processed here. Only keys that are ready to accept 86 * connections are handled here. 87 * <p/> 88 * Session objects are created by making new instances of SocketSessionImpl 89 * and passing the session object to the SocketIoProcessor class. 90 */ 91 @SuppressWarnings("unchecked") 92 private void processHandles(Iterator<H> handles) throws Exception { 93 /** 遍历所有SelectionKey*/ 94 while (handles.hasNext()) { 95 H handle = handles.next(); 96 handles.remove(); 97 98 /** 处理客户端连接请求,封装成NioSocketSession对象*/ 99 S session = accept(processor, handle); 100 101 if (session == null) { 102 continue; 103 } 104 /** 初始化客户端Session对象, 设置上一次读写时间 */ 105 initSession(session, null, null); 106 107 /** 将客户端Session添加到IoProcessor线程池,并分配一个IoProcessor和Session进行绑定 */ 108 session.getProcessor().add(session); 109 } 110 } 111 } 112 113 protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception { 114 115 SelectionKey key = null; 116 117 if (handle != null) { 118 key = handle.keyFor(selector); 119 } 120 /** key有效且必须触发了OP_ACCEPT事件,其他事件不处理*/ 121 if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { 122 return null; 123 } 124 125 /** 获取客户端连接的Channel*/ 126 SocketChannel ch = handle.accept(); 127 128 if (ch == null) { 129 return null; 130 } 131 /** 封装客户端连接为NioSocketSession对象 */ 132 return new NioSocketSession(this, processor, ch); 133 } 134 135 public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel channel) { 136 /** 调用父类构造函数 设置IoProcessor、IoService、SocketChannel属性*/ 137 super(processor, service, channel); 138 config = new SessionConfigImpl(); 139 /** 将IoService的全局SessionConfig复制给当前session的配置*/ 140 config.setAll(service.getSessionConfig()); 141 }
从代码中可以看出IoAcceptor的run方法基本上和NIO的服务器启动方法差不多,首先了调用Selector的select()方法获取所有客户端的请求连接accept事件,然后遍历所有的SelectionKey,将客户端的连接封装成NioSocketSession,最后再将NioSocketSession添加到IoProcessor线程池中,而绑定到具体的IoProcessor的逻辑是在IoProcessor的add方法中实现的,这里IoProcessor线程池是通过SimpleIoProcessorPool类实现的,源码如下:
1 /** 添加SocketSession*/ 2 public final void add(S session) { 3 /** 根据session获取具体的IoProcessor,并将session添加到IoProcessor中*/ 4 getProcessor(session).add(session); 5 } 6 7 private IoProcessor<S> getProcessor(S session) { 8 IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR); 9 10 if (processor == null) { 11 if (disposed || disposing) { 12 throw new IllegalStateException("A disposed processor cannot be accessed."); 13 } 14 /** 根据session的ID通过取模算法从数组中获取对应的IoProcessor*/ 15 processor = pool[Math.abs((int) session.getId()) % pool.length]; 16 17 if (processor == null) { 18 throw new IllegalStateException("A disposed processor cannot be accessed."); 19 } 20 /** 给session设置IoProcessor属性*/ 21 session.setAttributeIfAbsent(PROCESSOR, processor); 22 } 23 24 return processor; 25 }
每一个SocketSession会绑定一个IoProcessor,并会缓存在session的attribute中,而获取IoProcessor的方式则是通过取模算法从IoProcessor数组中获取。
然后调用IoProcessor的add方法将session添加到IoProcessor中,处理IO数据的IoProcessor实现类为NioProcessor,实现的add方法源码如下:
1 /** 新创建的session队列*/ 2 private final Queue<S> newSessions = new ConcurrentLinkedQueue<>(); 3 4 /** 需要移除的session队列 */ 5 private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>(); 6 7 /** 需要刷新的session队列*/ 8 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>(); 9 10 public final void add(S session) { 11 if (disposed || disposing) { 12 throw new IllegalStateException("Already disposed."); 13 } 14 15 /** 将新的session加入到队列中*/ 16 newSessions.add(session); 17 /** 开启IoProcessor*/ 18 startupProcessor(); 19 } 20 21 private void startupProcessor() { 22 /** 原子引用类型,如果不存在就新建Processor*/ 23 Processor processor = processorRef.get(); 24 25 if (processor == null) { 26 processor = new Processor(); 27 28 if (processorRef.compareAndSet(null, processor)) { 29 /** 将可执行的Processor添加到线程池中执行 */ 30 executor.execute(new NamePreservingRunnable(processor, threadName)); 31 } 32 } 33 34 /** 唤醒Selector, 防止一直被阻塞*/ 35 wakeup(); 36 }
这里将session添加到队列中,然后开启IoProcessor线程,过程和IoSession类型,都是封装成一个NamePreservingRunnable对象交给线程池去执行,而这里的线程池和IoService的线程池是独立的,没给IoProcessor都有一个自己的线程池
Processor具体的执行逻辑如下:
1 /** Processor线程,用于处理IO读写操作*/ 2 private class Processor implements Runnable { 3 public void run() { 4 assert (processorRef.get() == this); 5 //会话数量 6 int nSessions = 0; 7 lastIdleCheckTime = System.currentTimeMillis(); 8 /** 重试次数*/ 9 int nbTries = 10; 10 11 for (;;) { 12 try { 13 long t0 = System.currentTimeMillis(); 14 int selected = select(SELECT_TIMEOUT); 15 long t1 = System.currentTimeMillis(); 16 /** 计算select()方法执行的时间*/ 17 long delta = t1 - t0; 18 19 if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) { 20 // Last chance : the select() may have been 21 // interrupted because we have had an closed channel. 22 if (isBrokenConnection()) { 23 LOG.warn("Broken connection"); 24 } else { 25 // Ok, we are hit by the nasty epoll 26 // spinning. 27 // Basically, there is a race condition 28 // which causes a closing file descriptor not to be 29 // considered as available as a selected channel, 30 // but 31 // it stopped the select. The next time we will 32 // call select(), it will exit immediately for the 33 // same 34 // reason, and do so forever, consuming 100% 35 // CPU. 36 // We have to destroy the selector, and 37 // register all the socket on a new one. 38 if (nbTries == 0) { 39 LOG.warn("Create a new selector. Selected is 0, delta = " + delta); 40 registerNewSelector(); 41 nbTries = 10; 42 } else { 43 nbTries--; 44 } 45 } 46 } else { 47 nbTries = 10; 48 } 49 50 /** 处理新添加的Session 51 * 将session对应的channel注册到当前IoProcessor的Selector上,并监听OP_READ事件 52 * */ 53 nSessions += handleNewSessions(); 54 55 updateTrafficMask(); 56 57 // Now, if we have had some incoming or outgoing events, 58 // deal with them 59 if (selected > 0) { 60 /** 当select()返回值大于0表示当前已经有IO事件需要处理,则执行process方法进行处理*/ 61 process(); 62 } 63 64 // Write the pending requests 65 long currentTime = System.currentTimeMillis(); 66 flush(currentTime); 67 68 /** 69 * 移除所有需要移除的session,移除之前会将所有需要发送的数据发送给客户端 70 * */ 71 nSessions -= removeSessions(); 72 73 /** 更新所有session的读空闲、写空闲和读写空闲的时间值 */ 74 notifyIdleSessions(currentTime); 75 76 // Get a chance to exit the infinite loop if there are no 77 // more sessions on this Processor 78 if (nSessions == 0) { 79 processorRef.set(null); 80 81 if (newSessions.isEmpty() && isSelectorEmpty()) { 82 // newSessions.add() precedes startupProcessor 83 assert (processorRef.get() != this); 84 break; 85 } 86 87 assert (processorRef.get() != this); 88 89 if (!processorRef.compareAndSet(null, this)) { 90 // startupProcessor won race, so must exit processor 91 assert (processorRef.get() != this); 92 break; 93 } 94 95 assert (processorRef.get() == this); 96 } 97 98 /** 当IoProcessor销毁了,则移除所有的客户端的SocketSession*/ 99 if (isDisposing()) { 100 boolean hasKeys = false; 101 102 for (Iterator<S> i = allSessions(); i.hasNext();) { 103 IoSession session = i.next(); 104 105 if (session.isActive()) { 106 scheduleRemove((S)session); 107 hasKeys = true; 108 } 109 } 110 111 if (hasKeys) { 112 wakeup(); 113 } 114 } 115 } catch (ClosedSelectorException cse) { 116 // If the selector has been closed, we can exit the loop 117 // But first, dump a stack trace 118 ExceptionMonitor.getInstance().exceptionCaught(cse); 119 break; 120 } catch (Exception e) { 121 ExceptionMonitor.getInstance().exceptionCaught(e); 122 123 try { 124 Thread.sleep(1000); 125 } catch (InterruptedException e1) { 126 ExceptionMonitor.getInstance().exceptionCaught(e1); 127 } 128 } 129 } 130 131 try { 132 synchronized (disposalLock) { 133 if (disposing) { 134 doDispose(); 135 } 136 } 137 } catch (Exception e) { 138 ExceptionMonitor.getInstance().exceptionCaught(e); 139 } finally { 140 disposalFuture.setValue(true); 141 } 142 } 143 }
这里代码比较多,但是核心的不多,主要功能如下:
1、处理所有新加入的Session,将session对应的channel注册到Selector中,并监听OP_READ事件
2、处理所有的需要移除的session,将session从Selector中移除监听
3、调用Selector的select()方法获取IO事件,如果存在IO事件,则调用process()方法进行处理
4、更新所有session的读写空闲时间
而处理IO事件的逻辑全部在process方法中处理,源码如下:
1 /** 处理IoProcessor的Selector监听返回的所有IO事件 */ 2 private void process() throws Exception { 3 /** 遍历所有的SelectionKey*/ 4 for (Iterator<S> i = selectedSessions(); i.hasNext();) { 5 S session = i.next(); 6 /** 处理单个客户端session的IO事件*/ 7 process(session); 8 i.remove(); 9 } 10 } 11 /** 获取所有的SelectionKey*/ 12 protected Iterator<NioSession> selectedSessions() { 13 return new IoSessionIterator(selector.selectedKeys()); 14 }
process方法的逻辑不多,先是获取selector所有的IO事件SelectionKey集合,然后进行遍历所有的SelectionKey集合,调用process(session)方法处理每个Session的IO事件,源码如下:
1 /***/ 2 private void process(S session) { 3 /** 处理读事件 */ 4 if (isReadable(session) && !session.isReadSuspended()) { 5 /** 处理session的读事件*/ 6 read(session); 7 } 8 9 /** 处理写事件 */ 10 if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) { 11 /** 刷新session */ 12 flushingSessions.add(session); 13 } 14 } 15 16 private void read(S session) { 17 /** 获取配置的读缓冲区大小*/ 18 IoSessionConfig config = session.getConfig(); 19 int bufferSize = config.getReadBufferSize(); 20 /** 分配指定大小的缓冲区*/ 21 IoBuffer buf = IoBuffer.allocate(bufferSize); 22 23 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); 24 25 try { 26 int readBytes = 0; 27 int ret; 28 29 try { 30 if (hasFragmentation) { 31 /** 调用read方法将session中的IO数据读取到缓冲区*/ 32 while ((ret = read(session, buf)) > 0) { 33 readBytes += ret; 34 35 if (!buf.hasRemaining()) { 36 break; 37 } 38 } 39 } else { 40 /** 调用read方法将session中的IO数据读取到缓冲区*/ 41 ret = read(session, buf); 42 if (ret > 0) { 43 readBytes = ret; 44 } 45 } 46 } finally { 47 /** 从读模式切换到写模式 */ 48 buf.flip(); 49 } 50 51 if (readBytes > 0) { 52 IoFilterChain filterChain = session.getFilterChain(); 53 /** 将读到的数据交给过滤器链进行过滤处理 */ 54 filterChain.fireMessageReceived(buf); 55 buf = null; 56 57 if (hasFragmentation) { 58 /** 判断读取到的数据是否小于缓冲区总大小的一半*/ 59 if (readBytes << 1 < config.getReadBufferSize()) { 60 /** 缩小缓冲区大小 */ 61 session.decreaseReadBufferSize(); 62 } else if (readBytes == config.getReadBufferSize()) { 63 /** 扩大缓冲区大小 */ 64 session.increaseReadBufferSize(); 65 } 66 } 67 } 68 69 if (ret < 0) { 70 /** 过滤器链关闭 */ 71 IoFilterChain filterChain = session.getFilterChain(); 72 filterChain.fireInputClosed(); 73 } 74 } catch (Exception e) { 75 if (e instanceof IOException) { 76 if (!(e instanceof PortUnreachableException) 77 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass()) 78 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) { 79 scheduleRemove(session); 80 } 81 } 82 /** 如果抛异常,则通知过滤器链执行异常事件*/ 83 IoFilterChain filterChain = session.getFilterChain(); 84 filterChain.fireExceptionCaught(e); 85 } 86 }
这里逻辑比较多,但是整体逻辑不复杂,主要逻辑如下:
1、如果是可读事件,那么就执行read方法进行IO数据的读取
2、根据配置的读缓冲区大小创建IoBuffer对象进行内存分配
3、调用read方法从session中读取数据存到IoBuffer中,读取完成将IoBuffer从读模式切换到写模式
4、将读取到的IoBuffer数据交给过滤器链进行所有IO过滤器进行过滤处理
5、将缓冲区大小进行扩容或降容处理
6、将连接关闭或连接异常的事件交给过滤器链进行对应的处理
接下来就针对每一个步骤分别进行源码分析
1、分配缓冲区
1 /** 内存分配工具 */ 2 private static IoBufferAllocator allocator = new SimpleBufferAllocator(); 3 4 /** 是否使用直接内存,默认为false表示使用堆内内存,值为true表示使用直接内存 */ 5 private static boolean useDirectBuffer = false; 6 7 public static IoBuffer allocate(int capacity) { 8 return allocate(capacity, useDirectBuffer); 9 } 10 11 public static IoBuffer allocate(int capacity, boolean useDirectBuffer) { 12 if (capacity < 0) { 13 throw new IllegalArgumentException("capacity: " + capacity); 14 } 15 /** 调用内存分配工具的allocate方法进行内存分配 */ 16 return allocator.allocate(capacity, useDirectBuffer); 17 }
分配缓冲区一共有两个参数,分别是缓冲区的大小和是否使用直接内存,最终调用内存分配器的allocate方法进行内存分配,源码如下:
1 /** 创建IoBuffer对象,分配内存 */ 2 public IoBuffer allocate(int capacity, boolean direct) { 3 return wrap(allocateNioBuffer(capacity, direct)); 4 } 5 6 /** 创建NIO的ByteBuffer对象*/ 7 public ByteBuffer allocateNioBuffer(int capacity, boolean direct) { 8 ByteBuffer nioBuffer; 9 if (direct) { 10 /** 分配直接内存*/ 11 nioBuffer = ByteBuffer.allocateDirect(capacity); 12 } else { 13 /** 分配堆内内存*/ 14 nioBuffer = ByteBuffer.allocate(capacity); 15 } 16 return nioBuffer; 17 } 18 19 /** 将ByteBuffer对象包装成IoBuffer对象 */ 20 public IoBuffer wrap(ByteBuffer nioBuffer) { 21 return new SimpleBuffer(nioBuffer); 22 }
这里就回到了Java的NIO分配内存的逻辑了,MINA分配缓冲区的逻辑实际底层就是调用了Java NIO的ByteBuffer的分配逻辑,根据直接内存还是堆内内存进行内存分配,最终分配了之后再封装成了IoBuffer对象。
2、IO数据读取
1 @Override 2 protected int read(NioSession session, IoBuffer buf) throws Exception { 3 /** 从session中获取channel*/ 4 ByteChannel channel = session.getChannel(); 5 /** 调用Java NIO的channel.read(ByteBuffer buffer)方法读取数据 */ 6 return channel.read(buf.buf()); 7 }
读取数据的逻辑也不复杂,同样是直接调用了Java NIO的channel读取Buffer数据的方式进行读取,具体的实现逻辑可以参考Java NIO的channel读取缓冲区数据的逻辑
3、过滤器链对IO数据进行过滤处理
1 public void fireMessageReceived(Object message) { 2 if (message instanceof IoBuffer) { 3 session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis()); 4 } 5 6 callNextMessageReceived(head, session, message); 7 } 8 9 public final void increaseReadBytes(long increment, long currentTime) { 10 if (increment <= 0) { 11 return; 12 } 13 //统计已读字节数 14 readBytes += increment; 15 //设置上一次读操作时间 16 lastReadTime = currentTime; 17 //设置空闲为0 18 idleCountForBoth.set(0); 19 idleCountForRead.set(0); 20 21 if (getService() instanceof AbstractIoService) { 22 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime); 23 } 24 } 25 26 /** 通知下一个节点处理接收消息事件 */ 27 private void callNextMessageReceived(Entry entry, IoSession session, Object message) { 28 try { 29 IoFilter filter = entry.getFilter(); 30 NextFilter nextFilter = entry.getNextFilter(); 31 /** 依次执行每一个IoFilter的messageReceived方法 */ 32 filter.messageReceived(nextFilter, session, message); 33 } catch (Exception e) { 34 fireExceptionCaught(e); 35 } catch (Error e) { 36 fireExceptionCaught(e); 37 throw e; 38 } 39 }
过滤器链路处理消息时,会从过滤器链路的头节点开始依次执行messageReceived方法,直到执行到最后一个过滤器,过滤器链路头节点和尾节点是固定不变的,头节点实现为HeadFilter,尾节点实现为TailFilter。
当处理读事件时,会从HeadFilter开始处理,然后按自定义的过滤器依次执行,最后执行TailFilter的处理;而处理写事件时,顺序完全相反,会从TailFilter开始到HeadFilter结束。所以读数据时最后会执行TailFilter的messageReceived方法,源码如下:
1 @Override 2 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { 3 AbstractIoSession s = (AbstractIoSession) session; 4 5 if (!(message instanceof IoBuffer)) { 6 s.increaseReadMessages(System.currentTimeMillis()); 7 } else if (!((IoBuffer) message).hasRemaining()) { 8 s.increaseReadMessages(System.currentTimeMillis()); 9 } 10 11 // Update the statistics 12 if (session.getService() instanceof AbstractIoService) { 13 ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis()); 14 } 15 16 try { 17 /** 尾部过滤器将消息交给业务处理器IoHandler处理 */ 18 session.getHandler().messageReceived(s, message); 19 } finally { 20 if (s.getConfig().isUseReadOperation()) { 21 s.offerReadFuture(message); 22 } 23 } 24 }
可以看出TailFilter最重要的一个步骤是需要将发送的IO数据交给业务层处理,通过从IoSession中获取IoHandler对象,并调用IoHandler的messageReceived方法交给业务层处理IO数据
4、缓冲区扩容或降容处理
1 /** 扩容读缓冲区大小为原先的两倍 */ 2 public final void increaseReadBufferSize() { 3 /** 扩大两倍*/ 4 int newReadBufferSize = getConfig().getReadBufferSize() << 1; 5 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) { 6 getConfig().setReadBufferSize(newReadBufferSize); 7 } else { 8 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize()); 9 } 10 11 deferDecreaseReadBuffer = true; 12 } 13 14 /** 15 * 降容读缓冲区大小为原先的一半 16 */ 17 public final void decreaseReadBufferSize() { 18 if (deferDecreaseReadBuffer) { 19 deferDecreaseReadBuffer = false; 20 return; 21 } 22 23 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) { 24 /** 缩小一半*/ 25 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1); 26 } 27 28 deferDecreaseReadBuffer = true; 29 }
配置的读缓冲区大小和实际的IO数据大小可能会存在偏差,所以需要动态的调整读缓冲区的大小,避免缓冲区内存不足或内存浪费,每次扩容都会扩容到原大小的两倍,降容也会缩小到原先的一半
5、连接关闭或连接异常事件处理
1 public void fireInputClosed() { 2 /** 获取过滤器链头节点*/ 3 Entry head = this.head; 4 callNextInputClosed(head, session); 5 } 6 7 /** 通知下一个节点处理IO输入关闭事件*/ 8 private void callNextInputClosed(Entry entry, IoSession session) { 9 try { 10 IoFilter filter = entry.getFilter(); 11 /** 获取下一个过滤器*/ 12 NextFilter nextFilter = entry.getNextFilter(); 13 /** 处理IO输入关闭事件*/ 14 filter.inputClosed(nextFilter, session); 15 } catch (Throwable e) { 16 fireExceptionCaught(e); 17 } 18 } 19 20 public void fireExceptionCaught(Throwable cause) { 21 callNextExceptionCaught(head, session, cause); 22 } 23 24 private void callNextExceptionCaught(Entry entry, IoSession session, Throwable cause) { 25 /** 唤醒关联的Future*/ 26 ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE); 27 if (future == null) { 28 try { 29 IoFilter filter = entry.getFilter(); 30 NextFilter nextFilter = entry.getNextFilter(); 31 /** 执行过滤的异常处理事件*/ 32 filter.exceptionCaught(nextFilter, session, cause); 33 } catch (Throwable e) { 34 LOGGER.warn("Unexpected exception from exceptionCaught handler.", e); 35 } 36 } else { 37 /** 关闭session*/ 38 if (!session.isClosing()) { 39 // Call the closeNow method only if needed 40 session.closeNow(); 41 } 42 43 future.setException(cause); 44 } 45 }
处理逻辑基本上一样,都是先获取过滤器链的头节点,开始处理对应的事件,然后再依次调用下一个节点的过滤器的方法。另外最后一个过滤器TailFilter会将对应的事件会最终交给业务层处理,调用IoHandler的对应方法,让业务层处理事件。
总结MINA服务端的工作机制:
1、创建IoService,每个IoService内部有一个线程池用于处理客户端的连接请求,并且有一个IoProcessor池,默认数量为CPU个数+1,用于处理客户端的IO操作
2、IoService内部有一个Selector用于监听客户端的连接请求,连接成功之后会创建IoSession并交给IoProcessor池处理
3、IoProcessor池根据IoSession的ID进行取模算法选取一个IoProcessor和IoSession进行绑定,一个IoProcessor可以绑定多个IoSession,一个IoSession只可以绑定一个IoProcessor
4、IoProcessor内部也有一个Selector用来监控所有关联的IoSession的IO操作状态,并且有一个线程池用来处理IO操作
5、IoProcessor将处理的IO事件交给IoService绑定的过滤器链,过滤器链上的所有过滤器依次处理IO事件
7、过滤器链的最后一个过滤器为TailFilter,处理完IO事件之后将IO事件交给业务层处理器IoHandler
8、IoHandler处理业务数据,而写数据的话顺序完全相反,才业务层到过滤器链路,通过HeadFilter再由IoProcessor发送出去