zoukankan      html  css  js  c++  java
  • mina2中的session

    简介

    session类图

    Mina每建立一个连接同时会创建一个session对象,用于保存这次读写需要用到的所有信息。从抽象类AbstractIoSession中可以看出session具有如下功能:
    1、从attributes成员可以看出session可以存放用户关心的键值对
    2、注意到WriteRequestQueue,这是一个写请求队列,processor中调用flush或者flushNow方法时会将用户写入的数据包装成一个writeRequest对象,并加入这个队列中。
    3、提供了大量的统计功能,比如接收到了多少消息、最后读取时间等
    在代码中一般是这样使用session的
    // 创建服务器监听  
    IoAcceptor acceptor = new NioSocketAcceptor();  
    // 设置buffer的长度  
    acceptor.getSessionConfig().setReadBufferSize(2048);  
    // 设置连接超时时间  
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);  

    session做为一个连接的具体对象,缓存当前连接用户的一些信息。

    创建好acceptor或者connector之后,通过IoSessionConfig对session对行配置。
    用得最多的是通过session写入数据,这是调用了IoSession的write方法
    WriteFuture write(Object message);  
    WriteFuture write(Object message, SocketAddress destination);  

    下面着重分析创建过程以及session的状态

    创建与初始化

    每建立一个连接,就会创建一个session,IoAcceptor的accept方法的返回值正是一个session。见NioSocketAcceptor.accept(IoProcessor<NioSession> processor, ServerSocketChannel handle)方法:
     protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
    
            SelectionKey key = handle.keyFor(selector);
    
            if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
                return null;
            }
    
            // accept the connection from the client
            SocketChannel ch = handle.accept();
    
            if (ch == null) {
                return null;
            }
    
            return new NioSocketSession(this, processor, ch);
        }
    由以上代码可知,session包含了对众多对象的引用,比如processor,socketChannel,SelectionKey,IoFilter等。
    session在创建好后,紧接着就会对其进行初始化。AbstractIoService.initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer)方法如下:
        protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
            // Update lastIoTime if needed.
            if (stats.getLastReadTime() == 0) {
                stats.setLastReadTime(getActivationTime());
            }
    
            if (stats.getLastWriteTime() == 0) {
                stats.setLastWriteTime(getActivationTime());
            }
    
            // Every property but attributeMap should be set now.
            // Now initialize the attributeMap.  The reason why we initialize
            // the attributeMap at last is to make sure all session properties
            // such as remoteAddress are provided to IoSessionDataStructureFactory.
            try {
                ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
                        .getAttributeMap(session));
            } catch (IoSessionInitializationException e) {
                throw e;
            } catch (Exception e) {
                throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
            }
    
            try {
                ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
                        .getWriteRequestQueue(session));
            } catch (IoSessionInitializationException e) {
                throw e;
            } catch (Exception e) {
                throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
            }
    
            if ((future != null) && (future instanceof ConnectFuture)) {
                // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
                session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
            }
    
            if (sessionInitializer != null) {
                sessionInitializer.initializeSession(session, future);
            }
    
            finishSessionInitialization0(session, future);
        }
    设置上次读写时间,初始化属性map和写请求队列等。
    session被初始化好之后会加入到processor中,processor中有一个队列专门存放session:
    例如:AbstractPollingIoProcessor.java中有:
    private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
    
        /** A queue used to store the sessions to be removed */
        private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
    
        /** A queue used to store the sessions to be flushed */
        private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
    
        /**
         * A queue used to store the sessions which have a trafficControl to be
         * updated
         */
        private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
    
        /** The processor thread : it handles the incoming messages */
        private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
    加入队列之后,processor就会从队列中取出session,以下是processor的run方法关键代码:
    1. private class Processor implements Runnable {  
           public void run() {  
               for (;;) {  
                   long t0 = System.currentTimeMillis();  
                   int selected = select(SELECT_TIMEOUT);  
                   long t1 = System.currentTimeMillis();  
                   long delta = (t1 - t0);  
        
                   if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {  
                       if (isBrokenConnection()) {  
                           wakeupCalled.getAndSet(false);  
                           continue;  
                       } else {  
                           registerNewSelector();  
                       }  
                       wakeupCalled.getAndSet(false);  
                       continue;  
                   }  
        
                   nSessions += handleNewSessions();  
        
                   updateTrafficMask();  
        
                   if (selected > 0) {  
                       process();  
                   }  
                 
                   nSessions -= removeSessions();  
                                
               }  
           }  
       }  
    1、不断地调用select方法来检查是否有session准备就绪,如果没有或者间隔时间小于100ms则检查selector是否可用,如果不可用重新建一个selector(这里linux下的epoll的问题可能导致selector不可用。)
    2、从newSessions队列中取出这些session,并将其负责的通道注册到selector上
    3、处理准备就绪的session
    AbstractPollingIoProcessor.java
    private void process(S session) {  
        // Process Reads  
        if (isReadable(session) && !session.isReadSuspended()) {  
            read(session);  
        }  
      
        // Process writes  
        if (isWritable(session) && !session.isWriteSuspended()) {  
            // add the session to the queue, if it's not already there  
            if (session.setScheduledForFlush(true)) {  
                flushingSessions.add(session);  
            }  
        }  
    }
    总结一下创建与初始化过程:连接到来创建一个session,初始化好之后加入到processor负责的一个队列中。processor线程会把队列中的session对应的通道都注册到它自己的selector上,然后这个selector轮询这些通道是否准备就绪,一旦准备就绪就调用对应方法进行处理(read or flushNow)。

    状态

    Mina中的session具有状态,且状态之间是可以相互转化的
    Connected:session被创建时处于这种状态
    Idle:没有请求可以处理(可配置)
    Closing:正处于关闭状态(可能正在做一些清理工作)
    Closed:关闭状态
    下图是这几种状态之间的转化图:
    IoFilter与IoHandler就是在这些状态上面加以干预,下面重点看一下IDLE状态,它分三种:
    Idle for read:在规定时间内没有数据可读
    Idle for write:在规定时间内没有数据可写
    Idle for both:在规定时间内没有数据可读和可写
    这三种状态分别对应IdleStatus类的三个常量:READER_IDLE、WRITER_IDLE、BOTH_IDLE
    前面session的用法中有如下设置:
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 
    acceptor的run方法中调用了notifyIdleSessions
    private void notifyIdleSessions( long currentTime )  
        {  
            // process idle sessions  
            if ( currentTime - lastIdleCheckTime >= 1000 )  
            {  
                lastIdleCheckTime = currentTime;  
                AbstractIoSession.notifyIdleness( getListeners().getManagedSessions().values().iterator(), currentTime );  
            }  
     } 
    每隔一秒一检查是否到达了设置的空闲时间
    1. private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,  
              long lastIoTime) {  
          if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {  
              session.getFilterChain().fireSessionIdle(status);  
          }  
      }  
    如果当前时间减去上一次IDLE事件触发的时间大于用户设置的idleTime,则触发一次sessionIdle事件。
    public void fireSessionIdle(IdleStatus status) {  
        session.increaseIdleCount(status, System.currentTimeMillis());  
        Entry head = this.head;  
        callNextSessionIdle(head, session, status);  
    }  
    increaseIdleCount这个方法会更新lastToTime的值为当前时间,紧接着穿透过滤器链(当然在过滤器的sessionIdle中可能会做一些操作)到达IoHandler的sessionIdle方法,如果需要在session空闲的时候做一些操作,就可以在这个方法里面做。
     
     
  • 相关阅读:
    解决UITableView中Cell重用机制导致内容出错的方法总结
    Hdu 1052 Tian Ji -- The Horse Racing
    Hdu 1009 FatMouse' Trade
    hdu 2037 今年暑假不AC
    hdu 1559 最大子矩阵
    hdu 1004 Let the Balloon Rise
    Hdu 1214 圆桌会议
    Hdu 1081 To The Max
    Hdu 2845 Beans
    Hdu 2955 Robberies 0/1背包
  • 原文地址:https://www.cnblogs.com/duanxz/p/6754604.html
Copyright © 2011-2022 走看看