zoukankan      html  css  js  c++  java
  • Apache Mina(二)

    在mina的源码,整个框架最核心的几个包是 :

    • org.apache.mina.core.service :IoService、IoProcessor、IoHandler、IoAcceptor、IoConnector
    • org.apache.mina.core.session
    • org.apache.mina.core.polling
    • org.apache.mina.transport.socket

    IoService Base interface for all IoAcceptors and IoConnectors that provide I/O service and manage IoSessions.
          它是所有 IoAcceptor 和 IoConnector 的基接口,对于一个 IoService,有哪些信息需要我们关注呢?

    1. 底层的元数据信息 TransportMetadata,比如底层的网络服务提供者(NIO,ARP,RXTX等)。(除此方法之外,其它方法在 AbstractIoService 得以实现。)
    2. 通过这个服务创建一个新会话时,新会话的默认配置 IoSessionConfig。
    3. 此服务所管理的所有会话。
    4. 与这个服务相关所产生的事件所对应的监听者(IoServiceListener)。
    5. 处理这个服务所管理的所有连接的处理器(IoHandler)。
    6. 每个会话都有一个过滤器链(IoFilterChain),每个过滤器链通过其对应的 IoFilterChainBuilder来负责构建。
    7. 由于此服务管理了一系列会话,因此可以通过广播的方式向所有会话发送消息,返回结果是一个WriteFuture集,后者是一种表示未来预期结果的数据结构。
    8. 服务创建的会话(IoSession)相关的数据通过 IoSessionDataStructureFactory来提供。
    9. 发送消息时有一个写缓冲队列。
    10. 服务的闲置状态有三种:读端空闲,写端空闲,双端空闲。
    11. 还提供服务的一些统计信息,比如时间,数据量等。

    IoProcessor<S extends IoSession> :An internal interface to represent an 'I/O processor' that performs actual I/O operations for IoSessions. It abstracts existing reactor frameworks such as Java NIO once again to simplify transport implementations.
    一个内部接口代表一个I/O处理器,它为 IoSession 执行实际的I/O操作。它抽象现有反应器框架,如 Java NIO 再一次简化传输实现。

    1、IoAcceptor :服务器端接口,接受客户端访问的请求。

    2、AbstractIoService :设定处理函数、Filter、IoServiceListener,实现了 IoService 所有方法,除了 getTransportMetadata() 。

    3、IoServiceListener :可以用于打印日志,主要有Service启动、停止、空闲等监听方法。

    4、处理函数是一个Executor或者是Executor的包装。

    5、AbstractIoAcceptor完成绑定监听端口。

    6、AbstractPollingIoAcceptor执行具体的监听连接以及监听I/O事件,但真正实现的是NioSocketAcceptor open() 方法。

    7、NioSocketAcceptor用JAVA NIO的方式实现了具体的连接方法ServerSocketChannel,例如open,accept等

       

     

    以下代码的流程:

        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.setHandler(new IoHandlerAdapter());
        acceptor.bind(new InetSocketAddress(6969));

    1)NioSocketAcceptor 构造函数:

        public NioSocketAcceptor() {
            super(new DefaultSocketSessionConfig(), NioProcessor.class);
            ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
        }

    定义了SessionConfig,并把this传递到SessionConfig当中,指定NioProcessor。

    AbstractPollingIoAcceptor 构造函数:

        protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
            this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
        }
    复制代码
        private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
                boolean createdProcessor) {
            super(sessionConfig, executor);
    
            if (processor == null) {
                throw new IllegalArgumentException("processor");
            }
    
            this.processor = processor;
            this.createdProcessor = createdProcessor;
    
            try {
                // Initialize the selector
                init();
    
                // The selector is now ready, we can switch the
                // flag to true so that incoming connection can be accepted
                selectable = true;
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeIoException("Failed to initialize.", e);
            } finally {
                if (!selectable) {
                    try {
                        destroy();
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            }
        }
    复制代码
        public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) {
            this(processorType, executor, DEFAULT_SIZE);  // int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
        }
    复制代码
        public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size) {
            if (processorType == null) {
                throw new IllegalArgumentException("processorType");
            }
    
            if (size <= 0) {
                throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
            }
    
            // Create the executor if none is provided
            createdExecutor = (executor == null);
    
            if (createdExecutor) {
                this.executor = Executors.newCachedThreadPool();
                // Set a default reject handler
                ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            } else {
                this.executor = executor;
            }
    
            pool = new IoProcessor[size];
    
            boolean success = false;
            Constructor<? extends IoProcessor<S>> processorConstructor = null;
            boolean usesExecutorArg = true;
    
            try {
                // We create at least one processor
                try {
                    try {
                        processorConstructor = processorType.getConstructor(ExecutorService.class);
                        pool[0] = processorConstructor.newInstance(this.executor);
                    } catch (NoSuchMethodException e1) {
                        // To the next step...
                        try {
                            processorConstructor = processorType.getConstructor(Executor.class);
                            pool[0] = processorConstructor.newInstance(this.executor);
                        } catch (NoSuchMethodException e2) {
                            // To the next step...
                            try {
                                processorConstructor = processorType.getConstructor();
                                usesExecutorArg = false;
                                pool[0] = processorConstructor.newInstance();
                            } catch (NoSuchMethodException e3) {
                                // To the next step...
                            }
                        }
                    }
                } catch (RuntimeException re) {
                    LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
                    throw re;
                } catch (Exception e) {
                    String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
                    LOGGER.error(msg, e);
                    throw new RuntimeIoException(msg, e);
                }
    
                if (processorConstructor == null) {
                    // Raise an exception if no proper constructor is found.
                    String msg = String.valueOf(processorType) + " must have a public constructor with one "
                            + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
                            + Executor.class.getSimpleName() + " parameter or a public default constructor.";
                    LOGGER.error(msg);
                    throw new IllegalArgumentException(msg);
                }
    
                // Constructor found now use it for all subsequent instantiations
                for (int i = 1; i < pool.length; i++) {
                    try {
                        if (usesExecutorArg) {
                            pool[i] = processorConstructor.newInstance(this.executor);
                        } else {
                            pool[i] = processorConstructor.newInstance();
                        }
                    } catch (Exception e) {
                        // Won't happen because it has been done previously
                    }
                }
    
                success = true;
            } finally {
                if (!success) {
                    dispose();
                }
            }
        }
    复制代码
    复制代码
        public NioProcessor(Executor executor) {
            super(executor);
    
            try {
                // Open a new selector
                selector = Selector.open();
            } catch (IOException e) {
                throw new RuntimeIoException("Failed to open a selector.", e);
            }
        }
    复制代码

    new SimpleIoProcessorPool<S>(processorClass) :是把NioProcessor包装成了pool.看类图IoProcessor就很好理解了,这是一个组成模式。

    init()的调用实际上是NioSocketAcceptor的,init() 源码:selector = Selector.open(); 值得注意的是,服务端在创建NioSocketAcceptor实现时,会生成一个线程池(AbstractIoService.executor),此线程池用来执行一个接受请求的任务,这个任务是AbstractPollingIoAcceptor的Acceptor,Acceptor会开一个Selector,用来监听NIO中的ACCEPT事件。任务初始化时并没有执行,而在调用NioSocketAcceptor实例的bind方法时,则会启动对指定端口的ACCEPT事件的监听。 

    SimpleIoProcessorPool是在NioSocketAcceptor实例化时创建的,其上有N+1(N=CPU的个数)个NIOProcessor(IoProcessor<S>[] pool)来处理实际IO的读写事件,每个pool都是从NioSocketAcceptor构造函数传过去的NioProcessor实例,并在NioProcessor构造函数传入一个线程池。

    每个NIOProcessor都会对应一个Selector和 NioSocketAcceptor.init() 中的Selector 一起构成了Mina独有的双Selector模型,这种设计的优点是不会导致阻塞),来监听Socket中的读写事件。实际对读写的操作也是在一个SimpleIoProcessorPool实例化好的一个线程池中以任务的形式执行,这个任务叫Processor(可以在AbstractPollingIoProcessor类中找到其实现)

    AbstractIoAcceptor 构造函数:

        protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
            super(sessionConfig, executor);
            defaultLocalAddresses.add(null);
        }

    AbstractIoAcceptor主要用来绑定监听端口。这个构造函数没有干其他的事情。

    复制代码
        protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
            if (sessionConfig == null) {
                throw new IllegalArgumentException("sessionConfig");
            }
    
            if (getTransportMetadata() == null) {
                throw new IllegalArgumentException("TransportMetadata");
            }
    
            if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
                throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
                        + getTransportMetadata().getSessionConfigType() + ")");
            }
    
            // Create the listeners, and add a first listener : a activation listener
            // for this service, which will give information on the service state.
            listeners = new IoServiceListenerSupport(this);
            listeners.add(serviceActivationListener);
    
            // Stores the given session configuration
            this.sessionConfig = sessionConfig;
    
            // Make JVM load the exception monitor before some transports
            // change the thread context class loader.
            ExceptionMonitor.getInstance();
    
            if (executor == null) {
                this.executor = Executors.newCachedThreadPool();
                createdExecutor = true;
            } else {
                this.executor = executor;
                createdExecutor = false;
            }
    
            threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
        }
    复制代码

    这个构造函数还有一个监听器,或者叫监听池(可以包含多个监听器)。用来监听service创建、连接、断开等动作,当上述动作发生地时候,会调用listener。里面可以写自己的一些方法。

    一次请求的过程如下 :

    Client通过Socket连接服务器,先是由Acceptor接收到请求连接的事件(即ACCEPT事件)。此事件由Acceptor进行处理,会创建一条Socket连接,并将此连接和一个NIOProcessor关联,这个过程通过图中的连接分配器进行,连接分配器会均衡的将Socket和不同的NIOProcessor绑定(轮流分配),绑定完成后,会在NIOProcessor上进行读写事件的监听,而读写的实际处理则分配给Processor任务完成。当有读写事件发生时,就会通知到对应的Processor进行数据处理。

  • 相关阅读:
    [ jquery 选择器 :hidden ] 此方法选取匹配所有不可见元素,或者type为hidden的元素
    剑指 Offer 03. 数组中重复的数字 哈希
    LeetCode 1736. 替换隐藏数字得到的最晚时间 贪心
    Leetcode 1552. 两球之间的磁力 二分
    Leetcode 88. 合并两个有序数组 双指针
    LeetCode 1744. 你能在你最喜欢的那天吃到你最喜欢的糖果吗?
    LeetCode 1743. 相邻元素对还原数组 哈希
    LeetCode 1745. 回文串分割 IV dp
    剑指 Offer 47. 礼物的最大价值 dp
    剑指 Offer 33. 二叉搜索树的后序遍历序列 树的遍历
  • 原文地址:https://www.cnblogs.com/-blog/p/5209808.html
Copyright © 2011-2022 走看看