zoukankan      html  css  js  c++  java
  • mina2中的session

    简介

    session类图

    Mina每建立一个连接同时会创建一个session对象,用于保存这次读写需要用到的所有信息。从抽象类AbstractIoSession中可以看出session具有如下功能:
    1、从attributes成员可以看出session可以存放用户关心的键值对
    2、注意到WriteRequestQueue,这是一个写请求队列,processor中调用flush或者flushNow方法时会将用户写入的数据包装成一个writeRequest对象,并加入这个队列中。
    3、提供了大量的统计功能,比如接收到了多少消息、最后读取时间等
    在代码中一般是这样使用session的
    // 创建服务器监听  
    IoAcceptor acceptor = new NioSocketAcceptor();  
    // 设置buffer的长度  
    acceptor.getSessionConfig().setReadBufferSize(2048);  
    // 设置连接超时时间  
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);  

    session做为一个连接的具体对象,缓存当前连接用户的一些信息。

    创建好acceptor或者connector之后,通过IoSessionConfig对session对行配置。
    用得最多的是通过session写入数据,这是调用了IoSession的write方法
    WriteFuture write(Object message);  
    WriteFuture write(Object message, SocketAddress destination);  

    下面着重分析创建过程以及session的状态

    创建与初始化

    每建立一个连接,就会创建一个session,IoAcceptor的accept方法的返回值正是一个session。见NioSocketAcceptor.accept(IoProcessor<NioSession> processor, ServerSocketChannel handle)方法:
     protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
    
            SelectionKey key = handle.keyFor(selector);
    
            if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
                return null;
            }
    
            // accept the connection from the client
            SocketChannel ch = handle.accept();
    
            if (ch == null) {
                return null;
            }
    
            return new NioSocketSession(this, processor, ch);
        }
    由以上代码可知,session包含了对众多对象的引用,比如processor,socketChannel,SelectionKey,IoFilter等。
    session在创建好后,紧接着就会对其进行初始化。AbstractIoService.initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer)方法如下:
        protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
            // Update lastIoTime if needed.
            if (stats.getLastReadTime() == 0) {
                stats.setLastReadTime(getActivationTime());
            }
    
            if (stats.getLastWriteTime() == 0) {
                stats.setLastWriteTime(getActivationTime());
            }
    
            // Every property but attributeMap should be set now.
            // Now initialize the attributeMap.  The reason why we initialize
            // the attributeMap at last is to make sure all session properties
            // such as remoteAddress are provided to IoSessionDataStructureFactory.
            try {
                ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
                        .getAttributeMap(session));
            } catch (IoSessionInitializationException e) {
                throw e;
            } catch (Exception e) {
                throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
            }
    
            try {
                ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
                        .getWriteRequestQueue(session));
            } catch (IoSessionInitializationException e) {
                throw e;
            } catch (Exception e) {
                throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
            }
    
            if ((future != null) && (future instanceof ConnectFuture)) {
                // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
                session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
            }
    
            if (sessionInitializer != null) {
                sessionInitializer.initializeSession(session, future);
            }
    
            finishSessionInitialization0(session, future);
        }
    设置上次读写时间,初始化属性map和写请求队列等。
    session被初始化好之后会加入到processor中,processor中有一个队列专门存放session:
    例如:AbstractPollingIoProcessor.java中有:
    private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
    
        /** A queue used to store the sessions to be removed */
        private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
    
        /** A queue used to store the sessions to be flushed */
        private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
    
        /**
         * A queue used to store the sessions which have a trafficControl to be
         * updated
         */
        private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
    
        /** The processor thread : it handles the incoming messages */
        private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
    加入队列之后,processor就会从队列中取出session,以下是processor的run方法关键代码:
    1. private class Processor implements Runnable {  
           public void run() {  
               for (;;) {  
                   long t0 = System.currentTimeMillis();  
                   int selected = select(SELECT_TIMEOUT);  
                   long t1 = System.currentTimeMillis();  
                   long delta = (t1 - t0);  
        
                   if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {  
                       if (isBrokenConnection()) {  
                           wakeupCalled.getAndSet(false);  
                           continue;  
                       } else {  
                           registerNewSelector();  
                       }  
                       wakeupCalled.getAndSet(false);  
                       continue;  
                   }  
        
                   nSessions += handleNewSessions();  
        
                   updateTrafficMask();  
        
                   if (selected > 0) {  
                       process();  
                   }  
                 
                   nSessions -= removeSessions();  
                                
               }  
           }  
       }  
    1、不断地调用select方法来检查是否有session准备就绪,如果没有或者间隔时间小于100ms则检查selector是否可用,如果不可用重新建一个selector(这里linux下的epoll的问题可能导致selector不可用。)
    2、从newSessions队列中取出这些session,并将其负责的通道注册到selector上
    3、处理准备就绪的session
    AbstractPollingIoProcessor.java
    private void process(S session) {  
        // Process Reads  
        if (isReadable(session) && !session.isReadSuspended()) {  
            read(session);  
        }  
      
        // Process writes  
        if (isWritable(session) && !session.isWriteSuspended()) {  
            // add the session to the queue, if it's not already there  
            if (session.setScheduledForFlush(true)) {  
                flushingSessions.add(session);  
            }  
        }  
    }
    总结一下创建与初始化过程:连接到来创建一个session,初始化好之后加入到processor负责的一个队列中。processor线程会把队列中的session对应的通道都注册到它自己的selector上,然后这个selector轮询这些通道是否准备就绪,一旦准备就绪就调用对应方法进行处理(read or flushNow)。

    状态

    Mina中的session具有状态,且状态之间是可以相互转化的
    Connected:session被创建时处于这种状态
    Idle:没有请求可以处理(可配置)
    Closing:正处于关闭状态(可能正在做一些清理工作)
    Closed:关闭状态
    下图是这几种状态之间的转化图:
    IoFilter与IoHandler就是在这些状态上面加以干预,下面重点看一下IDLE状态,它分三种:
    Idle for read:在规定时间内没有数据可读
    Idle for write:在规定时间内没有数据可写
    Idle for both:在规定时间内没有数据可读和可写
    这三种状态分别对应IdleStatus类的三个常量:READER_IDLE、WRITER_IDLE、BOTH_IDLE
    前面session的用法中有如下设置:
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 
    acceptor的run方法中调用了notifyIdleSessions
    private void notifyIdleSessions( long currentTime )  
        {  
            // process idle sessions  
            if ( currentTime - lastIdleCheckTime >= 1000 )  
            {  
                lastIdleCheckTime = currentTime;  
                AbstractIoSession.notifyIdleness( getListeners().getManagedSessions().values().iterator(), currentTime );  
            }  
     } 
    每隔一秒一检查是否到达了设置的空闲时间
    1. private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,  
              long lastIoTime) {  
          if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {  
              session.getFilterChain().fireSessionIdle(status);  
          }  
      }  
    如果当前时间减去上一次IDLE事件触发的时间大于用户设置的idleTime,则触发一次sessionIdle事件。
    public void fireSessionIdle(IdleStatus status) {  
        session.increaseIdleCount(status, System.currentTimeMillis());  
        Entry head = this.head;  
        callNextSessionIdle(head, session, status);  
    }  
    increaseIdleCount这个方法会更新lastToTime的值为当前时间,紧接着穿透过滤器链(当然在过滤器的sessionIdle中可能会做一些操作)到达IoHandler的sessionIdle方法,如果需要在session空闲的时候做一些操作,就可以在这个方法里面做。
     
     
  • 相关阅读:
    CNZZ公告:近期无法获取百度关键词
    怎样注册uber司机 如何注册uber司机 最新详细攻略
    Uber司机一周体验记:成单率仅57%
    Uber司机手机终端问答篇
    Uber 司机有话说:你以为当个 Uber 司机很轻松?大错特错!
    每门编程语言修复了什么
    let区别(关于racket和r5rs)
    Bloom Filter
    静态作用域与动态作用域
    C 语言的可变参数表函数的设计
  • 原文地址:https://www.cnblogs.com/duanxz/p/6754604.html
Copyright © 2011-2022 走看看