zoukankan      html  css  js  c++  java
  • Netty源码(三)-EventLoop源码和心跳源码

    1. NIOEventLoop 源码

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);  这里会创建一个group, 同时group 内部包含1个EventLoop 事件循环器

    1. 类图继承关系

    1. AbstractScheduledEventExecutor 表示该接口接收定时任务,可以处理定时任务。

    2. EventLoop 接口的作用是一旦channel 注册了,就处理该channel 对应的所有IO操作

    3. SingleThreadEventExecutor 表示这是一个单线程的线程池

    4. EventLoop 是一个单例的线程池, 里面包含一个死循环的线程不断的做三件事:监听端口, 处理端口事件, 处理队列事件。 每个EventLoop 都可以绑定多个Channel, 而每个Channel 始终 只能由一个EventLoop 来处理。

    2. 源码

    在EventLoop 的使用,一般就是eventloop.execute(task); execute 方法是父类方法, 其第一次调用是:

    io.netty.bootstrap.AbstractBootstrap#bind(int) 》 io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress) 》 io.netty.bootstrap.AbstractBootstrap#doBind 》 io.netty.bootstrap.AbstractBootstrap#initAndRegister 》 

    io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel) 》 io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel) 》 io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise) 》 io.netty.channel.AbstractChannel.AbstractUnsafe#register 》 io.netty.util.concurrent.SingleThreadEventExecutor#execute

    对应的线程调用栈如下: 可以理解为服务器端启动或接收请求的过程中注册Channel 的时候会第一次调用该方法开始线程

    io.netty.util.concurrent.SingleThreadEventExecutor#execute 源码如下:

        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }

    1. inEventLoop()判断该EventLoop的线程是否是当前线程,如果是,直接添加到任务队列中; 如果不是,则尝试启动线程,但是由于线程是单个的,因此只能启动一次; 随后再将任务添加到队列中去。

    (1) io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop 判断线程

        @Override
        public boolean inEventLoop() {
            return inEventLoop(Thread.currentThread());
        }
    
        @Override
        public boolean inEventLoop(Thread thread) {
            return thread == this.thread;
        }

    开始的时候线程为null, 所以返回的inEventLoop 为false, 从字面也可以理解是线程不在事件循环中。

    (2) io.netty.util.concurrent.SingleThreadEventExecutor#startThread开启线程的方式如下:

        private void startThread() {
            if (state == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();
                }
            }
        }
    
        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                                    "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }

    1》executor 的类型是:io.netty.util.concurrent.ThreadPerTaskExecutor

    package io.netty.util.concurrent;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadFactory;
    
    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    2》 启动之后会调用到SingleThreadEventExecutor.this.run(); 也就是调用io.netty.channel.nio.NioEventLoop#run 方法, 至此开启是EventLoop 也就是事件循环。

    3》io.netty.util.concurrent.SingleThreadEventExecutor#addTask 添加任务如下: 其实就是加到任务队列中

        protected void addTask(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            if (!offerTask(task)) {
                reject(task);
            }
        }
    
        final boolean offerTask(Runnable task) {
            if (isShutdown()) {
                reject();
            }
            return taskQueue.offer(task);
        }

    2. 如果线程已经停止,并且删除任务失败,则执行拒绝策略,默认是抛出异常。

    3. 如果addTaskWakesUp 并且任务不是NonWakeupRunnable 类型的,就尝试唤醒Selector。 这个时候,这是在selector 的线程就会立即返回。

    io.netty.channel.nio.NioEventLoop#wakeup如下:

        protected void wakeup(boolean inEventLoop) {
            if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }
        }

    也就是当第一次调用execute 的时候会启动线程,然后调用NioEventLoop.run 方法 开始事件循环。然后再次调用execute 方法的时候会加到任务队列中,等待NioEventLoop.run 方法处理任务队列中的任务。

    4. io.netty.channel.nio.NioEventLoop#run 方法如下:

        @Override
        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }

    做的三件事情:

    (1) select 感兴趣的事件

    (2) processSelectedKeys 处理事件

    (3) runAllTasks();  执行队列中的任务。

    总结:

     每次执行execute 方法都是向队列中添加任务。第一次添加的时候会启动线程,执行run 方法,而run 方法是整个eventLoop的核心,进行事件循环。主要做三件事:

    (1) select 感兴趣的事件

    (2) processSelectedKeys 处理事件

    (3) runAllTasks();  执行队列中的任务。

    2. 心跳检测源码

    Netty 除了提供了诸多的编码解码器,还提供了一个重要的服务-心跳机制heartbeat。通过心跳检查对方是否有效,这在RPC框架中是不可少的功能。

    源码解析:

    Netty 提供了三个handler 来检测连接的有效性。

    • IdleStateHandler:当连接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。 后续的handler 可以重写 userEventTriggered 方法处理不同的事件。
    • ReadTimeoutHandler: 指定的时间没有读事件抛出ReadTimeoutException 异常,后续的handler 可以重写exceptionCaught 处理异常
    • WriteTimeoutHandler: 指定的时间没有写事件抛出 WriteTimeoutException 异常,后续的handler 可以重写exceptionCaught 处理异常

    ReadTimeoutHandler 和 WriteTimeoutHandler 都活抛出异常,而且会关闭通道。 一般建议用 IdleStateHandler。

    1. 使用方法

    1. handler

    package cn.xm.netty.example.heartbeat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleStateEvent;
    
    public class MyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         *
         * @param ctx 上下文
         * @param evt 事件
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
            if(evt instanceof IdleStateEvent) {
    
                //将  evt 向下转型 IdleStateEvent
                IdleStateEvent event = (IdleStateEvent) evt;
                String eventType = null;
                switch (event.state()) {
                    case READER_IDLE:
                      eventType = "读空闲";
                      break;
                    case WRITER_IDLE:
                        eventType = "写空闲";
                        break;
                    case ALL_IDLE:
                        eventType = "读写空闲";
                        break;
                }
                System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
                System.out.println("服务器做相应处理..");
    
                //如果发生空闲,我们关闭通道
               // ctx.channel().close();
            }
        }
    }

    2. 服务启动类

    package cn.xm.netty.example.heartbeat;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.concurrent.TimeUnit;
    
    public class MyServer {
        public static void main(String[] args) throws Exception{
            //创建两个线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //加入一个netty 提供 IdleStateHandler
                        /*
                        说明
                        1. IdleStateHandler 是netty 提供的处理空闲状态的处理器
                        2. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
                        3. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
                        4. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
    
                        5. 文档说明
                        triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
     * read, write, or both operation for a while.
     *                  6. 当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理
     *                  通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
                         */
                        pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));
                        //加入一个对空闲检测进一步处理的handler(自定义)
                        pipeline.addLast(new MyServerHandler());
                    }
                });
    
                //启动服务器
                ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
                channelFuture.channel().closeFuture().sync();
    
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    3. 测试结果:

    /0:0:0:0:0:0:0:1:60237--超时时间--读写空闲
    服务器做相应处理..
    /0:0:0:0:0:0:0:1:61437--超时时间--读写空闲
    服务器做相应处理..

    2. 源码解读

    源码如下:

    package io.netty.handler.timeout;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.Channel.Unsafe;
    import io.netty.channel.ChannelDuplexHandler;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOutboundBuffer;
    import io.netty.channel.ChannelPromise;
    import io.netty.util.concurrent.EventExecutor;
    
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    
    public class IdleStateHandler extends ChannelDuplexHandler {
        private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
    
        // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
        private final ChannelFutureListener writeListener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                lastWriteTime = ticksInNanos();
                firstWriterIdleEvent = firstAllIdleEvent = true;
            }
        };
    
        private final boolean observeOutput;
        private final long readerIdleTimeNanos;
        private final long writerIdleTimeNanos;
        private final long allIdleTimeNanos;
    
        private ScheduledFuture<?> readerIdleTimeout;
        private long lastReadTime;
        private boolean firstReaderIdleEvent = true;
    
        private ScheduledFuture<?> writerIdleTimeout;
        private long lastWriteTime;
        private boolean firstWriterIdleEvent = true;
    
        private ScheduledFuture<?> allIdleTimeout;
        private boolean firstAllIdleEvent = true;
    
        private byte state; // 0 - none, 1 - initialized, 2 - destroyed
        private boolean reading;
    
        private long lastChangeCheckTimeStamp;
        private int lastMessageHashCode;
        private long lastPendingWriteBytes;
    
        public IdleStateHandler(
                int readerIdleTimeSeconds,
                int writerIdleTimeSeconds,
                int allIdleTimeSeconds) {
    
            this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
                 TimeUnit.SECONDS);
        }
    
        /**
         * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
         */
        public IdleStateHandler(
                long readerIdleTime, long writerIdleTime, long allIdleTime,
                TimeUnit unit) {
            this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
        }
    
        public IdleStateHandler(boolean observeOutput,
                long readerIdleTime, long writerIdleTime, long allIdleTime,
                TimeUnit unit) {
            if (unit == null) {
                throw new NullPointerException("unit");
            }
    
            this.observeOutput = observeOutput;
    
            if (readerIdleTime <= 0) {
                readerIdleTimeNanos = 0;
            } else {
                readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
            }
            if (writerIdleTime <= 0) {
                writerIdleTimeNanos = 0;
            } else {
                writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
            }
            if (allIdleTime <= 0) {
                allIdleTimeNanos = 0;
            } else {
                allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
            }
        }
    
        /**
         * Return the readerIdleTime that was given when instance this class in milliseconds.
         *
         */
        public long getReaderIdleTimeInMillis() {
            return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
        }
    
        /**
         * Return the writerIdleTime that was given when instance this class in milliseconds.
         *
         */
        public long getWriterIdleTimeInMillis() {
            return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
        }
    
        /**
         * Return the allIdleTime that was given when instance this class in milliseconds.
         *
         */
        public long getAllIdleTimeInMillis() {
            return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
                // channelActive() event has been fired already, which means this.channelActive() will
                // not be invoked. We have to initialize here instead.
                initialize(ctx);
            } else {
                // channelActive() event has not been fired yet.  this.channelActive() will be invoked
                // and initialization will occur there.
            }
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            destroy();
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            // Initialize early if channel is active already.
            if (ctx.channel().isActive()) {
                initialize(ctx);
            }
            super.channelRegistered(ctx);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // This method will be invoked only if this handler was added
            // before channelActive() event is fired.  If a user adds this handler
            // after the channelActive() event, initialize() will be called by beforeAdd().
            initialize(ctx);
            super.channelActive(ctx);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            destroy();
            super.channelInactive(ctx);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
                reading = true;
                firstReaderIdleEvent = firstAllIdleEvent = true;
            }
            ctx.fireChannelRead(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
                lastReadTime = ticksInNanos();
                reading = false;
            }
            ctx.fireChannelReadComplete();
        }
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // Allow writing with void promise if handler is only configured for read timeout events.
            if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
                ChannelPromise unvoid = promise.unvoid();
                unvoid.addListener(writeListener);
                ctx.write(msg, unvoid);
            } else {
                ctx.write(msg, promise);
            }
        }
    
        private void initialize(ChannelHandlerContext ctx) {
            // Avoid the case where destroy() is called before scheduling timeouts.
            // See: https://github.com/netty/netty/issues/143
            switch (state) {
            case 1:
            case 2:
                return;
            }
    
            state = 1;
            initOutputChanged(ctx);
    
            lastReadTime = lastWriteTime = ticksInNanos();
            if (readerIdleTimeNanos > 0) {
                readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                        readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (writerIdleTimeNanos > 0) {
                writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                        writerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (allIdleTimeNanos > 0) {
                allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                        allIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
        }
    
        /**
         * This method is visible for testing!
         */
        long ticksInNanos() {
            return System.nanoTime();
        }
    
        /**
         * This method is visible for testing!
         */
        ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
            return ctx.executor().schedule(task, delay, unit);
        }
    
        private void destroy() {
            state = 2;
    
            if (readerIdleTimeout != null) {
                readerIdleTimeout.cancel(false);
                readerIdleTimeout = null;
            }
            if (writerIdleTimeout != null) {
                writerIdleTimeout.cancel(false);
                writerIdleTimeout = null;
            }
            if (allIdleTimeout != null) {
                allIdleTimeout.cancel(false);
                allIdleTimeout = null;
            }
        }
    
        /**
         * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
         * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
         */
        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }
    
        /**
         * Returns a {@link IdleStateEvent}.
         */
        protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
            switch (state) {
                case ALL_IDLE:
                    return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
                case READER_IDLE:
                    return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
                case WRITER_IDLE:
                    return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
                default:
                    throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
            }
        }
    
        /**
         * @see #hasOutputChanged(ChannelHandlerContext, boolean)
         */
        private void initOutputChanged(ChannelHandlerContext ctx) {
            if (observeOutput) {
                Channel channel = ctx.channel();
                Unsafe unsafe = channel.unsafe();
                ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    
                if (buf != null) {
                    lastMessageHashCode = System.identityHashCode(buf.current());
                    lastPendingWriteBytes = buf.totalPendingWriteBytes();
                }
            }
        }
    
        /**
         * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
         * with {@link #observeOutput} enabled and there has been an observed change in the
         * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
         *
         * https://github.com/netty/netty/issues/6150
         */
        private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
            if (observeOutput) {
    
                // We can take this shortcut if the ChannelPromises that got passed into write()
                // appear to complete. It indicates "change" on message level and we simply assume
                // that there's change happening on byte level. If the user doesn't observe channel
                // writability events then they'll eventually OOME and there's clearly a different
                // problem and idleness is least of their concerns.
                if (lastChangeCheckTimeStamp != lastWriteTime) {
                    lastChangeCheckTimeStamp = lastWriteTime;
    
                    // But this applies only if it's the non-first call.
                    if (!first) {
                        return true;
                    }
                }
    
                Channel channel = ctx.channel();
                Unsafe unsafe = channel.unsafe();
                ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    
                if (buf != null) {
                    int messageHashCode = System.identityHashCode(buf.current());
                    long pendingWriteBytes = buf.totalPendingWriteBytes();
    
                    if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                        lastMessageHashCode = messageHashCode;
                        lastPendingWriteBytes = pendingWriteBytes;
    
                        if (!first) {
                            return true;
                        }
                    }
                }
            }
    
            return false;
        }
    
        private abstract static class AbstractIdleTask implements Runnable {
    
            private final ChannelHandlerContext ctx;
    
            AbstractIdleTask(ChannelHandlerContext ctx) {
                this.ctx = ctx;
            }
    
            @Override
            public void run() {
                if (!ctx.channel().isOpen()) {
                    return;
                }
    
                run(ctx);
            }
    
            protected abstract void run(ChannelHandlerContext ctx);
        }
    
        private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
    
            ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
                super(ctx);
            }
    
            @Override
            protected void run(ChannelHandlerContext ctx) {
                long nextDelay = readerIdleTimeNanos;
                if (!reading) {
                    nextDelay -= ticksInNanos() - lastReadTime;
                }
    
                if (nextDelay <= 0) {
                    // Reader is idle - set a new timeout and notify the callback.
                    readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstReaderIdleEvent;
                    firstReaderIdleEvent = false;
    
                    try {
                        IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Read occurred before the timeout - set a new timeout with shorter delay.
                    readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
        }
    
        private final class WriterIdleTimeoutTask extends AbstractIdleTask {
    
            WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
                super(ctx);
            }
    
            @Override
            protected void run(ChannelHandlerContext ctx) {
    
                long lastWriteTime = IdleStateHandler.this.lastWriteTime;
                long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
                if (nextDelay <= 0) {
                    // Writer is idle - set a new timeout and notify the callback.
                    writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstWriterIdleEvent;
                    firstWriterIdleEvent = false;
    
                    try {
                        if (hasOutputChanged(ctx, first)) {
                            return;
                        }
    
                        IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Write occurred before the timeout - set a new timeout with shorter delay.
                    writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
        }
    
        private final class AllIdleTimeoutTask extends AbstractIdleTask {
    
            AllIdleTimeoutTask(ChannelHandlerContext ctx) {
                super(ctx);
            }
    
            @Override
            protected void run(ChannelHandlerContext ctx) {
    
                long nextDelay = allIdleTimeNanos;
                if (!reading) {
                    nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
                }
                if (nextDelay <= 0) {
                    // Both reader and writer are idle - set a new timeout and
                    // notify the callback.
                    allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstAllIdleEvent;
                    firstAllIdleEvent = false;
    
                    try {
                        if (hasOutputChanged(ctx, first)) {
                            return;
                        }
    
                        IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Either read or write occurred before the timeout - set a new
                    // timeout with shorter delay.
                    allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
        }
    }

    1. 四个重要的属性

        private final boolean observeOutput;    // 是否考虑出站慢的情况
        private final long readerIdleTimeNanos;    // 读空闲时间,0则不监测读空闲
        private final long writerIdleTimeNanos;    // 写空闲时间,0则不监测写空闲
        private final long allIdleTimeNanos;    // 读或写空闲时间,0则关闭事件

    2. handlerAdded 方法:

    放handler 被添加到pipeline 时就调用 initialize 方法,该方法的作用是:

    判断三个参数(读空闲、写空闲、读写空闲),如果设置的时间大于0, 就创建定时任务。同时调用initOutputChanged 方法,初始化"监控出站数据属性"。

    ReaderIdleTimeoutTask、 WriterIdleTimeoutTask、 AllIdleTimeoutTask 都继承自AbstractIdleTask, AbstractIdleTask 是一个抽象类,采用模板方法模式进行了顶层的封装。

    3. 读事件解读

    io.netty.handler.timeout.IdleStateHandler.ReaderIdleTimeoutTask#run 方法是处理读超时的任务

    (1) 得到readerIdleTimeNanos, 也就是创建时候设置的读空闲时长。

    (2)  !reading 代表读事件结束,在channelRead 读取时设为true, channelReadComplete 读取完设置false。 如果读取事件结束, 重新计算 nextDelay, 计算规则如下:
    当前纳秒 - 上次读取完的纳秒(时间在channelReadComplete 会进行设置), 也就是从上次读完的空闲时间。nextDelay = 设置的空闲时间 - 上次读完的空闲时间
    (3) nextDelay 小于0, 证明空闲时间大于设置的时间。
    1》触发空闲时间,delay 时间为readerIdleTimeNanos, 也就是过readerIdleTimeNanos 时间之后再次执行任务; 然后
    2》创建IdleStateEvent 事件,调用io.netty.handler.timeout.IdleStateHandler#channelIdle 方法, 方法如下:

        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }

    io.netty.channel.AbstractChannelHandlerContext#fireUserEventTriggered

        public ChannelHandlerContext fireUserEventTriggered(final Object event) {
            invokeUserEventTriggered(findContextInbound(), event);
            return this;
        }

    也就是触发后续入站处理器的userEventTriggered 事件。

    (4) nextDelay 大于0。证明没达到空闲时间,则再次加到定时任务,只是延迟时间设置为 nextDelay。 也就是剩余的空闲时间。

    4. io.netty.handler.timeout.IdleStateHandler.WriterIdleTimeoutTask#run 写事件解读

      写事件触发机制同上面一样,只是计算用的是设置的写空闲时间。而且在实际空闲时间大于设置的空闲时间的时候,决定是否触发写空闲事件会先调用hasOutputChanged判断 是否有出站较慢的数据。

    5. io.netty.handler.timeout.IdleStateHandler.AllIdleTimeoutTask#run 所有事件解读

      事件触发机制同上,只是计算实际空闲时间是当前时间减去 Math.max(lastReadTime, lastWriteTime)。  也就是是当前时间减去上次最近的读或者写事件。 然后调用hasOutputChanged判断 是否有出站较慢的数据, 之后触发所有空闲时间。

    总结:

    1. 读取事件的心跳机制如下

    handler 添加的时候根据设置的空闲时间创建一个定时任务,定时任务延迟的时间就是设置的空闲时间。任务执行的逻辑如下:

      先用当前时间减去上次读取完的时候(该时间会在channelReadComplete 消息读取完毕后设置),然后与设置的空闲时间做对比。 如果实际空闲时间大于设置空闲时间,重新触发定时任务,然后触发读超时事件,并交给后面的handler 进行处理。 如果实际空闲 小于设置的空闲时间, 则再次触发定时任务,只是定时任务延迟执行的时间是 (设置的空闲时间 - 实际空闲时间 = 剩余空闲时间)。

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    window下eclipse4.5+hadoop2.6.1开发环境配置
    sqoop1.4.6从mysql导入hdfshivehbase实例
    sqoop1.9.7安装和使用
    sqoop1.4.6导出oracle实例
    sqoop1.4.6配置安装
    java 操作hbase1.2
    hbase-1.2.5完全分布式部署
    hadoop2.6环境中部署hive1.2.2的错误
    hive 创建表和导入数据实例
    hive1.2.2部署
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/15095704.html
Copyright © 2011-2022 走看看