zoukankan      html  css  js  c++  java
  • 转:Mina2.0框架源码剖析(六)

    上文的内容还有一些没有结尾,这篇补上。在ExpiringMap类中,使用了一个私有内部类ExpiringObject来表示待检查超时的对象,它包括三个域,键,值,上次访问时间,以及用于上次访问时间这个域的读写锁:

            private K key;
            private V value;
            private long lastAccessTime;
            private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();

    而ExpiringMap中包括了下述几个变量:

      private final ConcurrentHashMap<K, ExpiringObject> delegate;//超时代理集合,保存待检查对象
        private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;//超时监听者
        private final Expirer expirer;//超时检查线程

    现在再来看看IoSession的一个抽象实现类AbstractIoSession。这是它的几个重要的成员变量:

        private IoSessionAttributeMap attributes;//会话属性映射图
        private WriteRequestQueue writeRequestQueue;//写请求队列
        private WriteRequest currentWriteRequest;//当前写请求

         当要结束当前会话时,会发送一个一个写请求CLOSE_REQUEST。而closeFuture这个CloseFuture会在连接关闭时状态被设置为”closed”,它的监听器是SCHEDULED_COUNTER_RESETTER。

    close和closeOnFlush都是异步的关闭操作,区别是前者立即关闭连接,而后者是在写请求队列中放入一个CLOSE_REQUEST,并将其即时刷新出去,若要真正等待关闭完成,需要调用方在返回的CloseFuture等待

    public final CloseFuture close() {
            synchronized (lock) {
                if (isClosing()) {
                    return closeFuture;
                } else {
                    closing = true;
                }
            }
            getFilterChain().fireFilterClose();//fire出关闭事件
            return closeFuture;
        }

        public final CloseFuture closeOnFlush() {
            getWriteRequestQueue().offer(this, CLOSE_REQUEST);
            getProcessor().flush(this);
            return closeFuture;
        }

         下面来看看读数据的过程:

    public final CloseFuture close() {
            synchronized (lock) {
                if (isClosing()) {
                    return closeFuture;
                } else {
                    closing = true;
                }
            }
            getFilterChain().fireFilterClose();//fire出关闭事件
            return closeFuture;
        }

        public final CloseFuture closeOnFlush() {
            getWriteRequestQueue().offer(this, CLOSE_REQUEST);
            getProcessor().flush(this);
            return closeFuture;
        }


        private Queue<ReadFuture> getReadyReadFutures() {//返回可被读数据队列
            Queue<ReadFuture> readyReadFutures =
                (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);//从会话映射表中取出可被读数据队列
            if (readyReadFutures == null) {//第一次读数据
                readyReadFutures = new CircularQueue<ReadFuture>();//构造一个新读数据队列
                Queue<ReadFuture> oldReadyReadFutures =
                    (Queue<ReadFuture>) setAttributeIfAbsent(
                            READY_READ_FUTURES_KEY, readyReadFutures);
                if (oldReadyReadFutures != null) {
                    readyReadFutures = oldReadyReadFutures;
                }
            }
            return readyReadFutures;
        }

        public final ReadFuture read() {//读数据
            if (!getConfig().isUseReadOperation()) {//会话配置不允许读数据(这是默认情况)
                throw new IllegalStateException("useReadOperation is not enabled.");
            }
            Queue<ReadFuture> readyReadFutures = getReadyReadFutures();//获取已经可被读数据队列
            ReadFuture future;
            synchronized (readyReadFutures) {//锁住读数据队列
                future = readyReadFutures.poll();//取队头数据
                if (future != null) {
                    if (future.isClosed()) {//关联的会话已经关闭了,让读者知道此情况
                        readyReadFutures.offer(future);
                    }
                } else {
                    future = new DefaultReadFuture(this);
                    getWaitingReadFutures().offer(future); //将此数据插入等待被读取数据的队列,这个代码和上面的getReadyReadFutures类似,只是键值不同而已

                }
            }
            return future;
        }

         再来看写数据到指定远端地址的过程,可以写三种类型数据:IoBuffer,整个文件或文件的部分区域,这会通过传递写请求给过滤器链条来完成数据向目的远端的传输。

        public final WriteFuture write(Object message, SocketAddress remoteAddress) {
            FileChannel openedFileChannel = null;
            try 
            {
                if (message instanceof IoBuffer&& !((IoBuffer) message).hasRemaining()) 
                {// 空消息
                    throw new IllegalArgumentException(
                    "message is empty. Forgot to call flip()?");
                } 
                else if (message instanceof FileChannel) 
                {//要发送的是文件的某一区域
                    FileChannel fileChannel = (FileChannel) message;
                    message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
                }
                else if (message instanceof File) 
                {//要发送的是文件,打开文件通道
                    File file = (File) message;
                    openedFileChannel = new FileInputStream(file).getChannel();
                    message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
                }
            } 
            catch (IOException e) 
            {
                ExceptionMonitor.getInstance().exceptionCaught(e);
                return DefaultWriteFuture.newNotWrittenFuture(this, e);
            }
            WriteFuture future = new DefaultWriteFuture(this); 
            getFilterChain().fireFilterWrite(
                    new DefaultWriteRequest(message, future, remoteAddress)); //构造写请求,通过过滤器链发送出去,写请求中指明了要发送的消息,目的地址,以及返回的结果
     
    //如果打开了一个文件通道(发送的文件的部分区域或全部),就必须在写请求完成时关闭文件通道
            if (openedFileChannel != null) {
                final FileChannel finalChannel = openedFileChannel;
                future.addListener(new IoFutureListener<WriteFuture>() {
                    public void operationComplete(WriteFuture future) {
                        try {
                            finalChannel.close();//关闭文件通道
                        } catch (IOException e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        }
                    }
                });
            }
            return future;//写请求成功完成
        }

         最后,来看看一个WriteRequestQueue的实现,唯一加入的一个功能就是若在队头发现是请求关闭,则会去关闭会话。

     private class CloseRequestAwareWriteRequestQueue implements WriteRequestQueue {
            private final WriteRequestQueue q;//内部实际的写请求队列
            public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {
                this.q = q;
            }
            public synchronized WriteRequest poll(IoSession session) {
                WriteRequest answer = q.poll(session);
                if (answer == CLOSE_REQUEST) {
                    AbstractIoSession.this.close();
                    dispose(session);
                    answer = null;
                }
                return answer;
            }
            public void offer(IoSession session, WriteRequest e) {
                q.offer(session, e);
            }
            public boolean isEmpty(IoSession session) {
                return q.isEmpty(session);
            }
            public void clear(IoSession session) {
                q.clear(session);
            }
            public void dispose(IoSession session) {
                q.dispose(session);
            }
        }

    作者:phinecos(洞庭散人)
    出处:http://phinecos.cnblogs.com/
    本文版权归作者和博客园共有,欢迎转载,但请保留此段声明,并在文章页面明显位置给出原文连接。

    作者:洞庭散人

    出处:http://phinecos.cnblogs.com/    

    本博客遵从Creative Commons Attribution 3.0 License,若用于非商业目的,您可以自由转载,但请保留原作者信息和文章链接URL。

  • 相关阅读:
    leetcode 18 4Sum
    leetcode 71 Simplify Path
    leetcode 10 Regular Expression Matching
    leetcode 30 Substring with Concatenation of All Words
    leetcode 355 Design Twitte
    leetcode LRU Cache
    leetcode 3Sum
    leetcode Letter Combinations of a Phone Number
    leetcode Remove Nth Node From End of List
    leetcode Valid Parentheses
  • 原文地址:https://www.cnblogs.com/phoebus0501/p/1878898.html
Copyright © 2011-2022 走看看