zoukankan      html  css  js  c++  java
  • netty 的事件驱动

    netty 是事件驱动的,这里面有两个含义,一是 netty 接收到 socket 数据后,会产生事件,事件在 pipeline 上传播,二是事件由特定的线程池处理。

    NioEventLoop 轮询网络事件

    // io.netty.channel.nio.NioEventLoop#processSelectedKey
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }
    
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
    
                // 建立连接,深层会调用 fireChannelActive
                unsafe.finishConnect();
            }
    
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
    
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // 读数据,在流水线上传播读事件和连接关闭事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    
    // io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
    
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
    
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 触发 ChannelRead
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());
    
            allocHandle.readComplete();
            // 触发 ChannelReadComplete
            pipeline.fireChannelReadComplete();
    
            if (close) {
                // 触发 ChannelInactive 和 ChannelUnregister
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }

    HandlerContext 中有一个整数 executionMask,不同的 bit 位表示不同的事件,为 1 表示可以处理该事件。

    // io.netty.channel.AbstractChannelHandlerContext
    private final int executionMask;
    
    final class ChannelHandlerMask {
        // Using to mask which methods must be called for a ChannelHandler.
        static final int MASK_EXCEPTION_CAUGHT = 1;
        static final int MASK_CHANNEL_REGISTERED = 1 << 1;
        static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
        static final int MASK_CHANNEL_ACTIVE = 1 << 3;
        static final int MASK_CHANNEL_INACTIVE = 1 << 4;
        static final int MASK_CHANNEL_READ = 1 << 5;
        static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
        static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
        static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
        static final int MASK_BIND = 1 << 9;
        static final int MASK_CONNECT = 1 << 10;
        static final int MASK_DISCONNECT = 1 << 11;
        static final int MASK_CLOSE = 1 << 12;
        static final int MASK_DEREGISTER = 1 << 13;
        static final int MASK_READ = 1 << 14;
        static final int MASK_WRITE = 1 << 15;
        static final int MASK_FLUSH = 1 << 16;
    
        private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
                MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
                MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
        private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
                MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
    }

    以 ChannelActive 为例,通过比较 bit 位上的值,判断该 HandlerContext 是否处理 ChannelActive 事件

    // io.netty.channel.AbstractChannelHandlerContext#fireChannelActive
    public ChannelHandlerContext fireChannelActive() {
        invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
        return this;
    }
    
    // io.netty.channel.AbstractChannelHandlerContext#findContextInbound
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

    如何使用 UserEvent?

    首先让自己的 handler 实现 userEventTriggered 方法

    class MyInboundHandler extends SimpleChannelInboundHandler<Object> {
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // 处理事件,简单打印
            System.out.println(evt);
            // 从当前 HandlerContext 向后传播 evt,如果没有这行代码,则不会向后传播事件了
            super.userEventTriggered(ctx, evt);
        }
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            super.channelRead0(ctx, msg);
        }
    }

    通过 pipeline 传播事件,从 HeadContext 向后传播事件

    channel.pipeline().fireUserEventTriggered("i am an event");

    read 事件,是从 HeadContext 开始向后传播

    write 操作,是从 TailContext 开始向前传播

  • 相关阅读:
    poj 1182:食物链(种类并查集,食物链问题)
    hdu 1556:Color the ball(第二类树状数组 —— 区间更新,点求和)
    南阳理工 题目9:posters(离散化+线段树)
    poj 1195:Mobile phones(二维树状数组,矩阵求和)
    poj 3984:迷宫问题(广搜,入门题)
    poj 3278:Catch That Cow(简单一维广搜)
    《重构与模式》简化(策略模式)-积木系列
    builder模式-积木系列
    spring事务管理-Spring 源码系列(6)
    思考如何阅读
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12372937.html
Copyright © 2011-2022 走看看