zoukankan      html  css  js  c++  java
  • Mina之session

    http://www.cnblogs.com/ggzwtj/archive/2011/10/14/2212095.html

    Mina之session

     

    1、IoSession与底层的传输层类型无关,表示通信双端的连接。提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关信息。每个会话都由一个Service来提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法就是read和write,这两个方法都是异步执行,如要真正完成必须在其结果上进行等待。关闭会话的方法close也是异步执行的,也就是应等待返回的CloseFuture,此外,还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,但同样是异步的。会话的读写类型是可配置的,在运行中可设置此端是否可读写。

      一个会话主要包括两方面的数据:属性映射图和写请求队列,这里使用工厂模式来为新创建的会话提供这些数据结构,定义如下:

    public interface IoSessionDataStructureFactory {
    //返回属性
    IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
    //返回写请求队列
    WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
    }

    2、IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等。这个里面有两个方法需要注意,如下:

    /*
    * 只有在IoSession的read方法可用的时候返回true。 如果可用,受到的消息
    * 保存在BlockingQueue这样对那个客户端应用程序来说更方便取到消息。开
    * 启这个对服务器没什么好处,并且可能造成内存漏洞,默认是不开启的。
    */
    boolean isUseReadOperation();
    /*
    * 打开或关闭IoSession的read方法。
    */
    void setUseReadOperation(boolean useReadOperation);

    3、IoSessionInitializer定义了一个回调函数,用于把用户自定义的会话初始化行为剥离出来:

    public interface IoSessionInitializer<T extends IoFuture> {
    void initializeSession(IoSession session, T future);
    }

    4、IoSessionRecycler为一个无连接的传输服务提供回收现有会话的服务:

    public interface IoSessionRecycler {
    /**
    * 一个虚假的recycler(并不回收任何session)。但是用这个可以使得所有session
    * 的生命周期事件被fired
    */
    static IoSessionRecycler NOOP = new IoSessionRecycler() {
    public void put(IoSession session) {}
    public IoSession recycle(SocketAddress localAddress,SocketAddress remoteAddress) {
    return null;
    }
    public void remove(IoSession session) {}
    };
    /*
    * 创建或写Iossion的时候被调用。
    */
    void put(IoSession session);
    /*
    * 尝试获取一个被回收了的IoSession。
    */
    IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
    /*
    * 会话被关闭的时候调用。
    */
    void remove(IoSession session);
    }

    ExpiringSessionRecycler是IoSessionRecycler的一个实现,用来回收超时失效的会话:

    private ExpiringMap<Object, IoSession> sessionMap;//待处理的会话
    private ExpiringMap<Object, IoSession>.Expirer mapExpirer;//负责具体的回收工作

    下面来看Key是什么样的:

        private Object generateKey(SocketAddress localAddress,SocketAddress remoteAddress) {
    List<SocketAddress> key = new ArrayList<SocketAddress>(2);
    key.add(remoteAddress);
    key.add(localAddress);
    return key;
    }

    ExpiringMap中保存了超过限制的对象和该对象的监听器,如下:

    public class ExpiringMap<K, V> implements Map<K, V> {
    public static final int DEFAULT_TIME_TO_LIVE = 60;
    public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
    private static volatile int expirerCount = 1;
    private final ConcurrentHashMap<K, ExpiringObject> delegate;
    private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
    private final Expirer expirer;
    }

    其中的ExpiringObject表示一个超过限制的对象,是ExpiringMap的一个内部类,如下:

    private class ExpiringObject {
    private K key;
    private V value;
    private long lastAccessTime;//上次访问时间
    private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
    }

    mapExpirer用来移除sessionMap上超过临界值的项,关键代码如下:

    private void processExpires() {
    long timeNow = System.currentTimeMillis();//当前时间
    for (ExpiringObject o : delegate.values()) {
    if (timeToLiveMillis <= 0) {
    continue;
    }
    long timeIdle = timeNow - o.getLastAccessTime();
    if (timeIdle >= timeToLiveMillis) {
    delegate.remove(o.getKey());//移除
    for (ExpirationListener<V> listener : expirationListeners) {
    listener.expired(o.getValue());//终止监听?
    }
    }
    }
    }

    启动关闭该县城都需要进行封锁机制。

    5、Mina中的I/O事件类型如下:

    public enum IoEventType {
    SESSION_CREATED,
    SESSION_OPENED,
    SESSION_CLOSED,
    MESSAGE_RECEIVED,
    MESSAGE_SENT,
    SESSION_IDLE,
    EXCEPTION_CAUGHT,
    WRITE,
    CLOSE,
    }

    IoEvent表示一个I/O事件或者一个I/O请求,包括时间类型、所属会话、时间参数:

    public class IoEvent implements Runnable {
    private final IoEventType type;
    private final IoSession session;
    private final Object parameter;
    public IoEvent(IoEventType type, IoSession session, Object parameter) {
    //...
    }
    //根据事件类型向会话的过滤链上的众多监听者发出事件到来的信号。
    public void fire() {
    switch (getType()) {
    case MESSAGE_RECEIVED:
    getSession().getFilterChain().fireMessageReceived(getParameter());break;
    case MESSAGE_SENT:
    getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());break;
    case WRITE:
    getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());break;
    case CLOSE:
    getSession().getFilterChain().fireFilterClose();break;
    case EXCEPTION_CAUGHT:
    getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());break;
    case SESSION_IDLE:
    getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());break;
    case SESSION_OPENED:
    getSession().getFilterChain().fireSessionOpened();
    break;
    case SESSION_CREATED:
    getSession().getFilterChain().fireSessionCreated();
    break;
    case SESSION_CLOSED:
    getSession().getFilterChain().fireSessionClosed();
    break;
    default:
    throw new IllegalArgumentException("Unknown event type: " + getType());
    }
    }
    public String toString() {
    if (getParameter() == null) {
    return "[" + getSession() + "] " + getType().name();
    }
    return "[" + getSession() + "] " + getType().name() + ": "+ getParameter();
    }
    }

    Mina的会话中,有三种类型的闲置状态:READER_IDLE读端空闲、WRITER_IDLE写端空闲、BOTH_IDLE读写都空闲。为了节省会话资源可以让用户设置当空闲超过一定时间后关闭会话,因为此会话可能在一段出现问题,从而导致另一端空闲超过太长时间。
    6、DefaultIoSessionDataStructureFactory是IoSessionDataStructureFactory的一个默认的实现:

    public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory {
    public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception {
    return new DefaultIoSessionAttributeMap();
    }
    public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception {
    return new DefaultWriteRequestQueue();
    }
    private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
    private final Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<Object, Object>(4));
    public DefaultIoSessionAttributeMap() {
    super();
    }
    public Object getAttribute(IoSession session, Object key, Object defaultValue) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    Object answer = attributes.get(key);
    if (answer == null) {
    return defaultValue;
    }
    return answer;
    }

    public Object setAttribute(IoSession session, Object key, Object value) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    if (value == null) {
    return attributes.remove(key);
    }
    return attributes.put(key, value);
    }
    public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    if (value == null) {
    return null;
    }

    Object oldValue;
    synchronized (attributes) {
    oldValue = attributes.get(key);
    if (oldValue == null) {
    attributes.put(key, value);
    }
    }
    return oldValue;
    }
    public Object removeAttribute(IoSession session, Object key) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    return attributes.remove(key);
    }
    public boolean removeAttribute(IoSession session, Object key, Object value) {
    if (key == null) {
    throw new NullPointerException("key");
    }
    if (value == null) {
    return false;
    }
    synchronized (attributes) {
    if (value.equals(attributes.get(key))) {
    attributes.remove(key);
    return true;
    }
    }
    return false;
    }
    public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
    synchronized (attributes) {
    Object actualOldValue = attributes.get(key);
    if (actualOldValue == null) {
    return false;
    }
    if (actualOldValue.equals(oldValue)) {
    attributes.put(key, newValue);
    return true;
    }
    return false;
    }
    }
    public boolean containsAttribute(IoSession session, Object key) {
    return attributes.containsKey(key);
    }
    public Set<Object> getAttributeKeys(IoSession session) {
    synchronized (attributes) {
    return new HashSet<Object>(attributes.keySet());
    }
    }
    public void dispose(IoSession session) throws Exception {}
    }
    private static class DefaultWriteRequestQueue implements WriteRequestQueue {
    //一个队列存储传入的写请求
    private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
    public DefaultWriteRequestQueue() {
    super();
    }
    //...
    }
    }

    7、AbstractIoSession是IoSession的一个抽象实现类,如下:

    private IoSessionAttributeMap attributes;//会话属性
    private WriteRequestQueue writeRequestQueue;//写请求队列
    private WriteRequest currentWriteRequest;//当前写请求

    下面是关闭的时候涉及的成员:

    //要结束当前会话时发送写请求CLOSE_REQUEST
    private static final WriteRequest CLOSE_REQUEST =
    new DefaultWriteRequest(new Object());
    //在连接关闭时状态被设置为closed
    private final CloseFuture closeFuture = new DefaultCloseFuture(this);
    //关闭的监听器
    private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
    new IoFutureListener<CloseFuture>() {
    public void operationComplete(CloseFuture future) {
    AbstractIoSession session = (AbstractIoSession) future.getSession();
    session.scheduledWriteBytes.set(0);
    session.scheduledWriteMessages.set(0);
    session.readBytesThroughput = 0;
    session.readMessagesThroughput = 0;
    session.writtenBytesThroughput = 0;
    session.writtenMessagesThroughput = 0;
    }
    };

    close和closeOnFlush都是异步操作的,区别是前者立即关闭连接,后者是在写请求队列中放入一个CLOSE_REQUEST并将其即时刷新储蓄,若要真正等到关闭完成,需要调用方法在返回的CloseFuture等待。下面是close的代码:

        public final CloseFuture close() {
    synchronized (lock) {
    if (isClosing()) {
    return closeFuture;
    }

    closing = true;
    }
    getFilterChain().fireFilterClose();//fire出关闭事件
    return closeFuture;
    }

    下面是closeOnFlush的代码:

        private final CloseFuture closeOnFlush() {
    getWriteRequestQueue().offer(this, CLOSE_REQUEST);
    getProcessor().flush(this);
    return closeFuture;
    }

    对于读的情况,下面是取得读的数据队列:

        //返回可被读取数据队列
    private Queue<ReadFuture> getReadyReadFutures() {
    Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
    //如果是第一次读取数据
    if (readyReadFutures == null) {
    //构造一个新的读数据队列
    readyReadFutures = new CircularQueue<ReadFuture>();
    Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures);
    if (oldReadyReadFutures != null) {
    readyReadFutures = oldReadyReadFutures;
    }
    }
    return readyReadFutures;
    }

    读数据的过程:

        public final ReadFuture read() {
    //配置不允许读数据
    if (!getConfig().isUseReadOperation()) {
    throw new IllegalStateException("useReadOperation is not enabled.");
    }
    //获得已经被许可的可被读数据队列
    Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
    ReadFuture future;
    synchronized (readyReadFutures) {
    future = readyReadFutures.poll();
    if (future != null) {
    //如果关联的会话已关闭,通知读者
    if (future.isClosed()) {
    readyReadFutures.offer(future);
    }
    } else {
    future = new DefaultReadFuture(this);
    //将数据出入等待读的队列
    getWaitingReadFutures().offer(future);
    }
    }
    return future;
    }

    写数据的过程:

        //IoBuffer、文件、文件部分区域
    public WriteFuture write(Object message, SocketAddress remoteAddress) {
    if (message == null) {
    throw new NullPointerException("message");
    }
    //如果没有远端地址
    if (!getTransportMetadata().isConnectionless() && remoteAddress != null) {
    throw new UnsupportedOperationException();
    }
    //如果会话已被关闭
    if (isClosing() || !isConnected()) {
    WriteFuture future = new DefaultWriteFuture(this);
    WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
    WriteException writeException = new WriteToClosedSessionException(request);
    future.setException(writeException);
    return future;
    }
    FileChannel openedFileChannel = null;
    try {
    if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) {//如果是空消息
    throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
    } else if (message instanceof FileChannel) {//文件的某一区域
    FileChannel fileChannel = (FileChannel) message;
    message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
    } else if (message instanceof File) {//要发送的是文件
    File file = (File) message;
    openedFileChannel = new FileInputStream(file).getChannel();//打开文件通道
    message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
    }
    } catch (IOException e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    return DefaultWriteFuture.newNotWrittenFuture(this, e);
    }
    //构造写请求,通过过滤器链发送出去
    WriteFuture writeFuture = new DefaultWriteFuture(this);
    WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
    IoFilterChain filterChain = getFilterChain();
    filterChain.fireFilterWrite(writeRequest);

    //如果打开文件通道应该在完成时关闭通道
    if (openedFileChannel != null) {
    final FileChannel finalChannel = openedFileChannel;
    writeFuture.addListener(new IoFutureListener<WriteFuture>() {
    public void operationComplete(WriteFuture future) {
    try {
    finalChannel.close();
    } catch (IOException e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    }
    }
    });
    }
    return writeFuture;
    }
     
  • 相关阅读:
    swift 第十四课 可视化view: @IBDesignable 、@IBInspectable
    swift 第十三课 GCD 的介绍和使用
    swift 第十二课 as 的使用方法
    swift 第十一课 结构体定义model类
    swift 第十课 cocopod 网络请求 Alamofire
    swift 第九课 用tableview 做一个下拉菜单Menu
    swift 第八课 CollectView的 添加 footerView 、headerView
    swift 第七课 xib 约束的优先级
    swift 第六课 scrollview xib 的使用
    swift 第五课 定义model类 和 导航栏隐藏返回标题
  • 原文地址:https://www.cnblogs.com/balaamwe/p/2317339.html
Copyright © 2011-2022 走看看