zoukankan      html  css  js  c++  java
  • Mina之session

    http://www.cnblogs.com/ggzwtj/archive/2011/10/14/2212095.html

    Mina之session

     

    1、IoSession与底层的传输层类型无关,表示通信双端的连接。提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关信息。每个会话都由一个Service来提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法就是read和write,这两个方法都是异步执行,如要真正完成必须在其结果上进行等待。关闭会话的方法close也是异步执行的,也就是应等待返回的CloseFuture,此外,还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,但同样是异步的。会话的读写类型是可配置的,在运行中可设置此端是否可读写。

      一个会话主要包括两方面的数据:属性映射图和写请求队列,这里使用工厂模式来为新创建的会话提供这些数据结构,定义如下:

    public interface IoSessionDataStructureFactory {
    //返回属性
    IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
    //返回写请求队列
    WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
    }

    2、IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等。这个里面有两个方法需要注意,如下:

    /*
    * 只有在IoSession的read方法可用的时候返回true。 如果可用,受到的消息
    * 保存在BlockingQueue这样对那个客户端应用程序来说更方便取到消息。开
    * 启这个对服务器没什么好处,并且可能造成内存漏洞,默认是不开启的。
    */
    boolean isUseReadOperation();
    /*
    * 打开或关闭IoSession的read方法。
    */
    void setUseReadOperation(boolean useReadOperation);

    3、IoSessionInitializer定义了一个回调函数,用于把用户自定义的会话初始化行为剥离出来:

    public interface IoSessionInitializer<T extends IoFuture> {
    void initializeSession(IoSession session, T future);
    }

    4、IoSessionRecycler为一个无连接的传输服务提供回收现有会话的服务:

    public interface IoSessionRecycler {
    /**
    * 一个虚假的recycler(并不回收任何session)。但是用这个可以使得所有session
    * 的生命周期事件被fired
    */
    static IoSessionRecycler NOOP = new IoSessionRecycler() {
    public void put(IoSession session) {}
    public IoSession recycle(SocketAddress localAddress,SocketAddress remoteAddress) {
    return null;
    }
    public void remove(IoSession session) {}
    };
    /*
    * 创建或写Iossion的时候被调用。
    */
    void put(IoSession session);
    /*
    * 尝试获取一个被回收了的IoSession。
    */
    IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
    /*
    * 会话被关闭的时候调用。
    */
    void remove(IoSession session);
    }

    ExpiringSessionRecycler是IoSessionRecycler的一个实现,用来回收超时失效的会话:

    private ExpiringMap<Object, IoSession> sessionMap;//待处理的会话
    private ExpiringMap<Object, IoSession>.Expirer mapExpirer;//负责具体的回收工作

    下面来看Key是什么样的:

        private Object generateKey(SocketAddress localAddress,SocketAddress remoteAddress) {
    List<SocketAddress> key = new ArrayList<SocketAddress>(2);
    key.add(remoteAddress);
    key.add(localAddress);
    return key;
    }

    ExpiringMap中保存了超过限制的对象和该对象的监听器,如下:

    public class ExpiringMap<K, V> implements Map<K, V> {
    public static final int DEFAULT_TIME_TO_LIVE = 60;
    public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
    private static volatile int expirerCount = 1;
    private final ConcurrentHashMap<K, ExpiringObject> delegate;
    private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
    private final Expirer expirer;
    }

    其中的ExpiringObject表示一个超过限制的对象,是ExpiringMap的一个内部类,如下:

    private class ExpiringObject {
    private K key;
    private V value;
    private long lastAccessTime;//上次访问时间
    private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
    }

    mapExpirer用来移除sessionMap上超过临界值的项,关键代码如下:

    private void processExpires() {
    long timeNow = System.currentTimeMillis();//当前时间
    for (ExpiringObject o : delegate.values()) {
    if (timeToLiveMillis <= 0) {
    continue;
    }
    long timeIdle = timeNow - o.getLastAccessTime();
    if (timeIdle >= timeToLiveMillis) {
    delegate.remove(o.getKey());//移除
    for (ExpirationListener<V> listener : expirationListeners) {
    listener.expired(o.getValue());//终止监听?
    }
    }
    }
    }

    启动关闭该县城都需要进行封锁机制。

    5、Mina中的I/O事件类型如下:

    public enum IoEventType {
    SESSION_CREATED,
    SESSION_OPENED,
    SESSION_CLOSED,
    MESSAGE_RECEIVED,
    MESSAGE_SENT,
    SESSION_IDLE,
    EXCEPTION_CAUGHT,
    WRITE,
    CLOSE,
    }

    IoEvent表示一个I/O事件或者一个I/O请求,包括时间类型、所属会话、时间参数:

    public class IoEvent implements Runnable {
    private final IoEventType type;
    private final IoSession session;
    private final Object parameter;
    public IoEvent(IoEventType type, IoSession session, Object parameter) {
    //...
    }
    //根据事件类型向会话的过滤链上的众多监听者发出事件到来的信号。
    public void fire() {
    switch (getType()) {
    case MESSAGE_RECEIVED:
    getSession().getFilterChain().fireMessageReceived(getParameter());break;
    case MESSAGE_SENT:
    getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());break;
    case WRITE:
    getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());break;
    case CLOSE:
    getSession().getFilterChain().fireFilterClose();break;
    case EXCEPTION_CAUGHT:
    getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());break;
    case SESSION_IDLE:
    getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());break;
    case SESSION_OPENED:
    getSession().getFilterChain().fireSessionOpened();
    break;
    case SESSION_CREATED:
    getSession().getFilterChain().fireSessionCreated();
    break;
    case SESSION_CLOSED:
    getSession().getFilterChain().fireSessionClosed();
    break;
    default:
    throw new IllegalArgumentException("Unknown event type: " + getType());
    }
    }
    public String toString() {
    if (getParameter() == null) {
    return "[" + getSession() + "] " + getType().name();
    }
    return "[" + getSession() + "] " + getType().name() + ": "+ getParameter();
    }
    }

    Mina的会话中,有三种类型的闲置状态:READER_IDLE读端空闲、WRITER_IDLE写端空闲、BOTH_IDLE读写都空闲。为了节省会话资源可以让用户设置当空闲超过一定时间后关闭会话,因为此会话可能在一段出现问题,从而导致另一端空闲超过太长时间。
    6、DefaultIoSessionDataStructureFactory是IoSessionDataStructureFactory的一个默认的实现:

    public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory {
    public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception {
    return new DefaultIoSessionAttributeMap();
    }
    public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception {
    return new DefaultWriteRequestQueue();
    }
    private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
    private final Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<Object, Object>(4));
    public DefaultIoSessionAttributeMap() {
    super();
    }
    public Object getAttribute(IoSession session, Object key, Object defaultValue) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    Object answer = attributes.get(key);
    if (answer == null) {
    return defaultValue;
    }
    return answer;
    }

    public Object setAttribute(IoSession session, Object key, Object value) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    if (value == null) {
    return attributes.remove(key);
    }
    return attributes.put(key, value);
    }
    public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    if (value == null) {
    return null;
    }

    Object oldValue;
    synchronized (attributes) {
    oldValue = attributes.get(key);
    if (oldValue == null) {
    attributes.put(key, value);
    }
    }
    return oldValue;
    }
    public Object removeAttribute(IoSession session, Object key) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    return attributes.remove(key);
    }
    public boolean removeAttribute(IoSession session, Object key, Object value) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    if (value == null) {
    return false;
    }
    synchronized (attributes) {
    if (value.equals(attributes.get(key))) {
    attributes.remove(key);
    return true;
    }
    }
    return false;
    }
    public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
    synchronized (attributes) {
    Object actualOldValue = attributes.get(key);
    if (actualOldValue == null) {
    return false;
    }
    if (actualOldValue.equals(oldValue)) {
    attributes.put(key, newValue);
    return true;
    }
    return false;
    }
    }
    public boolean containsAttribute(IoSession session, Object key) {
    return attributes.containsKey(key);
    }
    public Set<Object> getAttributeKeys(IoSession session) {
    synchronized (attributes) {
    return new HashSet<Object>(attributes.keySet());
    }
    }
    public void dispose(IoSession session) throws Exception {}
    }
    private static class DefaultWriteRequestQueue implements WriteRequestQueue {
    //一个队列存储传入的写请求
    private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
    public DefaultWriteRequestQueue() {
    super();
    }
    //...
    }
    }

    7、AbstractIoSession是IoSession的一个抽象实现类,如下:

    private IoSessionAttributeMap attributes;//会话属性
    private WriteRequestQueue writeRequestQueue;//写请求队列
    private WriteRequest currentWriteRequest;//当前写请求

    下面是关闭的时候涉及的成员:

    //要结束当前会话时发送写请求CLOSE_REQUEST
    private static final WriteRequest CLOSE_REQUEST =
    new DefaultWriteRequest(new Object());
    //在连接关闭时状态被设置为closed
    private final CloseFuture closeFuture = new DefaultCloseFuture(this);
    //关闭的监听器
    private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
    new IoFutureListener<CloseFuture>() {
    public void operationComplete(CloseFuture future) {
    AbstractIoSession session = (AbstractIoSession) future.getSession();
    session.scheduledWriteBytes.set(0);
    session.scheduledWriteMessages.set(0);
    session.readBytesThroughput = 0;
    session.readMessagesThroughput = 0;
    session.writtenBytesThroughput = 0;
    session.writtenMessagesThroughput = 0;
    }
    };

    close和closeOnFlush都是异步操作的,区别是前者立即关闭连接,后者是在写请求队列中放入一个CLOSE_REQUEST并将其即时刷新储蓄,若要真正等到关闭完成,需要调用方法在返回的CloseFuture等待。下面是close的代码:

        public final CloseFuture close() {
    synchronized (lock) {
    if (isClosing()) {
    return closeFuture;
    }

    closing = true;
    }
    getFilterChain().fireFilterClose();//fire出关闭事件
    return closeFuture;
    }

    下面是closeOnFlush的代码:

        private final CloseFuture closeOnFlush() {
    getWriteRequestQueue().offer(this, CLOSE_REQUEST);
    getProcessor().flush(this);
    return closeFuture;
    }

    对于读的情况,下面是取得读的数据队列:

        //返回可被读取数据队列
    private Queue<ReadFuture> getReadyReadFutures() {
    Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
    //如果是第一次读取数据
    if (readyReadFutures == null) {
    //构造一个新的读数据队列
    readyReadFutures = new CircularQueue<ReadFuture>();
    Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures);
    if (oldReadyReadFutures != null) {
    readyReadFutures = oldReadyReadFutures;
    }
    }
    return readyReadFutures;
    }

    读数据的过程:

        public final ReadFuture read() {
    //配置不允许读数据
    if (!getConfig().isUseReadOperation()) {
    throw new IllegalStateException("useReadOperation is not enabled.");
    }
    //获得已经被许可的可被读数据队列
    Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
    ReadFuture future;
    synchronized (readyReadFutures) {
    future = readyReadFutures.poll();
    if (future != null) {
    //如果关联的会话已关闭,通知读者
    if (future.isClosed()) {
    readyReadFutures.offer(future);
    }
    } else {
    future = new DefaultReadFuture(this);
    //将数据出入等待读的队列
    getWaitingReadFutures().offer(future);
    }
    }
    return future;
    }

    写数据的过程:

        //IoBuffer、文件、文件部分区域
    public WriteFuture write(Object message, SocketAddress remoteAddress) {
    if (message == null) {
    throw new NullPointerException("message");
    }
    //如果没有远端地址
    if (!getTransportMetadata().isConnectionless() && remoteAddress != null) {
    throw new UnsupportedOperationException();
    }
    //如果会话已被关闭
    if (isClosing() || !isConnected()) {
    WriteFuture future = new DefaultWriteFuture(this);
    WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
    WriteException writeException = new WriteToClosedSessionException(request);
    future.setException(writeException);
    return future;
    }
    FileChannel openedFileChannel = null;
    try {
    if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) {//如果是空消息
    throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
    } else if (message instanceof FileChannel) {//文件的某一区域
    FileChannel fileChannel = (FileChannel) message;
    message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
    } else if (message instanceof File) {//要发送的是文件
    File file = (File) message;
    openedFileChannel = new FileInputStream(file).getChannel();//打开文件通道
    message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
    }
    } catch (IOException e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    return DefaultWriteFuture.newNotWrittenFuture(this, e);
    }
    //构造写请求,通过过滤器链发送出去
    WriteFuture writeFuture = new DefaultWriteFuture(this);
    WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
    IoFilterChain filterChain = getFilterChain();
    filterChain.fireFilterWrite(writeRequest);

    //如果打开文件通道应该在完成时关闭通道
    if (openedFileChannel != null) {
    final FileChannel finalChannel = openedFileChannel;
    writeFuture.addListener(new IoFutureListener<WriteFuture>() {
    public void operationComplete(WriteFuture future) {
    try {
    finalChannel.close();
    } catch (IOException e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    }
    }
    });
    }
    return writeFuture;
    }
     
  • 相关阅读:
    l1-010
    l1-009
    L1-008修改
    l1-008
    Codeforces Round #406 (Div. 2)
    求N!的长度【数学】 51nod 1058 1130
    51nod 1090 & 1267 【二分简单题】
    Codeforces Round #405 (Div. 2)
    Codeforces Round #404 (Div. 2)
    PAT 天梯赛真题集(L2、L3)
  • 原文地址:https://www.cnblogs.com/balaamwe/p/2317339.html
Copyright © 2011-2022 走看看