zoukankan      html  css  js  c++  java
  • netty 的事件驱动

    netty 是事件驱动的,这里面有两个含义:一是 netty 接收到 socket 数据后会产生事件,事件在 pipeline 上传播;二是事件由特定的线程池处理。

    NioEventLoop 轮询网络事件

    // io.netty.channel.nio.NioEventLoop#processSelectedKey
    /**
     * Handles one selected key {@code k} that belongs to channel {@code ch}.
     *
     * <p>An invalid key leads to the channel being closed, but only when the channel is still
     * registered to this event loop. For a valid key the ready operations are handled in a fixed
     * order: connect first, then write, then read/accept. A {@code CancelledKeyException} raised
     * while doing so closes the channel.
     *
     * @param k  the selection key to process
     * @param ch the channel whose unsafe object performs the actual I/O
     */
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }
    
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
    
                // Finish establishing the connection; deeper in the call chain this calls fireChannelActive.
                unsafe.finishConnect();
            }
    
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
    
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // Read data; this propagates read events and connection-closed events along the pipeline.
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            // The key was cancelled while it was being processed: close the channel.
            unsafe.close(unsafe.voidPromise());
        }
    }
    
    // io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
    /**
     * Reads inbound bytes in a loop and pushes each filled buffer through the pipeline.
     *
     * <p>Every buffer that received data is passed to {@code pipeline.fireChannelRead}; after the
     * loop ends {@code pipeline.fireChannelReadComplete} is fired. When {@code doReadBytes} reports
     * a negative count the {@code close} flag is set and {@code closeOnRead} is invoked afterwards.
     * Any {@code Throwable} raised along the way is handed to {@code handleReadException}.
     */
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
    
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    // A negative count is treated as EOF and requests a close after the loop.
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
    
                allocHandle.incMessagesRead(1);
                readPending = false;
                // Fire ChannelRead
                pipeline.fireChannelRead(byteBuf);
                // Drop the local reference so handleReadException is not passed a buffer that was
                // already handed to the pipeline.
                byteBuf = null;
            } while (allocHandle.continueReading());
    
            allocHandle.readComplete();
            // Fire ChannelReadComplete
            pipeline.fireChannelReadComplete();
    
            if (close) {
                // Fires ChannelInactive and ChannelUnregistered
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }

    HandlerContext 中有一个整数 executionMask,不同的 bit 位表示不同的事件,为 1 表示可以处理该事件。

    // io.netty.channel.AbstractChannelHandlerContext
    // Bit set of the events this context handles; findContextInbound tests it against a MASK_* value.
    private final int executionMask;
    
    /**
     * Bit flags, one per handler method, used to record which methods must be called for a
     * ChannelHandler. Each constant occupies its own bit so that several of them can be combined
     * into a single {@code int} and tested with a bitwise AND.
     *
     * <p>This class only holds constants and cannot be instantiated.
     */
    final class ChannelHandlerMask {
        // Using to mask which methods must be called for a ChannelHandler.
        static final int MASK_EXCEPTION_CAUGHT = 1;
        static final int MASK_CHANNEL_REGISTERED = 1 << 1;
        static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
        static final int MASK_CHANNEL_ACTIVE = 1 << 3;
        static final int MASK_CHANNEL_INACTIVE = 1 << 4;
        static final int MASK_CHANNEL_READ = 1 << 5;
        static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
        static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
        static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
        static final int MASK_BIND = 1 << 9;
        static final int MASK_CONNECT = 1 << 10;
        static final int MASK_DISCONNECT = 1 << 11;
        static final int MASK_CLOSE = 1 << 12;
        static final int MASK_DEREGISTER = 1 << 13;
        static final int MASK_READ = 1 << 14;
        static final int MASK_WRITE = 1 << 15;
        static final int MASK_FLUSH = 1 << 16;
    
        // Union of every inbound flag (exceptionCaught is part of both unions).
        private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
                MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
                MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
        // Union of every outbound flag.
        private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
                MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
    
        /** Prevents instantiation: this class is a pure constant holder. */
        private ChannelHandlerMask() {
        }
    }

    以 ChannelActive 为例,通过比较 bit 位上的值,判断该 HandlerContext 是否处理 ChannelActive 事件

    // io.netty.channel.AbstractChannelHandlerContext#fireChannelActive
    /**
     * Passes the channel-active event to the next context whose execution mask includes
     * {@code MASK_CHANNEL_ACTIVE}.
     *
     * @return this context, so calls can be chained
     */
    public ChannelHandlerContext fireChannelActive() {
        final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_ACTIVE);
        invokeChannelActive(target);
        return this;
    }
    
    // io.netty.channel.AbstractChannelHandlerContext#findContextInbound
    /**
     * Walks forward along the {@code next} links, starting after this context, and returns the
     * first context whose {@code executionMask} has a bit in common with {@code mask}.
     *
     * @param mask the event bit(s) to look for
     * @return the first following context that handles the event
     */
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext candidate = this.next;
        while ((candidate.executionMask & mask) == 0) {
            candidate = candidate.next;
        }
        return candidate;
    }

    如何使用 UserEvent?

    首先让自己的 handler 实现 userEventTriggered 方法

    /**
     * Example inbound handler showing how to receive a user event: it prints the event and then
     * forwards it to the following handlers.
     */
    class MyInboundHandler extends SimpleChannelInboundHandler<Object> {
    
        /**
         * Prints the user event and propagates it further along the pipeline.
         *
         * @param ctx the context this handler is bound to
         * @param evt the user event object
         * @throws Exception if the super implementation fails while forwarding the event
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // Handle the event; here it is simply printed.
            System.out.println(evt);
            // Propagate evt onward from the current HandlerContext; without this line the event
            // would not travel any further.
            super.userEventTriggered(ctx, evt);
        }
        
        /**
         * Receives inbound messages; this example does nothing with them.
         *
         * @param ctx the context this handler is bound to
         * @param msg the inbound message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Intentionally empty. channelRead0 is abstract in SimpleChannelInboundHandler, so there
            // is no super implementation to delegate to (super.channelRead0(...) does not compile).
        }
    }

    通过 pipeline 传播事件,从 HeadContext 向后传播事件

    // Fire a user event through the channel's pipeline; the string is the event object that
    // handlers receive in userEventTriggered.
    channel.pipeline().fireUserEventTriggered("i am an event");

    read 事件,是从 HeadContext 开始向后传播

    write 操作:通过 channel 或 pipeline 调用时,是从 TailContext 开始向前传播;通过 ChannelHandlerContext 调用时,则从当前 HandlerContext 开始向前传播

  • 相关阅读:
    CF 461B Appleman and Tree
    POJ 1821 Fence
    NOIP 2012 开车旅行
    CF 494B Obsessive String
    BZOJ2337 XOR和路径
    CF 24D Broken robot
    POJ 1952 BUY LOW, BUY LOWER
    SPOJ NAPTIME Naptime
    POJ 3585
    CF 453B Little Pony and Harmony Chest
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12372937.html
Copyright © 2011-2022 走看看