zoukankan      html  css  js  c++  java
  • NIO之路4--MINA框架源码解析

    MINA框架是基于NIO的异步IO框架,上一文已经对MINA的理论及实践做了分析,本文将对于MINA的整体源码实现进行分析。

    通过MINA的实际案例可以发现,MINA的IO实现相比于NIO的使用要简单很多,因为不需要关心IO的具体实现,只需要关心具体的IO数据即可。MINA服务端整体步骤一共就四步:

    1、创建IoService:初始化IoService,服务端就是创建IoAcceptor对象,客户端就是创建IoConnector对象

    2、添加IoFilter:添加IO过滤器,每个IoService内部都有一个IO过滤器链IoFIlterChain,调用addBefore或addLast等方法将IO过滤器添加到过滤器链上

    3、设置IoHandler:给IoService设置IO数据处理器IoHandler对象,用来处理器具体的IO业务数据

    4、绑定监听端口号:调用IoService的bind方法监听服务端需要监听的端口号,并开启Selector监听客户端连接

    一、初始化IoService

    服务器创建IoService是直接创建IoService的子接口IoAcceptor,实现类为NioSocketAcceptor,实例化代码如下:

    1     /** 创建默认实例*/
    2     IoAcceptor acceptor1 = new NioSocketAcceptor();
    3 
    4     /** 创建指定数量IoProcessor的实例*/
    5     IoAcceptor acceptor2 = new NioSocketAcceptor(2);
    6 
    7     /** 创建指定线程池的实例*/
    8     Executor executor = Executors.newFixedThreadPool(4);
    9     IoAcceptor acceptor3 = new NioSocketAcceptor(executor, new SimpleIoProcessorPool<>(NioProcessor.class));

    虽然NioSocketAcceptor是实现了IoAcceptor接口,但是并不是直接实现的,而是继承了抽象的实现了IoAcceptor接口的父类AbstractPollingIoAcceptor,按默认的构造方法为例,初始化代码如下:

      1 /** 无参构造函数*/
      2     public NioSocketAcceptor() {
      3         /** 调用父类构造函数*/
      4         super(new DefaultSocketSessionConfig(), NioProcessor.class);
      5         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
      6     }
      7 
      8     /** AbstractPollingIoAcceptor构造函数
      9      * @param sessionConfig:IoSession的全局配置
     10      * @param processorClass:IoProcessor的Class
     11      * */
     12     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
     13         /** 调用重载构造函数*/
     14         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
     15     }
     16 
     17     /** AbstractPollingIoAcceptor构造函数
     18      * @param sessionConfig:IoSession的全局配置
     19      * @param executor:线程池
     20      * @param processor:IoProcessor对象
     21      * @param createdProcessor : 是否创建Processor
     22      * @param selectorProvider : SelectorProvider对象,用于创建Selector
     23      * */
     24     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
     25                                       boolean createdProcessor, SelectorProvider selectorProvider) {
     26         /** 1.调用父亲AbstractIoAcceptor的构造函数*/
     27         super(sessionConfig, executor);
     28 
     29         if (processor == null) {
     30             throw new IllegalArgumentException("processor");
     31         }
     32         /** 2.设置属性processor、createdProcessor值*/
     33         this.processor = processor;
     34         this.createdProcessor = createdProcessor;
     35 
     36         try {
     37             /** 3.根据SelectorProvider对象初始化Selector,和NIO的根据SelectorProvider获取Selector对象逻辑一样 */
     38             init(selectorProvider);
     39 
     40             //标记当前IoAcceptor的Selector已经创建完成,可以接收客户端的连接请求了
     41             selectable = true;
     42         } catch (RuntimeException e) {
     43             throw e;
     44         } catch (Exception e) {
     45             throw new RuntimeIoException("Failed to initialize.", e);
     46         } finally {
     47             if (!selectable) {
     48                 try {
     49                     destroy();
     50                 } catch (Exception e) {
     51                     ExceptionMonitor.getInstance().exceptionCaught(e);
     52                 }
     53             }
     54         }
     55     }
     56 
     57     /** AbstractIoAcceptor构造函数*/
     58     protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
     59         /** 调用父类AbstractIoService构造函数 */
     60         super(sessionConfig, executor);
     61         defaultLocalAddresses.add(null);
     62     }
     63 
     64     /** AbstractIoService构造函数 */
     65     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
     66         /** 参数校验*/
     67         if (sessionConfig == null) {
     68             throw new IllegalArgumentException("sessionConfig");
     69         }
     70 
     71         if (getTransportMetadata() == null) {
     72             throw new IllegalArgumentException("TransportMetadata");
     73         }
     74 
     75         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
     76             throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
     77                     + getTransportMetadata().getSessionConfigType() + ")");
     78         }
     79 
     80         /** 创建IoServiceListenerSupport对象,IoServiceListenerSupport内部有一个IoServiceListener的列表
     81          *  将当前的IoServerListener对象添加到IoServiceListenerSupport的列表中
     82          *  */
     83         listeners = new IoServiceListenerSupport(this);
     84         listeners.add(serviceActivationListener);
     85 
     86         /** 设置属性 sessionConfig*/
     87         this.sessionConfig = sessionConfig;
     88 
     89         //加载异常监听器ExceptionMonitor对象,提前加载防止在使用的时候还没有初始化
     90         ExceptionMonitor.getInstance();
     91         /** 设置线程池,如果没有自定义,就采用默认的newCachedThreadPool*/
     92         if (executor == null) {
     93             this.executor = Executors.newCachedThreadPool();
     94             createdExecutor = true;
     95         } else {
     96             this.executor = executor;
     97             createdExecutor = false;
     98         }
     99 
    100         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
    101     }

    整个的初始化过程都是在给IoAcceptor的一些属性进行初始化,核心属性包括sessionConfig、executor、IoProcessor等

    其中SessionConfig表示IoSession的全局属性配置,主要配置如下:

      1 public interface IoSessionConfig {
      2 
      3         /**
      4          * 获取IoProcessor读取数据的缓冲区大小
      5          */
      6         int getReadBufferSize();
      7 
      8         /**
      9          * 设置IoProcessor读取数据的缓冲区大小,通常会由IoProcessor自动动态调整
     10          */
     11         void setReadBufferSize(int readBufferSize);
     12 
     13         /**
     14          * 获取IoProcessor读取属性的缓冲区大小的最小值
     15          */
     16         int getMinReadBufferSize();
     17 
     18         /**
     19          * 设置IoProcessor读取属性的缓冲区大小的最小值
     20          */
     21         void setMinReadBufferSize(int minReadBufferSize);
     22 
     23         /**
     24          * 获取IoProcessor读取属性的缓冲区大小的最大值
     25          */
     26         int getMaxReadBufferSize();
     27 
     28         /**
     29          * 设置IoProcessor读取属性的缓冲区大小的最大值
     30          */
     31         void setMaxReadBufferSize(int maxReadBufferSize);
     32 
     33         /**
     34          * 获取吞吐量(TPS)的统计间隔,单位为秒,默认是3秒
     35          */
     36         int getThroughputCalculationInterval();
     37 
     38         /**
     39          * 获取吞吐量(TPS)的统计间隔,单位为毫秒
     40          */
     41         long getThroughputCalculationIntervalInMillis();
     42 
     43         /**
     44          * 设置吞吐量的统计间隔时间
     45          */
     46         void setThroughputCalculationInterval(int throughputCalculationInterval);
     47 
     48         /**
     49          * 获取指定状态的空闲时间,不同状态时间可能设置的不一样, 类型如下:
     50          * READER_IDLE : 读数据空闲
     51          * WRITER_IDLE : 写数据空闲
     52          * BOTH_IDLE : 读写都空闲
     53          */
     54         int getIdleTime(IdleStatus status);
     55 
     56         /**
     57          * 获取指定状态的空闲时间,单位为毫秒
     58          */
     59         long getIdleTimeInMillis(IdleStatus status);
     60 
     61         /**
     62          * 设置指定状态的空闲时间
     63          */
     64         void setIdleTime(IdleStatus status, int idleTime);
     65 
     66         /**
     67          * 获取读空闲时间,单位秒
     68          */
     69         int getReaderIdleTime();
     70 
     71         /**
     72          * 获取读空闲时间,单位毫秒
     73          */
     74         long getReaderIdleTimeInMillis();
     75 
     76         /**
     77          * 设置读空闲时间
     78          */
     79         void setReaderIdleTime(int idleTime);
     80 
     81         /**
     82          * 获取写空闲时间,单位秒
     83          */
     84         int getWriterIdleTime();
     85 
     86         /**
     87          * 获取写空闲时间,单位毫秒
     88          */
     89         long getWriterIdleTimeInMillis();
     90 
     91         /**
     92          * 设置写空闲时间
     93          */
     94         void setWriterIdleTime(int idleTime);
     95 
     96         /**
     97          * 获取读写都空闲时间,单位秒
     98          */
     99         int getBothIdleTime();
    100 
    101         /**
    102          * 获取读写都空闲时间,单位毫秒
    103          */
    104         long getBothIdleTimeInMillis();
    105 
    106         /**
    107          * 设置读写都空闲时间
    108          */
    109         void setBothIdleTime(int idleTime);
    110 
    111         /**
    112          * 获取写超时时间,单位为秒
    113          */
    114         int getWriteTimeout();
    115 
    116         /**
    117          * 获取写超时时间,单位毫秒
    118          */
    119         long getWriteTimeoutInMillis();
    120 
    121         /**
    122          * 设置写超时时间
    123          */
    124         void setWriteTimeout(int writeTimeout);
    125 
    126         /**
    127          * 获取会话读操作是否开启
    128          */
    129         boolean isUseReadOperation();
    130 
    131         /**
    132          * 开启或关闭会话读操作,如果开启所有接收到的消息会存储在内存的BlockingQueue中,使客户端应用可以更方便读取接收的消息
    133          * 开启这个选项对服务器应用无效,并可能会导致内存泄漏,默认为关闭状态
    134          */
    135         void setUseReadOperation(boolean useReadOperation);
    136 
    137         /** 全量设置为IoSessionConfig */
    138         void setAll(IoSessionConfig config);
    139     }

    另外一个属性是线程池对象executor,如果没有自定义线程池传入的话,那么默认会调用Executors.newCachedThreadPool()创建无线程数上限的线程池,所以推荐使用自定义的线程池,避免默认的线程池没有线程数量限制导致线程过多的问题。

    还有一个属性是IoProcessor池,默认类为SimpleIoProcessorPool,SimpleIoProcessorPool内部有一个IoProcessor[] pool和一个Executor executor属性,pool是IoProcessor数组,存储所有的IoProcessor,数量可以自定义通过构造函数传入,默认的个数为当前服务器机器的CPU个数+1,比如4核CPU那么就会创建5个IoProcessor,另外每一个IoProcessor初始化的时候都会设置线程池executor属性,如果executor没有自定义同样也会使用Executors.newCachedThreadPool创建。

    总结:

    IoService初始化的时候涉及到了两个线程池,首先是IoService本身使用的线程池,IoService本身用于接收客户端的连接请求,而连接请求的处理就交给了线程池处理;

    另外IoService初始化时会创建多个IoProcessor用于处理客户端具体的IO操作,每一个IoProcessor内部有一个Selector用于监听客户端IO事件,然后将IO事件交给内部的线程池来处理

    二、添加IoFilter

    IoService内部都一个过滤器链IoFilterChainBuilder对象,默认实现类为DefaultIoFilterChainBuilder类,添加IoFilter时主要有四个方法,分别是在过滤器链的不同位置插入指定过滤器,用法分别如下:

     1         IoAcceptor acceptor = new NioSocketAcceptor();
     2         /** 获取过滤器链 */
     3         DefaultIoFilterChainBuilder ioFilterChain = acceptor.getFilterChain();
     4 
     5         /** 1.在过滤器链的首部加入过滤器 */
     6         ioFilterChain.addFirst("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
     7         /** 2.在过滤器链的指定过滤器前面加入过滤器 */
     8         ioFilterChain.addBefore("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
     9         /** 3.在过滤器链的指定过滤器后面加入过滤器 */
    10         ioFilterChain.addAfter("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
    11         /** 4.在过滤器链的尾部加入过滤器 */
    12         ioFilterChain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

    以addFirst为例,实现逻辑如下:

     1 /** 在过滤器链首部插入过滤器
     2      * @param name:过滤器的名称
     3      * @param filter: 过滤器
     4      * */
     5     public synchronized void addFirst(String name, IoFilter filter) {
     6         /** 构造过滤器链路节点对象EntryImpl,并调用register方法加入到列表中 */
     7         register(0, new EntryImpl(name, filter));
     8     }
     9 
    10 
    11     private final List<Entry> entries;
    12 
    13     public DefaultIoFilterChainBuilder() {
    14         entries = new CopyOnWriteArrayList<Entry>();
    15     }
    16 
    17     /**
    18      * 在指定位置插入Entry节点
    19      * */
    20     private void register(int index, Entry e) {
    21         if (contains(e.getName())) {
    22             throw new IllegalArgumentException("Other filter is using the same name: " + e.getName());
    23         }
    24         //调用List的add方法在指定为止插入节点
    25         entries.add(index, e);
    26     }

    从源码可以看出添加IoFilter的逻辑比较简单,过滤器链IoFilterChainBuilder对象内部有一个列表,用于存放所有的IoFilter,添加过滤器就是将IoFilter和名称封装成Entry对象添加到列表中,不同的add方法实际就是调用List的add(int index, Entry e)方法在指定的位置插入元素。addFirst就是在list头部插入元素,addLast就是在list尾部插入元素,addBefore和addAfter先通过遍历查找指定过滤器在List中的位置,然后再将新插入的过滤器插入到指定位置

    三、设置IoHandler

     1 public final void setHandler(IoHandler handler) {
     2         if (handler == null) {
     3             throw new IllegalArgumentException("handler cannot be null");
     4         }
     5         /** 判断IoService是否活跃,主要是判断IoServiceListenerSupport是否有活跃的IoServiceListener */
     6         if (isActive()) {
     7             throw new IllegalStateException("handler cannot be set while the service is active.");
     8         }
     9         /** 设置IoHandler属性*/
    10         this.handler = handler;
    11     }

    设置IoHandler的逻辑比较简单,就是给IoService内部的handler属性赋值

    四、绑定主机

    前面三个步骤都是在初始化服务器并设置各种属性,接下来就是绑定监听的逻辑,源码如下:

     1 public final void bind(SocketAddress localAddress) throws IOException {
     2         if (localAddress == null) {
     3             throw new IllegalArgumentException("localAddress");
     4         }
     5         /** 创建地址列表,将传入的SocketAddress添加到列表中*/
     6         List<SocketAddress> localAddresses = new ArrayList<>(1);
     7         localAddresses.add(localAddress);
     8         /** 内部方法执行绑定逻辑*/
     9         bind(localAddresses);
    10     }

    调用内部重载的bind方法,源码如下:

     1 public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {
     2         /** 参数校验*/
     3         if (isDisposing()) {
     4             throw new IllegalStateException("The Accpetor disposed is being disposed.");
     5         }
     6 
     7         if (localAddresses == null) {
     8             throw new IllegalArgumentException("localAddresses");
     9         }
    10 
    11         List<SocketAddress> localAddressesCopy = new ArrayList<>();
    12 
    13         for (SocketAddress a : localAddresses) {
    14             checkAddressType(a);
    15             localAddressesCopy.add(a);
    16         }
    17 
    18         if (localAddressesCopy.isEmpty()) {
    19             throw new IllegalArgumentException("localAddresses is empty.");
    20         }
    21 
    22         boolean activate = false;
    23         synchronized (bindLock) {
    24             synchronized (boundAddresses) {
    25                 if (boundAddresses.isEmpty()) {
    26                     activate = true;
    27                 }
    28             }
    29             /** 判断IoHandler是否为空*/
    30             if (getHandler() == null) {
    31                 throw new IllegalStateException("handler is not set.");
    32             }
    33 
    34             try {
    35                 /**  绑定主机地址 */
    36                 Set<SocketAddress> addresses = bindInternal(localAddressesCopy);
    37                 synchronized (boundAddresses) {
    38                     boundAddresses.addAll(addresses);
    39                 }
    40             } catch (IOException e) {
    41                 throw e;
    42             } catch (RuntimeException e) {
    43                 throw e;
    44             } catch (Exception e) {
    45                 throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e);
    46             }
    47         }
    48         /** 激活IoServiceListener */
    49         if (activate) {
    50             getListeners().fireServiceActivated();
    51         }
    52     }

    虽然代码比较多,但是核心代码就只有 bindInternal这一行, bindInternal是真正的绑定主机的方法,代码如下:

     1 /** 绑定主机*/
     2     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
     3         /** 创建一个Future对象,当IoAcceptor的Selector处理了该请求后会给Future对象发送一个信号 */
     4         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
     5 
     6         /** 添加到AcceptorOperationFuture队列中*/
     7         registerQueue.add(request);
     8 
     9         /** 开启Acceptor*/
    10         startupAcceptor();
    11 
    12         try {
    13             /** 信号量设置为1,相当于加锁处理*/
    14             lock.acquire();
    15 
    16             wakeup();
    17         } finally {
    18             /** 信号量释放, 相当于解锁处理*/
    19             lock.release();
    20         }
    21 
    22         // Now, we wait until this request is completed.
    23         request.awaitUninterruptibly();
    24 
    25         if (request.getException() != null) {
    26             throw request.getException();
    27         }
    28         Set<SocketAddress> newLocalAddresses = new HashSet<>();
    29 
    30         for (H handle : boundHandles.values()) {
    31             newLocalAddresses.add(localAddress(handle));
    32         }
    33 
    34         return newLocalAddresses;
    35     }

    核心逻辑是调用了startupAcceptor方法,代码如下:

     1 private void startupAcceptor() throws InterruptedException {
     2         if (!selectable) {
     3             registerQueue.clear();
     4             cancelQueue.clear();
     5         }
     6 
     7         /** 创建Acceptor对象, Acceptor实现了Runnable接口*/
     8         Acceptor acceptor = acceptorRef.get();
     9 
    10         if (acceptor == null) {
    11             lock.acquire();
    12             acceptor = new Acceptor();
    13 
    14             if (acceptorRef.compareAndSet(null, acceptor)) {
    15                 /** 将实现了Runnable接口的acceptor对象放入线程池中执行*/
    16                 executeWorker(acceptor);
    17             } else {
    18                 lock.release();
    19             }
    20         }
    21     }
     1 protected final void executeWorker(Runnable worker) {
     2         executeWorker(worker, null);
     3     }
     4 
     5     protected final void executeWorker(Runnable worker, String suffix) {
     6         String actualThreadName = threadName;
     7         if (suffix != null) {
     8             actualThreadName = actualThreadName + '-' + suffix;
     9         }
    10         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
    11     }

    该方法的核心逻辑是创建一个Acceptor对象,Acceptor对象是实现了Runnable接口的,所以是一个可执行逻辑。然后调用executeWorker方法将Acceptor对象交给线程池执行,而线程池就是初始化IoService时初始化的线程池,默认是Executors.newCachedThreadPool

    既然Acceptor实现了Runnable接口,那么就再看下Acceptor的具体实现逻辑:

      1 private class Acceptor implements Runnable {
      2         public void run() {
      3             assert (acceptorRef.get() == this);
      4 
      5             int nHandles = 0;
      6 
      7             /** 释放锁*/
      8             lock.release();
      9 
     10             while (selectable) {
     11                 try {
     12                     /** 获取注册的监听端口的数量*/
     13                     nHandles += registerHandles();
     14 
     15                     /** 调用Selector的select()方法,会阻塞当前线程 */
     16                     int selected = select();
     17 
     18                     // Now, if the number of registred handles is 0, we can
     19                     // quit the loop: we don't have any socket listening
     20                     // for incoming connection.
     21                     if (nHandles == 0) {
     22                         acceptorRef.set(null);
     23 
     24                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
     25                             assert (acceptorRef.get() != this);
     26                             break;
     27                         }
     28 
     29                         if (!acceptorRef.compareAndSet(null, this)) {
     30                             assert (acceptorRef.get() != this);
     31                             break;
     32                         }
     33 
     34                         assert (acceptorRef.get() == this);
     35                     }
     36 
     37                     if (selected > 0) {
     38                         /** 处理Selector返回的所有的SelectionKey */
     39                         processHandles(selectedHandles());
     40                     }
     41 
     42                     // check to see if any cancellation request has been made.
     43                     nHandles -= unregisterHandles();
     44                 } catch (ClosedSelectorException cse) {
     45                     // If the selector has been closed, we can exit the loop
     46                     ExceptionMonitor.getInstance().exceptionCaught(cse);
     47                     break;
     48                 } catch (Exception e) {
     49                     ExceptionMonitor.getInstance().exceptionCaught(e);
     50 
     51                     try {
     52                         Thread.sleep(1000);
     53                     } catch (InterruptedException e1) {
     54                         ExceptionMonitor.getInstance().exceptionCaught(e1);
     55                     }
     56                 }
     57             }
     58 
     59             /** 当销毁IoService时, 销毁所有的processor*/
     60             if (selectable && isDisposing()) {
     61                 selectable = false;
     62                 try {
     63                     if (createdProcessor) {
     64                         processor.dispose();
     65                     }
     66                 } finally {
     67                     try {
     68                         synchronized (disposalLock) {
     69                             if (isDisposing()) {
     70                                 destroy();
     71                             }
     72                         }
     73                     } catch (Exception e) {
     74                         ExceptionMonitor.getInstance().exceptionCaught(e);
     75                     } finally {
     76                         disposalFuture.setDone();
     77                     }
     78                 }
     79             }
     80         }
     81 
     82         /**
     83          * This method will process new sessions for the Worker class.  All
     84          * keys that have had their status updates as per the Selector.selectedKeys()
     85          * method will be processed here.  Only keys that are ready to accept
     86          * connections are handled here.
     87          * <p/>
     88          * Session objects are created by making new instances of SocketSessionImpl
     89          * and passing the session object to the SocketIoProcessor class.
     90          */
     91         @SuppressWarnings("unchecked")
     92         private void processHandles(Iterator<H> handles) throws Exception {
     93             /** 遍历所有SelectionKey*/
     94             while (handles.hasNext()) {
     95                 H handle = handles.next();
     96                 handles.remove();
     97 
     98                 /** 处理客户端连接请求,封装成NioSocketSession对象*/
     99                 S session = accept(processor, handle);
    100 
    101                 if (session == null) {
    102                     continue;
    103                 }
    104                 /** 初始化客户端Session对象, 设置上一次读写时间 */
    105                 initSession(session, null, null);
    106 
    107                 /** 将客户端Session添加到IoProcessor线程池,并分配一个IoProcessor和Session进行绑定 */
    108                 session.getProcessor().add(session);
    109             }
    110         }
    111     }
    112 
    113     protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
    114 
    115         SelectionKey key = null;
    116 
    117         if (handle != null) {
    118             key = handle.keyFor(selector);
    119         }
    120         /** key有效且必须触发了OP_ACCEPT事件,其他事件不处理*/
    121         if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
    122             return null;
    123         }
    124 
    125         /** 获取客户端连接的Channel*/
    126         SocketChannel ch = handle.accept();
    127 
    128         if (ch == null) {
    129             return null;
    130         }
    131         /** 封装客户端连接为NioSocketSession对象 */
    132         return new NioSocketSession(this, processor, ch);
    133     }
    134 
    135     public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel channel) {
    136         /** 调用父类构造函数 设置IoProcessor、IoService、SocketChannel属性*/
    137         super(processor, service, channel);
    138         config = new SessionConfigImpl();
    139         /** 将IoService的全局SessionConfig复制给当前session的配置*/
    140         config.setAll(service.getSessionConfig());
    141     }

    从代码中可以看出IoAcceptor的run方法基本上和NIO的服务器启动方法差不多,首先了调用Selector的select()方法获取所有客户端的请求连接accept事件,然后遍历所有的SelectionKey,将客户端的连接封装成NioSocketSession,最后再将NioSocketSession添加到IoProcessor线程池中,而绑定到具体的IoProcessor的逻辑是在IoProcessor的add方法中实现的,这里IoProcessor线程池是通过SimpleIoProcessorPool类实现的,源码如下:

     1 /** 添加SocketSession*/
     2     public final void add(S session) {
     3         /** 根据session获取具体的IoProcessor,并将session添加到IoProcessor中*/
     4         getProcessor(session).add(session);
     5     }
     6 
     7     private IoProcessor<S> getProcessor(S session) {
     8         IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
     9 
    10         if (processor == null) {
    11             if (disposed || disposing) {
    12                 throw new IllegalStateException("A disposed processor cannot be accessed.");
    13             }
    14             /** 根据session的ID通过取模算法从数组中获取对应的IoProcessor*/
    15             processor = pool[Math.abs((int) session.getId()) % pool.length];
    16 
    17             if (processor == null) {
    18                 throw new IllegalStateException("A disposed processor cannot be accessed.");
    19             }
    20             /** 给session设置IoProcessor属性*/
    21             session.setAttributeIfAbsent(PROCESSOR, processor);
    22         }
    23 
    24         return processor;
    25     }

    每一个SocketSession会绑定一个IoProcessor,并会缓存在session的attribute中,而获取IoProcessor的方式则是通过取模算法从IoProcessor数组中获取。

    然后调用IoProcessor的add方法将session添加到IoProcessor中,处理IO数据的IoProcessor实现类为NioProcessor,实现的add方法源码如下:

     1 /** 新创建的session队列*/
     2     private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
     3 
     4     /** 需要移除的session队列 */
     5     private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
     6 
     7     /** 需要刷新的session队列*/
     8     private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
     9 
    10     public final void add(S session) {
    11         if (disposed || disposing) {
    12             throw new IllegalStateException("Already disposed.");
    13         }
    14 
    15         /** 将新的session加入到队列中*/
    16         newSessions.add(session);
    17         /** 开启IoProcessor*/
    18         startupProcessor();
    19     }
    20 
    21     private void startupProcessor() {
    22         /** 原子引用类型,如果不存在就新建Processor*/
    23         Processor processor = processorRef.get();
    24 
    25         if (processor == null) {
    26             processor = new Processor();
    27 
    28             if (processorRef.compareAndSet(null, processor)) {
    29                 /** 将可执行的Processor添加到线程池中执行 */
    30                 executor.execute(new NamePreservingRunnable(processor, threadName));
    31             }
    32         }
    33 
    34         /** 唤醒Selector, 防止一直被阻塞*/
    35         wakeup();
    36     }

    这里将session添加到队列中,然后开启IoProcessor线程,过程和IoSession类型,都是封装成一个NamePreservingRunnable对象交给线程池去执行,而这里的线程池和IoService的线程池是独立的,没给IoProcessor都有一个自己的线程池

    Processor具体的执行逻辑如下:

      1 /** Processor线程,用于处理IO读写操作*/
      2     private class Processor implements Runnable {
      3         public void run() {
      4             assert (processorRef.get() == this);
      5             //会话数量
      6             int nSessions = 0;
      7             lastIdleCheckTime = System.currentTimeMillis();
      8             /** 重试次数*/
      9             int nbTries = 10;
     10 
     11             for (;;) {
     12                 try {
     13                     long t0 = System.currentTimeMillis();
     14                     int selected = select(SELECT_TIMEOUT);
     15                     long t1 = System.currentTimeMillis();
     16                     /** 计算select()方法执行的时间*/
     17                     long delta = t1 - t0;
     18 
     19                     if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
     20                         // Last chance : the select() may have been
     21                         // interrupted because we have had an closed channel.
     22                         if (isBrokenConnection()) {
     23                             LOG.warn("Broken connection");
     24                         } else {
     25                             // Ok, we are hit by the nasty epoll
     26                             // spinning.
     27                             // Basically, there is a race condition
     28                             // which causes a closing file descriptor not to be
     29                             // considered as available as a selected channel,
     30                             // but
     31                             // it stopped the select. The next time we will
     32                             // call select(), it will exit immediately for the
     33                             // same
     34                             // reason, and do so forever, consuming 100%
     35                             // CPU.
     36                             // We have to destroy the selector, and
     37                             // register all the socket on a new one.
     38                             if (nbTries == 0) {
     39                                 LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
     40                                 registerNewSelector();
     41                                 nbTries = 10;
     42                             } else {
     43                                 nbTries--;
     44                             }
     45                         }
     46                     } else {
     47                         nbTries = 10;
     48                     }
     49 
     50                     /** 处理新添加的Session
     51                      *  将session对应的channel注册到当前IoProcessor的Selector上,并监听OP_READ事件
     52                      * */
     53                     nSessions += handleNewSessions();
     54 
     55                     updateTrafficMask();
     56 
     57                     // Now, if we have had some incoming or outgoing events,
     58                     // deal with them
     59                     if (selected > 0) {
     60                         /** 当select()返回值大于0表示当前已经有IO事件需要处理,则执行process方法进行处理*/
     61                         process();
     62                     }
     63 
     64                     // Write the pending requests
     65                     long currentTime = System.currentTimeMillis();
     66                     flush(currentTime);
     67 
     68                     /**
     69                      * 移除所有需要移除的session,移除之前会将所有需要发送的数据发送给客户端
     70                      * */
     71                     nSessions -= removeSessions();
     72 
     73                     /** 更新所有session的读空闲、写空闲和读写空闲的时间值 */
     74                     notifyIdleSessions(currentTime);
     75 
     76                     // Get a chance to exit the infinite loop if there are no
     77                     // more sessions on this Processor
     78                     if (nSessions == 0) {
     79                         processorRef.set(null);
     80 
     81                         if (newSessions.isEmpty() && isSelectorEmpty()) {
     82                             // newSessions.add() precedes startupProcessor
     83                             assert (processorRef.get() != this);
     84                             break;
     85                         }
     86 
     87                         assert (processorRef.get() != this);
     88 
     89                         if (!processorRef.compareAndSet(null, this)) {
     90                             // startupProcessor won race, so must exit processor
     91                             assert (processorRef.get() != this);
     92                             break;
     93                         }
     94 
     95                         assert (processorRef.get() == this);
     96                     }
     97 
     98                     /** 当IoProcessor销毁了,则移除所有的客户端的SocketSession*/
     99                     if (isDisposing()) {
    100                         boolean hasKeys = false;
    101 
    102                         for (Iterator<S> i = allSessions(); i.hasNext();) {
    103                             IoSession session = i.next();
    104 
    105                             if (session.isActive()) {
    106                                 scheduleRemove((S)session);
    107                                 hasKeys = true;
    108                             }
    109                         }
    110 
    111                         if (hasKeys) {
    112                             wakeup();
    113                         }
    114                     }
    115                 } catch (ClosedSelectorException cse) {
    116                     // If the selector has been closed, we can exit the loop
    117                     // But first, dump a stack trace
    118                     ExceptionMonitor.getInstance().exceptionCaught(cse);
    119                     break;
    120                 } catch (Exception e) {
    121                     ExceptionMonitor.getInstance().exceptionCaught(e);
    122 
    123                     try {
    124                         Thread.sleep(1000);
    125                     } catch (InterruptedException e1) {
    126                         ExceptionMonitor.getInstance().exceptionCaught(e1);
    127                     }
    128                 }
    129             }
    130 
    131             try {
    132                 synchronized (disposalLock) {
    133                     if (disposing) {
    134                         doDispose();
    135                     }
    136                 }
    137             } catch (Exception e) {
    138                 ExceptionMonitor.getInstance().exceptionCaught(e);
    139             } finally {
    140                 disposalFuture.setValue(true);
    141             }
    142         }
    143     }

    这里代码比较多,但是核心的不多,主要功能如下:

    1、处理所有新加入的Session,将session对应的channel注册到Selector中,并监听OP_READ事件

    2、处理所有的需要移除的session,将session从Selector中移除监听

    3、调用Selector的select()方法获取IO事件,如果存在IO事件,则调用process()方法进行处理

    4、更新所有session的读写空闲时间

    而处理IO事件的逻辑全部在process方法中处理,源码如下:

     1 /** 处理IoProcessor的Selector监听返回的所有IO事件 */
     2     private void process() throws Exception {
     3         /** 遍历所有的SelectionKey*/
     4         for (Iterator<S> i = selectedSessions(); i.hasNext();) {
     5             S session = i.next();
     6             /** 处理单个客户端session的IO事件*/
     7             process(session);
     8             i.remove();
     9         }
    10     }
    11     /** 获取所有的SelectionKey*/
    12     protected Iterator<NioSession> selectedSessions() {
    13         return new IoSessionIterator(selector.selectedKeys());
    14     }

    process方法的逻辑不多,先是获取selector所有的IO事件SelectionKey集合,然后进行遍历所有的SelectionKey集合,调用process(session)方法处理每个Session的IO事件,源码如下:

     1 /***/
     2     private void process(S session) {
     3         /** 处理读事件 */
     4         if (isReadable(session) && !session.isReadSuspended()) {
     5             /** 处理session的读事件*/
     6             read(session);
     7         }
     8 
     9         /** 处理写事件 */
    10         if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
    11             /** 刷新session */
    12             flushingSessions.add(session);
    13         }
    14     }
    15 
    16     private void read(S session) {
    17         /** 获取配置的读缓冲区大小*/
    18         IoSessionConfig config = session.getConfig();
    19         int bufferSize = config.getReadBufferSize();
    20         /** 分配指定大小的缓冲区*/
    21         IoBuffer buf = IoBuffer.allocate(bufferSize);
    22 
    23         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
    24 
    25         try {
    26             int readBytes = 0;
    27             int ret;
    28 
    29             try {
    30                 if (hasFragmentation) {
    31                     /** 调用read方法将session中的IO数据读取到缓冲区*/
    32                     while ((ret = read(session, buf)) > 0) {
    33                         readBytes += ret;
    34 
    35                         if (!buf.hasRemaining()) {
    36                             break;
    37                         }
    38                     }
    39                 } else {
    40                     /** 调用read方法将session中的IO数据读取到缓冲区*/
    41                     ret = read(session, buf);
    42                     if (ret > 0) {
    43                         readBytes = ret;
    44                     }
    45                 }
    46             } finally {
    47                 /** 从读模式切换到写模式 */
    48                 buf.flip();
    49             }
    50 
    51             if (readBytes > 0) {
    52                 IoFilterChain filterChain = session.getFilterChain();
    53                 /** 将读到的数据交给过滤器链进行过滤处理 */
    54                 filterChain.fireMessageReceived(buf);
    55                 buf = null;
    56 
    57                 if (hasFragmentation) {
    58                     /** 判断读取到的数据是否小于缓冲区总大小的一半*/
    59                     if (readBytes << 1 < config.getReadBufferSize()) {
    60                         /** 缩小缓冲区大小 */
    61                         session.decreaseReadBufferSize();
    62                     } else if (readBytes == config.getReadBufferSize()) {
    63                         /** 扩大缓冲区大小 */
    64                         session.increaseReadBufferSize();
    65                     }
    66                 }
    67             }
    68 
    69             if (ret < 0) {
    70                 /** 过滤器链关闭 */
    71                 IoFilterChain filterChain = session.getFilterChain();
    72                 filterChain.fireInputClosed();
    73             }
    74         } catch (Exception e) {
    75             if (e instanceof IOException) {
    76                 if (!(e instanceof PortUnreachableException)
    77                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
    78                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
    79                     scheduleRemove(session);
    80                 }
    81             }
    82             /** 如果抛异常,则通知过滤器链执行异常事件*/
    83             IoFilterChain filterChain = session.getFilterChain();
    84             filterChain.fireExceptionCaught(e);
    85         }
    86     }

    这里逻辑比较多,但是整体逻辑不复杂,主要逻辑如下:

    1、如果是可读事件,那么就执行read方法进行IO数据的读取

    2、根据配置的读缓冲区大小创建IoBuffer对象进行内存分配

    3、调用read方法从session中读取数据存到IoBuffer中,读取完成将IoBuffer从读模式切换到写模式

    4、将读取到的IoBuffer数据交给过滤器链进行所有IO过滤器进行过滤处理

    5、将缓冲区大小进行扩容或降容处理

    6、将连接关闭或连接异常的事件交给过滤器链进行对应的处理

     接下来就针对每一个步骤分别进行源码分析

    1、分配缓冲区

     1 /** 内存分配工具 */
     2     private static IoBufferAllocator allocator = new SimpleBufferAllocator();
     3 
     4     /** 是否使用直接内存,默认为false表示使用堆内内存,值为true表示使用直接内存 */
     5     private static boolean useDirectBuffer = false;
     6 
     7     public static IoBuffer allocate(int capacity) {
     8         return allocate(capacity, useDirectBuffer);
     9     }
    10 
    11     public static IoBuffer allocate(int capacity, boolean useDirectBuffer) {
    12         if (capacity < 0) {
    13             throw new IllegalArgumentException("capacity: " + capacity);
    14         }
    15         /** 调用内存分配工具的allocate方法进行内存分配 */
    16         return allocator.allocate(capacity, useDirectBuffer);
    17     }

    分配缓冲区一共有两个参数,分别是缓冲区的大小和是否使用直接内存,最终调用内存分配器的allocate方法进行内存分配,源码如下:

     1 /** 创建IoBuffer对象,分配内存 */
     2     public IoBuffer allocate(int capacity, boolean direct) {
     3         return wrap(allocateNioBuffer(capacity, direct));
     4     }
     5 
     6     /** 创建NIO的ByteBuffer对象*/
     7     public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
     8         ByteBuffer nioBuffer;
     9         if (direct) {
    10             /** 分配直接内存*/
    11             nioBuffer = ByteBuffer.allocateDirect(capacity);
    12         } else {
    13             /** 分配堆内内存*/
    14             nioBuffer = ByteBuffer.allocate(capacity);
    15         }
    16         return nioBuffer;
    17     }
    18 
    19     /** 将ByteBuffer对象包装成IoBuffer对象 */
    20     public IoBuffer wrap(ByteBuffer nioBuffer) {
    21         return new SimpleBuffer(nioBuffer);
    22     }

    这里就回到了Java的NIO分配内存的逻辑了,MINA分配缓冲区的逻辑实际底层就是调用了Java NIO的ByteBuffer的分配逻辑,根据直接内存还是堆内内存进行内存分配,最终分配了之后再封装成了IoBuffer对象。

    2、IO数据读取

    1 @Override
    2     protected int read(NioSession session, IoBuffer buf) throws Exception {
    3         /** 从session中获取channel*/
    4         ByteChannel channel = session.getChannel();
    5         /** 调用Java NIO的channel.read(ByteBuffer buffer)方法读取数据 */
    6         return channel.read(buf.buf());
    7     }

    读取数据的逻辑也不复杂,同样是直接调用了Java NIO的channel读取Buffer数据的方式进行读取,具体的实现逻辑可以参考Java NIO的channel读取缓冲区数据的逻辑

    3、过滤器链对IO数据进行过滤处理

     1 public void fireMessageReceived(Object message) {
     2         if (message instanceof IoBuffer) {
     3             session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
     4         }
     5 
     6         callNextMessageReceived(head, session, message);
     7     }
     8 
     9     public final void increaseReadBytes(long increment, long currentTime) {
    10         if (increment <= 0) {
    11             return;
    12         }
    13         //统计已读字节数
    14         readBytes += increment;
    15         //设置上一次读操作时间
    16         lastReadTime = currentTime;
    17         //设置空闲为0
    18         idleCountForBoth.set(0);
    19         idleCountForRead.set(0);
    20 
    21         if (getService() instanceof AbstractIoService) {
    22             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
    23         }
    24     }
    25 
    26     /** 通知下一个节点处理接收消息事件 */
    27     private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
    28         try {
    29             IoFilter filter = entry.getFilter();
    30             NextFilter nextFilter = entry.getNextFilter();
    31             /** 依次执行每一个IoFilter的messageReceived方法 */
    32             filter.messageReceived(nextFilter, session, message);
    33         } catch (Exception e) {
    34             fireExceptionCaught(e);
    35         } catch (Error e) {
    36             fireExceptionCaught(e);
    37             throw e;
    38         }
    39     }

    过滤器链路处理消息时,会从过滤器链路的头节点开始依次执行messageReceived方法,直到执行到最后一个过滤器,过滤器链路头节点和尾节点是固定不变的,头节点实现为HeadFilter,尾节点实现为TailFilter。

    当处理读事件时,会从HeadFilter开始处理,然后按自定义的过滤器依次执行,最后执行TailFilter的处理;而处理写事件时,顺序完全相反,会从TailFilter开始到HeadFilter结束。所以读数据时最后会执行TailFilter的messageReceived方法,源码如下:

     1  @Override
     2     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
     3         AbstractIoSession s = (AbstractIoSession) session;
     4 
     5         if (!(message instanceof IoBuffer)) {
     6             s.increaseReadMessages(System.currentTimeMillis());
     7         } else if (!((IoBuffer) message).hasRemaining()) {
     8             s.increaseReadMessages(System.currentTimeMillis());
     9         }
    10 
    11         // Update the statistics
    12         if (session.getService() instanceof AbstractIoService) {
    13             ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
    14         }
    15 
    16         try {
    17             /** 尾部过滤器将消息交给业务处理器IoHandler处理 */
    18             session.getHandler().messageReceived(s, message);
    19         } finally {
    20             if (s.getConfig().isUseReadOperation()) {
    21                 s.offerReadFuture(message);
    22             }
    23         }
    24     }

    可以看出TailFilter最重要的一个步骤是需要将发送的IO数据交给业务层处理,通过从IoSession中获取IoHandler对象,并调用IoHandler的messageReceived方法交给业务层处理IO数据

    4、缓冲区扩容或降容处理

     1 /** 扩容读缓冲区大小为原先的两倍 */
     2     public final void increaseReadBufferSize() {
     3         /** 扩大两倍*/
     4         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
     5         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
     6             getConfig().setReadBufferSize(newReadBufferSize);
     7         } else {
     8             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
     9         }
    10 
    11         deferDecreaseReadBuffer = true;
    12     }
    13 
    14     /**
    15      * 降容读缓冲区大小为原先的一半
    16      */
    17     public final void decreaseReadBufferSize() {
    18         if (deferDecreaseReadBuffer) {
    19             deferDecreaseReadBuffer = false;
    20             return;
    21         }
    22 
    23         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
    24             /** 缩小一半*/
    25             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
    26         }
    27 
    28         deferDecreaseReadBuffer = true;
    29     }

    配置的读缓冲区大小和实际的IO数据大小可能会存在偏差,所以需要动态的调整读缓冲区的大小,避免缓冲区内存不足或内存浪费,每次扩容都会扩容到原大小的两倍,降容也会缩小到原先的一半

    5、连接关闭或连接异常事件处理

     1 public void fireInputClosed() {
     2         /** 获取过滤器链头节点*/
     3         Entry head = this.head;
     4         callNextInputClosed(head, session);
     5     }
     6 
     7     /** 通知下一个节点处理IO输入关闭事件*/
     8     private void callNextInputClosed(Entry entry, IoSession session) {
     9         try {
    10             IoFilter filter = entry.getFilter();
    11             /** 获取下一个过滤器*/
    12             NextFilter nextFilter = entry.getNextFilter();
    13             /** 处理IO输入关闭事件*/
    14             filter.inputClosed(nextFilter, session);
    15         } catch (Throwable e) {
    16             fireExceptionCaught(e);
    17         }
    18     }
    19 
    20     public void fireExceptionCaught(Throwable cause) {
    21         callNextExceptionCaught(head, session, cause);
    22     }
    23 
    24     private void callNextExceptionCaught(Entry entry, IoSession session, Throwable cause) {
    25         /** 唤醒关联的Future*/
    26         ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);
    27         if (future == null) {
    28             try {
    29                 IoFilter filter = entry.getFilter();
    30                 NextFilter nextFilter = entry.getNextFilter();
    31                 /** 执行过滤的异常处理事件*/
    32                 filter.exceptionCaught(nextFilter, session, cause);
    33             } catch (Throwable e) {
    34                 LOGGER.warn("Unexpected exception from exceptionCaught handler.", e);
    35             }
    36         } else {
    37             /** 关闭session*/
    38             if (!session.isClosing()) {
    39                 // Call the closeNow method only if needed
    40                 session.closeNow();
    41             }
    42 
    43             future.setException(cause);
    44         }
    45     }

     处理逻辑基本上一样,都是先获取过滤器链的头节点,开始处理对应的事件,然后再依次调用下一个节点的过滤器的方法。另外最后一个过滤器TailFilter会将对应的事件会最终交给业务层处理,调用IoHandler的对应方法,让业务层处理事件。

    总结MINA服务端的工作机制:

    1、创建IoService,每个IoService内部有一个线程池用于处理客户端的连接请求,并且有一个IoProcessor池,默认数量为CPU个数+1,用于处理客户端的IO操作

    2、IoService内部有一个Selector用于监听客户端的连接请求,连接成功之后会创建IoSession并交给IoProcessor池处理

    3、IoProcessor池根据IoSession的ID进行取模算法选取一个IoProcessor和IoSession进行绑定,一个IoProcessor可以绑定多个IoSession,一个IoSession只可以绑定一个IoProcessor

    4、IoProcessor内部也有一个Selector用来监控所有关联的IoSession的IO操作状态,并且有一个线程池用来处理IO操作

    5、IoProcessor将处理的IO事件交给IoService绑定的过滤器链,过滤器链上的所有过滤器依次处理IO事件

    7、过滤器链的最后一个过滤器为TailFilter,处理完IO事件之后将IO事件交给业务层处理器IoHandler

    8、IoHandler处理业务数据,而写数据的话顺序完全相反,才业务层到过滤器链路,通过HeadFilter再由IoProcessor发送出去

  • 相关阅读:
    POJ 3468_A Simple Problem with Integers(树状数组)
    POJ 3468_A Simple Problem with Integers(线段树)
    ZOJ 2770_Burn the Linked Camp
    ZOJ 2770_Burn the Linked Camp
    POJ 3169_Layout
    POJ 3169_Layout
    POJ 3684_Physics Experiment
    POJ 3255_Roadblocks
    POJ 3723 Conscription【最小生成树】
    POJ 3279 Fliptile【枚举】
  • 原文地址:https://www.cnblogs.com/jackion5/p/13569501.html
Copyright © 2011-2022 走看看