zoukankan      html  css  js  c++  java
  • mina中责任链模式的实现

    一、mina的框架回顾

      责任链模式在mina中有重要的作用,其中Filter机制就是基于责任链实现的。

    从上图看到消息的接受从IoService层先经过Filter层过滤处理后最后交给IoHander,消息的发送则是反过来从IoHander层经过Filter层再到IoService层。
    我们来想想这样做的好处:
    第一点就是可以把消息从原始字节封装成对象方便处理,或者从对象到原始字节,那这就是decode和encode过程。
    第二点过滤消息,业务层只需要处理感兴趣的消息。
    当然还有其他...
    再来想想Filter层是怎么实现的呢
    从图中看到接收消息和发送消息经过Filter层是相反处理的,那么每个Filter就必须知道前一个和后一个Filter,我们就很容易想到了双向链表结构。那么让我们来看看Filter层具体是如何实现,即mina中责任链模式的实现。

    二、mina中filter

    2.1、mina中的filterchain包结构

    • IoFilter接口:Filter层的每个filter都是对上图IoFilter接口的实现。
    • IoFilterChainBuilder接口和DefaultIoFilterChainBuilder实现不再细讲,从字面意思就是IoFilterChain的建造者。
    • IoFilterEvent是代表filter事件,IoFilterLifeCycleException是指filter中出现循环链表异常。
    下面的图是我们要重点讲解的几个类的关系

    我们重点讲解的只有四个类,为什么会出现这么多的类呢,有些类使内部实现,不太好表示就都画出来了,先来说明下
    IoFilter接口:NextFilter接口是其内部接口。
    --IoFilterAdapter类:对IoFilter接口的实现,是所有Filter的基类。
    IoFilterChain接口:Entry接口是其内部接口。
    --DefaultIoFilterChain类:是对IoFilterChain接口的实现,有EntryImpl,HeadFilter,TailFilter三个内部类,其中EntryImpl类中又有NextFilter接口的内部实现。
    最主要的一个接口IoFilterChain和它的一个内部接口Entry,其中IoFilterChain代表了过滤器的容器,它本身就是一个对象引用形成的链表结构,默认的实现DefaultIoFilterChain其实有对链表头(head)的引用,找到头后就可以顺着头向下找,一直找到尾(tail),Entry是对IoFilter和NextFilter的封装整合接口,它属于链表IoFilterChain中的元素。指向当前和下一个过滤器。
     
    HeadFilter类只对发送消息处理方法重载,TailFilter类只对接受消息处理方法重载。想一想便得知,HeadFilter是发送消息最后的处理节点,TailFilter是接受消息最后的处理节点。最后的节点处理就是将写消息交给Io线程处理,将读消息交给IoHander的业务层处理。所以说HeadFilter和TailFilter只需对某一方消息处理,反面消息默认交给下一个节点处理。

    三、mina的filter中实现类

    3.0、NextFilter接口

      NextFilter接口看上去和IoFilter接口差不多,但NextFilter接口代表的是“下一个filter”,这里的下是抽象的,因为在mina的各种链中,处理顺序有的是从头到尾,有的是从尾到头,而这里的下就代表了熟悉中的下一个filter。

    3.1、HeadFilter

      HeadFilter类只对发送消息处理方法重载,HeadFilter是发送消息最后的处理节点。

    3.1.1、方法列表

    3.1.2、调用IoService(边界)

    关键代码见下面,将待发送的数据(字节)写到缓冲队列里,或直接调用flush()写到系统缓冲区。

                WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();
    
                if (!s.isWriteSuspended()) {
                    if (writeRequestQueue.size() == 0) {
                        // We can write directly the message
                        s.getProcessor().write(s, writeRequest);
                    } else {
                        s.getWriteRequestQueue().offer(s, writeRequest);
                        s.getProcessor().flush(s);
                    }
                } else {
                    s.getWriteRequestQueue().offer(s, writeRequest);
                }

    3.1.1、全部源码

        private class HeadFilter extends IoFilterAdapter {
            @SuppressWarnings("unchecked")
            @Override
            public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
    
                AbstractIoSession s = (AbstractIoSession) session;
    
                // Maintain counters.
                if (writeRequest.getMessage() instanceof IoBuffer) {
                    IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
                    // I/O processor implementation will call buffer.reset()
                    // it after the write operation is finished, because
                    // the buffer will be specified with messageSent event.
                    buffer.mark();
                    int remaining = buffer.remaining();
    
                    if (remaining == 0) {
                        // Zero-sized buffer means the internal message
                        // delimiter.
                        s.increaseScheduledWriteMessages();
                    } else {
                        s.increaseScheduledWriteBytes(remaining);
                    }
                } else {
                    s.increaseScheduledWriteMessages();
                }
    
                WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();
    
                if (!s.isWriteSuspended()) {
                    if (writeRequestQueue.size() == 0) {
                        // We can write directly the message
                        s.getProcessor().write(s, writeRequest);
                    } else {
                        s.getWriteRequestQueue().offer(s, writeRequest);
                        s.getProcessor().flush(s);
                    }
                } else {
                    s.getWriteRequestQueue().offer(s, writeRequest);
                }
            }
    
            @SuppressWarnings("unchecked")
            @Override
            public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
                ((AbstractIoSession) session).getProcessor().remove(session);
            }
        }

    3.2、TailFilter  

    TailFilter类只对接受消息处理方法重载。

    3.2.1、方法列表

    3.2.2、调用IoHandler(边界)

    先判断待处理数据类型,如果不满足条件则继续读取数据。否则,调用handler类来接收处理消息。

            public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
                AbstractIoSession s = (AbstractIoSession) session;
                if (!(message instanceof IoBuffer)) {
                    s.increaseReadMessages(System.currentTimeMillis());
                } else if (!((IoBuffer) message).hasRemaining()) {
                    s.increaseReadMessages(System.currentTimeMillis());
                }
    
                try {
                    session.getHandler().messageReceived(s, message);
                } finally {
                    if (s.getConfig().isUseReadOperation()) {
                        s.offerReadFuture(message);
                    }
                }
            }

    3.2.3、全部源码

        private static class TailFilter extends IoFilterAdapter {
            @Override
            public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
                try {
                    session.getHandler().sessionCreated(session);
                } finally {
                    // Notify the related future.
                    ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);
                    if (future != null) {
                        future.setSession(session);
                    }
                }
            }
    
            @Override
            public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
                session.getHandler().sessionOpened(session);
            }
    
            @Override
            public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
                AbstractIoSession s = (AbstractIoSession) session;
                try {
                    s.getHandler().sessionClosed(session);
                } finally {
                    try {
                        s.getWriteRequestQueue().dispose(session);
                    } finally {
                        try {
                            s.getAttributeMap().dispose(session);
                        } finally {
                            try {
                                // Remove all filters.
                                session.getFilterChain().clear();
                            } finally {
                                if (s.getConfig().isUseReadOperation()) {
                                    s.offerClosedReadFuture();
                                }
                            }
                        }
                    }
                }
            }
    
            @Override
            public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
                session.getHandler().sessionIdle(session, status);
            }
    
            @Override
            public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
                AbstractIoSession s = (AbstractIoSession) session;
                try {
                    s.getHandler().exceptionCaught(s, cause);
                } finally {
                    if (s.getConfig().isUseReadOperation()) {
                        s.offerFailedReadFuture(cause);
                    }
                }
            }
    
            @Override
            public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
                AbstractIoSession s = (AbstractIoSession) session;
                if (!(message instanceof IoBuffer)) {
                    s.increaseReadMessages(System.currentTimeMillis());
                } else if (!((IoBuffer) message).hasRemaining()) {
                    s.increaseReadMessages(System.currentTimeMillis());
                }
    
                try {
                    session.getHandler().messageReceived(s, message);
                } finally {
                    if (s.getConfig().isUseReadOperation()) {
                        s.offerReadFuture(message);
                    }
                }
            }
    
            @Override
            public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
                session.getHandler().messageSent(session, writeRequest.getMessage());
            }
    
            @Override
            public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
                nextFilter.filterWrite(session, writeRequest);
            }
    
            @Override
            public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
                nextFilter.filterClose(session);
            }
        }

    3.3、Entry接口

      Entry接口是IoFilterChain接口的一个内部接口,Entry是对IoFilter和NextFilter的封装整合接口,它属于链表IoFilterChain中的元素。指向当前和下一个过滤器。其中IoFilterChain接口代表了过滤器的容器,它本身就是一个对象引用形成的链表结构,默认的实现DefaultIoFilterChain其实有对链表头(head)的引用,找到头后就可以顺着头向下找,一直找到尾(tail)。

    Entry接口与IoFilterChain接口关系如下:

    3.4、EntryImpl类

      EntryImpl是Entry接口的实现类,类的成员中含两个接口,filter和nextFilter,他们所用的接口是不一样的,至于为什么不用同一个接口,我想可能是因为接口职责单一的原则吧。

    从EntryImpl类的构造方法看到,EntryImpl中保持对上一个节点和下一个节点引用,双向链表结构,name即过滤层名称,filter即过滤层的具体实现,而nextFilter是在构造方法中的内部实现。

        private class EntryImpl implements Entry {
            private EntryImpl prevEntry;
    
            private EntryImpl nextEntry;
    
            private final String name;
    
            private IoFilter filter;
    
            private final NextFilter nextFilter;

    3.4.2、IoFilter接口和NextFilter接口:

    虽然有IoFilter和NextFilter两个接口,接口方法都差不多,但最后真正业务的执行者还是IoFilter的实现,

    IoFilter:IoFilterAdapter做为它的默认实现,完成了适配器的功能,以后的类可以直接继承它而不用实现IoFilter接口,想实现哪个方法只需要覆盖IoFilterAdapter的类的方法即可。

    NextFilter:只起到转发的作用,看EntryImpl中的匿名的NextFilter实现类中,基本每个方法都是调用callNextSessionIdle()来完成转发的。

    3.4.3、全部源码

        private class EntryImpl implements Entry {
            private EntryImpl prevEntry;
    
            private EntryImpl nextEntry;
    
            private final String name;
    
            private IoFilter filter;
    
            private final NextFilter nextFilter;
    
            private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry, String name, IoFilter filter) {
                if (filter == null) {
                    throw new IllegalArgumentException("filter");
                }
                if (name == null) {
                    throw new IllegalArgumentException("name");
                }
    
                this.prevEntry = prevEntry;
                this.nextEntry = nextEntry;
                this.name = name;
                this.filter = filter;
                this.nextFilter = new NextFilter() {
                    public void sessionCreated(IoSession session) {
                        Entry nextEntry = EntryImpl.this.nextEntry;
                        callNextSessionCreated(nextEntry, session);
                    }
                    public void sessionOpened(IoSession session) {
                        Entry nextEntry = EntryImpl.this.nextEntry;
                        callNextSessionOpened(nextEntry, session);
                    }
                    public void sessionClosed(IoSession session) {
                        Entry nextEntry = EntryImpl.this.nextEntry;
                        callNextSessionClosed(nextEntry, session);
                    }
                    public void sessionIdle(IoSession session, IdleStatus status) {
                        Entry nextEntry = EntryImpl.this.nextEntry;
                        callNextSessionIdle(nextEntry, session, status);
                    }
                    public void exceptionCaught(IoSession session, Throwable cause) {
                        Entry nextEntry = EntryImpl.this.nextEntry;
                        callNextExceptionCaught(nextEntry, session, cause);
                    }
                    public void messageReceived(IoSession session, Object message) {
                        Entry nextEntry = EntryImpl.this.nextEntry;
                        callNextMessageReceived(nextEntry, session, message);
                    }
                    public void messageSent(IoSession session, WriteRequest writeRequest) {
                        Entry nextEntry = EntryImpl.this.nextEntry;
                        callNextMessageSent(nextEntry, session, writeRequest);
                    }
                    public void filterWrite(IoSession session, WriteRequest writeRequest) {
                        Entry nextEntry = EntryImpl.this.prevEntry;
                        callPreviousFilterWrite(nextEntry, session, writeRequest);
                    }
                    public void filterClose(IoSession session) {
                        Entry nextEntry = EntryImpl.this.prevEntry;
                        callPreviousFilterClose(nextEntry, session);
                    }
                    public String toString() {
                        return EntryImpl.this.nextEntry.name;
                    }
                };
            }public void addAfter(String name, IoFilter filter) {
                DefaultIoFilterChain.this.addAfter(getName(), name, filter);
            }
    
            public void addBefore(String name, IoFilter filter) {
                DefaultIoFilterChain.this.addBefore(getName(), name, filter);
            }
    
            public void remove() {
                DefaultIoFilterChain.this.remove(getName());
            }
    
            public void replace(IoFilter newFilter) {
                DefaultIoFilterChain.this.replace(getName(), newFilter);
            }
        }

     3.5、IoFilterEvent

      

      在org.apache.mina.core.filterchain包下我们可以看到类IoFilterEvent,可以看到它实现了基于事件的处理模型,当一个事件(比如接收到消息)发生后会触发相应的事件,进而调用过滤器链的消息处理功能进行消息的处理和转发,这块其实会有线程池的参与完成,会在以后的文章中说明这块。
    public class IoEvent implements Runnable {
        private final IoEventType type;
    
        private final IoSession session;
    
        private final Object parameter;
    
        public IoEvent(IoEventType type, IoSession session, Object parameter) {
            if (type == null) {
                throw new IllegalArgumentException("type");
            }
            if (session == null) {
                throw new IllegalArgumentException("session");
            }
            this.type = type;
            this.session = session;
            this.parameter = parameter;
        }
    
        public void run() {
            fire();
        }
    
        public void fire() {
            switch (getType()) {
            case MESSAGE_RECEIVED:
                getSession().getFilterChain().fireMessageReceived(getParameter());
                break;
            case MESSAGE_SENT:
                getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());
                break;
            case WRITE:
                getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());
                break;
            case CLOSE:
                getSession().getFilterChain().fireFilterClose();
                break;
            case EXCEPTION_CAUGHT:
                getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());
                break;
            case SESSION_IDLE:
                getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());
                break;
            case SESSION_OPENED:
                getSession().getFilterChain().fireSessionOpened();
                break;
            case SESSION_CREATED:
                getSession().getFilterChain().fireSessionCreated();
                break;
            case SESSION_CLOSED:
                getSession().getFilterChain().fireSessionClosed();
                break;
            default:
                throw new IllegalArgumentException("Unknown event type: " + getType());
            }
        }
    
    }

     IoFilterEvent 实现类

    public class IoFilterEvent extends IoEvent {
        /** A logger for this class */
        static Logger LOGGER = LoggerFactory.getLogger(IoFilterEvent.class);
    
        /** A speedup for logs */
        static boolean DEBUG = LOGGER.isDebugEnabled();
        private final NextFilter nextFilter;
    
        public IoFilterEvent(NextFilter nextFilter, IoEventType type, IoSession session, Object parameter) {
            super(type, session, parameter);
    
            if (nextFilter == null) {
                throw new IllegalArgumentException("nextFilter must not be null");
            }
            this.nextFilter = nextFilter;
        }
        public NextFilter getNextFilter() {
            return nextFilter;
        }
    
        @Override
        public void fire() {
            IoSession session = getSession();
            NextFilter nextFilter = getNextFilter();
            IoEventType type = getType();
    
            if (DEBUG) {
                LOGGER.debug("Firing a {} event for session {}", type, session.getId());
            }
    
            switch (type) {
            case MESSAGE_RECEIVED:
                Object parameter = getParameter();
                nextFilter.messageReceived(session, parameter);
                break;
    
            case MESSAGE_SENT:
                WriteRequest writeRequest = (WriteRequest) getParameter();
                nextFilter.messageSent(session, writeRequest);
                break;
    
            case WRITE:
                writeRequest = (WriteRequest) getParameter();
                nextFilter.filterWrite(session, writeRequest);
                break;
    
            case CLOSE:
                nextFilter.filterClose(session);
                break;
    
            case EXCEPTION_CAUGHT:
                Throwable throwable = (Throwable) getParameter();
                nextFilter.exceptionCaught(session, throwable);
                break;
    
            case SESSION_IDLE:
                nextFilter.sessionIdle(session, (IdleStatus) getParameter());
                break;
    
            case SESSION_OPENED:
                nextFilter.sessionOpened(session);
                break;
    
            case SESSION_CREATED:
                nextFilter.sessionCreated(session);
                break;
    
            case SESSION_CLOSED:
                nextFilter.sessionClosed(session);
                break;
    
            default:
                throw new IllegalArgumentException("Unknown event type: " + type);
            }
    
        }
    }

     3.6、IoFilterLifeCycleException

    mina中有3处使用了它,

    DefaultIoFilterChain.clear()
    DefaultIoFilterChain.deregister(EntryImpl entry)
    DefaultIoFilterChain.register(EntryImpl prevEntry, String name, IoFilter filter)
    也就是在加入/删除filter时,为双向链表的每个元素对比是否已经存在,如果存在则有存在环的风险。抛异常。
     
    读过程:
    示例讲解filter如何工作:
    fireSessionOpened方法获取当前的头节点,然后调用callNextSessionOpened方法,而callNextSessionOpened方法是从entry中获取filter和nextfitler,触发filter的sessionOpened方法,同时将nextfilter作为参数传进去,而filter层如果对这个消息感兴趣可以处理完成后调用nextfilter的sessionOpened方法,不感兴趣的话,可能消息到此就结束了。
    再回到上面EntryImpl中对NextFilter的实现,我们看到NextFilter收到sessionOpen消息,获取当前节点的下一个节点,然后触发IoFilterChain的callNextSessionOpened方法,即上图所示。再然后就是传递到下一节点处理,要么filter层拦截过滤结束,要不就是传到最后一层由TailFilter交给业务层处理。而写消息恰好相反,nextFilter是获取前一个节点,这就实现了双向过滤的功能。
    到这我们就明白了为什么EntryImpl还有NextFilter选择内部类实现了。
    NextFilter其实是起到中转站的作用,收到Reveceive消息转交给后一节点,收到Send消息转交给前一个消息。那我们再来想想为什么要用NextFilter来作为中转呢?我想应该是接口隔离的原则。Filter只需要关心如何处理接受到的消息,至于如何转交到下一个Filter不应该由他实现。
     
    参考:http://www.iteye.com/topic/1124504
    参考:http://blog.csdn.net/qarkly112649/article/details/37498251
  • 相关阅读:
    Excel 函数
    selenium+python自动化测试(二)对浏览器的简单操作
    selenium+python自动化测试(一)环境
    py中mongodb使用
    ESQL调oracle存储过程
    boost.asio简单入坑
    浅析tcp中read阻塞
    14 天堂电影信息爬取
    13 爬取豆瓣电影网电影信息
    12 lxml&XPath结合使用(提取数据详解)
  • 原文地址:https://www.cnblogs.com/duanxz/p/3472550.html
Copyright © 2011-2022 走看看