zoukankan      html  css  js  c++  java
  • [编织消息框架][netty源码分析]6 ChannelPipeline 实现类DefaultChannelPipeline职责与实现

    ChannelPipeline 负责channel数据进出处理,如数据编解码等。采用拦截思想设计,经过A handler处理后接着交给next handler

    ChannelPipeline 并不是直接管理handler 而是通过 context 包装管理,一般以context 命名的是个重量级对象,提供给多层使用 

    public interface ChannelPipeline
            extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
            
        //链表追加handler方法
        ChannelPipeline addLast(String name, ChannelHandler handler);
        ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
        
        ///////////////公开获取 ChannelHandler ChannelHandlerContext/////////////////
        ChannelHandler first();
        ChannelHandlerContext firstContext();
        
        ChannelHandler last();
        ChannelHandlerContext lastContext();
        
        //省略部份代码.....
        
        ////////////////ChannelInboundInvoker接口的所有方法 统一返回 ChannelPipeline 对象//////////////////////
        ChannelPipeline fireChannelRegistered();
        ChannelPipeline fireChannelUnregistered();
        ChannelPipeline fireChannelActive();
        ChannelPipeline fireChannelInactive();
        ChannelPipeline fireExceptionCaught(Throwable cause);
        ChannelPipeline fireUserEventTriggered(Object event);
        ChannelPipeline fireChannelRead(Object msg);
        ChannelPipeline fireChannelReadComplete();
        ChannelPipeline fireChannelWritabilityChanged();
        ChannelPipeline flush();
        
        ////////////////ChannelOutboundInvoker接口的所有方法 统一返回 ChannelFuture ChannelOutboundInvoker 对象//////////////////////
        ChannelFuture bind(SocketAddress localAddress);
        ChannelFuture connect(SocketAddress remoteAddress);
        ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
        ChannelFuture disconnect();
        ChannelFuture close();
        ChannelFuture deregister();
        ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
        ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
        ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        ChannelFuture disconnect(ChannelPromise promise);
        ChannelFuture close(ChannelPromise promise);
        ChannelFuture deregister(ChannelPromise promise);
        ChannelOutboundInvoker read();
        ChannelFuture write(Object msg);
        ChannelFuture write(Object msg, ChannelPromise promise);
        ChannelOutboundInvoker flush();
        ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
        ChannelFuture writeAndFlush(Object msg);
        ChannelPromise newPromise();
        ChannelProgressivePromise newProgressivePromise();
        ChannelFuture newSucceededFuture();
        ChannelFuture newFailedFuture(Throwable cause);
        ChannelPromise voidPromise();
    }

    DefaultChannelPipeline实现做了代码整理,其中在添加handler时生成context由于代码比较简单不显示出来

    public class DefaultChannelPipeline implements ChannelPipeline {
        final AbstractChannelHandlerContext head;
        final AbstractChannelHandlerContext tail;
        
        //绑定返回对象
        private final Channel channel;
        private final ChannelFuture succeededFuture;
        private final VoidChannelPromise voidPromise;
        
        protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
        
        //双向链表追加,在tail之前插入
        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
        //双向链表删除
        private static void remove0(AbstractChannelHandlerContext ctx) {
            AbstractChannelHandlerContext prev = ctx.prev;
            AbstractChannelHandlerContext next = ctx.next;
            prev.next = next;
            next.prev = prev;
        }
        
        //HeadContext ChannelOutboundHandler出站操作通过Unsafe委托处理
        final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, false, true);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }
      
            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                    throws Exception {
                unsafe.bind(localAddress, promise);
            }
    
            @Override
            public void connect(
                    ChannelHandlerContext ctx,
                    SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) throws Exception {
                unsafe.connect(remoteAddress, localAddress, promise);
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                unsafe.write(msg, promise);
            }
    
            @Override
            public void flush(ChannelHandlerContext ctx) throws Exception {
                unsafe.flush();
            }
    
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                invokeHandlerAddedIfNeeded();
                ctx.fireChannelRegistered();
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ctx.fireChannelRead(msg);
            }
            //......
        }
     }
     
        //ChannelHandlerContext 部份代码
        //通过findContextInbound 查找下一个ctx 再通过ctx内部调用handler方法
        @Override
        public ChannelHandlerContext fireChannelRegistered() {
            invokeChannelRegistered(findContextInbound());
            return this;
        }
    
        static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRegistered();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRegistered();
                    }
                });
            }
        }
    
        private void invokeChannelRegistered() {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRegistered(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRegistered();
            }
        }
        //数据入站时从头到尾查找ctx
        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    
             //数据出站时从尾到头查找ctx
        private AbstractChannelHandlerContext findContextOutbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }

    小总:

    1.从设计上可以看出,统一返回一个对象能减少大量的学习成本同开发成本

    2.追加handler可以绑定一个线程组,在处理比较耗时的handler可以独立绑定线程组

    3.从源码上看出:数据入站时如 channelRead(ChannelHandlerContext ctx, Object msg) 当处理完数据要手动执行next ctx action ctx.fireChannelRead(msg) 这点是比如奇怪的,如果开发者忘记调用了链表就断啦

    4.DefaultChannelPipeline构造时默认生成head、tail,数据出站时操作顺序是tail ->linkHandler-> head,数据入站时是 head->linkHandler->tail

    5.ChannelPipeline用到双向链表技术,大家在研发过程中可参考设计

  • 相关阅读:
    IIS的各种身份验证详细测试
    HTTP Error 401.3 Unauthorized Error While creating IIS 7.0 web site on Windows 7
    C/S and B/S
    WCF ContractFilter mismatch at the EndpointDispatcher exception
    Configure WCF
    Inheritance VS Composition
    Unhandled Error in Silverlight Application, code 2103 when changing the namespace
    Java RMI VS TCP Socket
    Principles Of Object Oriented Design
    Socket处理发送和接收数据包,一个小实例:
  • 原文地址:https://www.cnblogs.com/solq111/p/7055091.html
Copyright © 2011-2022 走看看