zoukankan      html  css  js  c++  java
  • 浅析Netty的异步事件驱动(一)

    本篇文章着重于浅析一下Netty的事件处理流程,Netty版本为netty-3.6.6.Final。

    Netty定义了非常丰富的事件类型,代表了网络交互的各个阶段。并且当各个阶段发生时,触发相应的事件交给pipeline中定义的handler处理。

    举个例子,如下一段简单的代码:

    ChannelFactory factory =
                new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
            ServerBootstrap bootstrap = new ServerBootstrap(factory);
    
            bootstrap.setPipelineFactory(new PipelineFactory());
    
            bootstrap.setOption("child.tcpNoDelay", true);
            bootstrap.setOption("child.keepAlive", true);
    
            bootstrap.bind(new InetSocketAddress(7080));

    Netty中触发事件几乎都是靠Channels类中的几个静态fire函数,因此通过在这些函数中加上Sysout方法,就可以看出这一个简单的bind方法触发了多少事件,如下:

    fireChannelOpen(final Channel channel) upstream
    bind(final Channel channel, final SocketAddress localAddress) downstream
    fireChannelBound(final Channel channel, final SocketAddress localAddress) upstream

    由此可见由这几个函数触发了OPEN、BOUND和BIND事件。

    Netty中的事件大致可以分为upstream事件和downstream事件。简单的说,upstream事件是内获取外资源时触发的事件如messageReceived等等,而downstream事件则是内向外发送资源时触发的事件如write、connect等等。

    与之相对应的,处理upstream事件的是upstreamhandler,处理downstream事件的是downstreamhandler,也有可以处理两类事件的channelhandler。我们可以通过继承handler来实现自己的业务逻辑。

    Upstream事件的典型是messageReceived,在Netty中抽象为MessageEvent,即接收到了消息。而downstream事件的典型是write,在Netty中也抽象为MessageEvent,即发送消息。一个比较完整的事件表如下:

    upstream事件包括:

    downstream事件包括

    Netty通过pipeline来存放upstreamhandler和downstreamhandler,在pipeline中添加handler的源代码如下:

    public class PipelineFactory implements ChannelPipelineFactory
    {
        public ChannelPipeline getPipeline()
            throws Exception
        {
            ChannelPipeline pipeline = Channels.pipeline();
    
            //并不具体处理事件,只是输出相关事件的string
            pipeline.addLast("1", new UpStreamHandler1());
            //单纯的丢弃事件
            pipeline.addLast("2", new DiscardServer());
            return pipeline;
        }
    }

    在Netty中,upstreamhandler的处理顺序是从前向后,而downstreamhandler的顺序是从后往前。根本原因是pipeline中维护了一个双向链表,handler的处理顺序不同是因为upstream是从head->tail遍历,而downstream事件是从tail->head遍历。

    以DefaultChannelPipeline为例,以下分别是添加handler至链表的代码和访问upstreamhandler的代码

        public synchronized void addLast(String name, ChannelHandler handler) {
            if (name2ctx.isEmpty()) {
                init(name, handler);
            } else {
                checkDuplicateName(name);
                //一段典型的插入到链表尾部并更新尾指针的代码
                DefaultChannelHandlerContext oldTail = tail;
                DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
    
                callBeforeAdd(newTail);
    
                oldTail.next = newTail;
                tail = newTail;
                name2ctx.put(name, newTail);
    
                callAfterAdd(newTail);
            }
        }
        public void sendUpstream(ChannelEvent e) {
            //从头部开始遍历,相对的是,downstream则是从尾部开始遍历
            DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
            if (head == null) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "The pipeline contains no upstream handlers; discarding: " + e);
                }
    
                return;
            }
    
            sendUpstream(head, e);
        }

    前文已经说过,Netty中通过Channels中的静态方法来触发事件,这些静态函数列举如下:

     1.fireChannelOpen;2.fireChannelBound;3.fireChannelConnected等等。

    直接来看fireChannelOpen的源码,看看Netty到底是怎么做的。

        public static void fireChannelOpen(final Channel channel) {
            // Notify the parent handler.
            if (channel.getParent() != null) {
                fireChildChannelStateChanged(channel.getParent(), channel);
            }
            channel.getPipeline().sendUpstream(
                    new UpstreamChannelStateEvent(
                            channel, ChannelState.OPEN, Boolean.TRUE));
        }

    这个sendUpstream到底是干嘛的了?

        void sendUpstream(final DefaultChannelHandlerContext ctx, final ChannelEvent e) {
            try {
                //从链表头部开始,取出每个节点中的handler直接对channelevent进行处理
                ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
            } catch (Throwable t) {
                notifyHandlerException(e, t);
            }
        }

    然后具体到handler又是怎么处理各个事件的了?以SimpleChannelUpstreamHandler为例,如下:

        public void handleUpstream(
                final ChannelHandlerContext ctx, final ChannelEvent e) throws Exception {
            //根据事件类型进行不同的处理
            if (e instanceof MessageEvent) {
                messageReceived(ctx, (MessageEvent) e);
            } else if (e instanceof WriteCompletionEvent) {
                WriteCompletionEvent evt = (WriteCompletionEvent) e;
                writeComplete(ctx, evt);
            } else if (e instanceof ChildChannelStateEvent) {
                ......
            }
        }
       
        public void messageReceived(
                final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
            //直接将事件传至下一个handler进行处理
            ctx.sendUpstream(e);
        }

    源码看到现在已经很明显了,在Netty里,pipeline中维护了一个handler的链表。每当事件触发时,就会从双向链表的头部(对于downstream事件则是尾部)开始遍历,这样每个handler都会对事件进行处理。在handler里,可以根据事件类型做相应的处理后传至下一个handler继续处理(甚至可以截断处理链)。

    需要注意的是,单次流程是在一个线程中实现的,是串行的。因此如果其中一个handler是阻塞的,就会影响整体的效果。

    当然netty也已经提供了解决方案,可以通过继承ExecutionHandler的handler来处理这类耗时的操作。而这么做的原理是什么,请期待下一篇文章。

  • 相关阅读:
    HUST 1372 marshmallow
    HUST 1371 Emergency relief
    CodeForces 629D Babaei and Birthday Cake
    CodeForces 629C Famil Door and Brackets
    ZOJ 3872 Beauty of Array
    ZOJ 3870 Team Formation
    HDU 5631 Rikka with Graph
    HDU 5630 Rikka with Chess
    CodeForces 626D Jerry's Protest
    【POJ 1964】 City Game
  • 原文地址:https://www.cnblogs.com/sorheart/p/3190400.html
Copyright © 2011-2022 走看看