zoukankan      html  css  js  c++  java
  • 转:Mina2.0框架源码剖析(七)

    前面介绍完了org.apache.mina.core.session这个包,现在开始进入org.apache.mina.core. polling包。这个包里包含了实现基于轮询策略(比如NIO的select调用或其他类型的I/O轮询系统调用(如epoll,poll,kqueue等)的基类。

    先来看AbstractPollingIoAcceptor这个抽象基类,它继承自AbstractIoAcceptor,两个泛型参数分别是所处理的会话和服务器端socket连接。底层的sockets会被不断检测,并当有任何一个socket需要被处理时就会被唤醒去处理。这个类封装了服务器端socket的bind,accept和dispose等动作,其成员变量Executor负责接受来自客户端的连接请求,另一个AbstractPollingIoProcessor用于处理客户端的I/O操作请求,如读写和关闭连接。

    其最重要的几个成员变量是:

      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();//注册队列
        private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();//取消注册队列
        private final Map<SocketAddress, H> boundHandles = Collections
                .synchronizedMap(new HashMap<SocketAddress, H>());//本地地址到服务器socket的映射表

    先来看看当服务端调用bind后的处理过程:

    protected final Set<SocketAddress> bind0(
                List<? extends SocketAddress> localAddresses) throws Exception {
            AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);//注册请求
            registerQueue.add(request);//加入注册队列中,等待worker处理
            //创建一个Worker实例,开始工作
            startupWorker();
            wakeup();
            request.awaitUninterruptibly();
            // 更新本地绑定地址
            Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
            for (H handle : boundHandles.values()) {
                newLocalAddresses.add(localAddress(handle));
            }
            return newLocalAddresses;
        }

         真正的负责接收客户端请求的工作都是Worker线程完成的,

    private class Worker implements Runnable {
            public void run() {
                int nHandles = 0;
                while (selectable) {
                    try {
                        // Detect if we have some keys ready to be processed
                        boolean selected = select();//检测是否有SelectionKey已经可以被处理了
                        nHandles += registerHandles();//注册服务器sockets句柄,这样做的目的是将Selector的状态置于OP_ACCEPT,并绑定到所监听的端口上,表明接受了可以接收的来自客户端的连接请求,
                        if (selected) {
                            processHandles(selectedHandles());//处理可以被处理的SelectionKey状态为OP_ACCEPT的服务器socket句柄集(即真正处理来自客户端的连接请求)
                        }
                        nHandles -= unregisterHandles();//检查是否有取消连接的客户端请求
                        if (nHandles == 0) {
                            synchronized (lock) {
                                if (registerQueue.isEmpty()
                                        && cancelQueue.isEmpty()) {//完成工作
                                    worker = null;
                                    break;
                                }
                            }
                        }
                    } catch (Throwable e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                        try {
                            Thread.sleep(1000);//线程休眠一秒
                        } catch (InterruptedException e1) {
                            ExceptionMonitor.getInstance().exceptionCaught(e1);
                        }
                    }
                }
                if (selectable && isDisposing()) {//释放资源
                    selectable = false;
                    try {
                        if (createdProcessor) {
                            processor.dispose();
                        }
                    } finally {
                        try {
                            synchronized (disposalLock) {
                                if (isDisposing()) {
                                    destroy();
                                }
                            }
                        } catch (Exception e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        } finally {
                            disposalFuture.setDone();
                        }
                    }
                }
            }
    private int registerHandles() {//注册服务器sockets句柄
            for (;;) {
                AcceptorOperationFuture future = registerQueue.poll();
                Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
                List<SocketAddress> localAddresses = future.getLocalAddresses();
                try {
                    for (SocketAddress a : localAddresses) {
                        H handle = open(a);//打开指定地址,返回服务器socket句柄
                        newHandles.put(localAddress(handle), handle);//加入地址—服务器socket映射表中
                    }
                    boundHandles.putAll(newHandles);//更新本地绑定地址集
                    // and notify.
                    future.setDone();//完成注册过程
                    return newHandles.size();
                } catch (Exception e) {
                    future.setException(e);
                } finally {
                    // Roll back if failed to bind all addresses.
                    if (future.getException() != null) {
                        for (H handle : newHandles.values()) {
                            try {
                                close(handle);//关闭服务器socket句柄
                            } catch (Exception e) {
                                ExceptionMonitor.getInstance().exceptionCaught(e);
                            }
                        }
                        wakeup();
                    }
                }
            }
        }
            private void processHandles(Iterator<H> handles) throws Exception {//处理来自客户端的连接请求
                while (handles.hasNext()) {
                    H handle = handles.next();
                    handles.remove();
                    T session = accept(processor, handle);//为一个服务器socket句柄handle真正接收来自客户端的请求,在给定的所关联的processor上返回会话session
                    if (session == null) {
                        break;
                    }
                    finishSessionInitialization(session, null, null);//结束会话初始化
                    // add the session to the SocketIoProcessor
                    session.getProcessor().add(session);
                }
            }
        }

         这个类中有个地方值得注意,就是wakeup方法,它是用来中断select方法的,当注册队列或取消注册队列发生变化时需要调用它,可以参看本类的一个子类NioSocketAcceptor的实现:

        protected boolean select() throws Exception {
            return selector.select() > 0;
        }
        protected void wakeup() {
            selector.wakeup();
        }

         我们可以查阅jdk文档,它对Selector的select方法有如下解释:选择一组键,其相应的通道已为 I/O 操作准备就绪。 此方法执行处于阻塞模式的选择操作。仅在至少选择一个通道、调用此选择器的 wakeup 方法、当前的线程已中断,或者给定的超时期满(以先到者为准)后此方法才返回。

    参考资料

    1,《Java NIO非阻塞服务器示例

     

    作者:phinecos(洞庭散人)
    出处:http://phinecos.cnblogs.com/
    本文版权归作者和博客园共有,欢迎转载,但请保留此段声明,并在文章页面明显位置给出原文连接。

    作者:洞庭散人

    出处:http://phinecos.cnblogs.com/    

    本博客遵从Creative Commons Attribution 3.0 License,若用于非商业目的,您可以自由转载,但请保留原作者信息和文章链接URL。

  • 相关阅读:
    柯西恒等式 FPGA中信号的跨时钟域处理模板(二)
    OSPF
    Windows多网卡路由设置
    使用线程时需要注意的地方
    dicom 影像通信(scu、scp)的c-echo、c-store、c-find、c-move
    关于python3没有numpy和matplotlib库怎么办
    使用centos6.5时的几个小问题
    关于用Apache Beam跑WordCount
    MarkdownPad2的安装、破解和汉化
    安装Photoshop CS64
  • 原文地址:https://www.cnblogs.com/phoebus0501/p/1878900.html
Copyright © 2011-2022 走看看