  • Netty's worker thread pool

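    When a handler is added to the pipeline without specifying a thread pool, it runs on the channel's IO thread, i.e. its NioEventLoop.

    So a NioEventLoop polls the SocketChannel for network read events, and it can also run the code in handlers as well as ChannelOutboundBuffer's addMessage and addFlush.

    When ChannelOutboundBuffer's addMessage and addFlush are to be executed, they run directly if the current thread is the channel's NioEventLoop. If it is not, the call is wrapped in a task and put on the task queue, and the NioEventLoop thread later takes the task off the queue and runs it.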
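
    The run-directly-or-enqueue rule can be illustrated without Netty. Below is a minimal sketch using only the JDK; `MiniEventLoop`, `execute` and `demo` are hypothetical names made up for this illustration, not Netty classes, and the real check in Netty is spread over its executor and handler-context code.

    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-in for a single-threaded event loop; not a Netty class.
    final class MiniEventLoop {
        private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        private final Thread thread;

        MiniEventLoop() {
            thread = new Thread(() -> {
                try {
                    for (;;) {
                        taskQueue.take().run(); // the loop thread drains its own queue
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop when interrupted
                }
            }, "mini-event-loop");
            thread.setDaemon(true);
            thread.start();
        }

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        // Runs op inline when called on the loop thread, otherwise enqueues it.
        String execute(Runnable op) {
            if (inEventLoop()) {
                op.run();
                return "direct";
            }
            taskQueue.add(op);
            return "queued";
        }

        // Returns {path of the outer call, path of the nested call, thread of the nested op}.
        static String[] demo() {
            MiniEventLoop loop = new MiniEventLoop();
            String[] result = new String[3];
            CountDownLatch done = new CountDownLatch(1);
            result[0] = loop.execute(() -> {
                // We are on the loop thread now, so this nested call runs inline.
                result[1] = loop.execute(() -> result[2] = Thread.currentThread().getName());
                done.countDown();
            });
            try {
                if (!done.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("loop thread did not run the task");
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return result;
        }

        public static void main(String[] args) {
            System.out.println(String.join(", ", demo())); // queued, direct, mini-event-loop
        }
    }
    ```

    The call from the main thread is queued, while the nested call made from inside the loop thread runs inline. Because every operation ends up on the same thread, the state it touches needs no locking.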

    public interface ChannelPipeline
            extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
    
        ChannelPipeline addLast(ChannelHandler... handlers);
    
        ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
    }

    // io.netty.channel.SingleThreadEventLoop#hasTasks
    protected boolean hasTasks() {
        return super.hasTasks() || !tailTasks.isEmpty();
    }
    
    // io.netty.util.concurrent.SingleThreadEventExecutor#hasTasks
    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty();
    }
    
    
    // io.netty.channel.nio.NioEventLoop#run
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
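                    // With the default strategy: if the task queue is non-empty, strategy = selectNow()
                    // (0 when no channel is ready); otherwise strategy = SelectStrategy.SELECT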
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
    
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
    
                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
    
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    // strategy == 0 (tasks pending, no channel ready): this branch is taken
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
    
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
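
    The `ioTime * (100 - ioRatio) / ioRatio` expression in run() is easier to read with numbers plugged in. A small sketch of the same arithmetic follows; the class name, the method name and the 2 ms IO time are assumptions chosen for illustration.

    ```java
    // Hypothetical helper that repeats the budget arithmetic from NioEventLoop#run.
    final class IoRatioBudget {
        // Time allowed for tasks after spending ioTimeNanos on IO, for 0 < ioRatio < 100.
        // (ioRatio == 100 is handled by a separate branch in run(), with no time limit.)
        static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
            if (ioRatio <= 0 || ioRatio >= 100) {
                throw new IllegalArgumentException("ioRatio must be in (0, 100): " + ioRatio);
            }
            return ioTimeNanos * (100 - ioRatio) / ioRatio;
        }

        public static void main(String[] args) {
            long ioTime = 2_000_000L; // assume processSelectedKeys() took 2 ms
            for (int ratio : new int[] {20, 50, 80}) {
                System.out.println("ioRatio=" + ratio + " -> task budget "
                        + taskBudgetNanos(ioTime, ratio) + " ns");
            }
        }
    }
    ```

    With 2 ms spent on IO, an ioRatio of 20 gives tasks 8 ms, 50 gives 2 ms, and 80 gives 0.5 ms. The higher the ratio, the smaller the share left for queued tasks.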

    Tasks are taken from taskQueue and executed on the current thread:

    // io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
    
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);
    
            runTasks ++;
    
            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
    
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
    
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
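
    Because the deadline is only checked after every 64th task, a timeout of 0 does not mean "run nothing". The following JDK-only sketch reproduces the loop's control flow to show this. `BatchedTaskRunner` is a hypothetical name, and the clock is assumed to be relative to a start time (as I understand ScheduledFutureTask.nanoTime() to be), so a deadline of 0 is always already in the past.

    ```java
    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical re-creation of the runAllTasks(long) control flow; not Netty code.
    final class BatchedTaskRunner {
        private static final long START = System.nanoTime();

        // Relative clock: never negative, so a deadline of 0 has always passed.
        static long nanoTime() {
            return System.nanoTime() - START;
        }

        // Runs queued tasks until the queue is empty or the deadline check fires.
        // Returns the number of tasks that were run.
        static long runAllTasks(Queue<Runnable> queue, long timeoutNanos) {
            Runnable task = queue.poll();
            if (task == null) {
                return 0;
            }
            final long deadline = timeoutNanos > 0 ? nanoTime() + timeoutNanos : 0;
            long runTasks = 0;
            for (;;) {
                task.run();
                runTasks++;
                // The clock is consulted only once every 64 tasks.
                if ((runTasks & 0x3F) == 0 && nanoTime() >= deadline) {
                    break;
                }
                task = queue.poll();
                if (task == null) {
                    break;
                }
            }
            return runTasks;
        }

        // Queues `queued` no-op tasks and returns {tasks run, tasks left in the queue}.
        static long[] demo(int queued, long timeoutNanos) {
            Queue<Runnable> queue = new ArrayDeque<>();
            for (int i = 0; i < queued; i++) {
                queue.add(() -> { });
            }
            long ran = runAllTasks(queue, timeoutNanos);
            return new long[] {ran, queue.size()};
        }

        public static void main(String[] args) {
            long[] zero = demo(200, 0);
            System.out.println("timeout 0: ran " + zero[0] + ", left " + zero[1]);
            long[] tenSeconds = demo(200, TimeUnit.SECONDS.toNanos(10));
            System.out.println("timeout 10s: ran " + tenSeconds[0] + ", left " + tenSeconds[1]);
        }
    }
    ```

    Under these assumptions, a call with timeout 0 runs at most 64 tasks and leaves the rest for the next loop iteration, which is what "the minimum number of tasks" in run() amounts to.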

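    A NioEventLoop can therefore run both IO and business logic. To keep the two from interfering with each other, a separate thread pool is usually specified when adding a handler.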
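
    The effect of giving a handler its own thread pool can be sketched with plain JDK executors. This is only an analogy: `HandlerOffloadSketch` and `invoke` are hypothetical, the "io-loop" executor stands in for a NioEventLoop, and the "business" pool stands in for the EventExecutorGroup passed to addLast.

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Supplier;

    // Hypothetical sketch of handler dispatch; not Netty code.
    final class HandlerOffloadSketch {
        // No executor bound: run on the calling (IO) thread. Otherwise hand off to the pool.
        static CompletableFuture<String> invoke(Executor handlerExecutor, Supplier<String> handler) {
            if (handlerExecutor == null) {
                return CompletableFuture.completedFuture(handler.get());
            }
            return CompletableFuture.supplyAsync(handler, handlerExecutor);
        }

        // Returns {thread used without an executor, thread used with one}.
        static String[] demo() {
            ExecutorService ioLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-loop"));
            ExecutorService business = Executors.newFixedThreadPool(2, r -> new Thread(r, "business"));
            try {
                // The "handler" just reports which thread it ran on.
                Supplier<String> handler = () -> Thread.currentThread().getName();
                // Both dispatches start on the IO thread, as a read event would.
                CompletableFuture<String> inline = CompletableFuture
                        .supplyAsync(() -> invoke(null, handler), ioLoop)
                        .thenCompose(f -> f);
                CompletableFuture<String> offloaded = CompletableFuture
                        .supplyAsync(() -> invoke(business, handler), ioLoop)
                        .thenCompose(f -> f);
                return new String[] {inline.join(), offloaded.join()};
            } finally {
                ioLoop.shutdown();
                business.shutdown();
            }
        }

        public static void main(String[] args) {
            System.out.println(String.join(", ", demo())); // io-loop, business
        }
    }
    ```

    In the second dispatch the IO thread only submits the work and returns at once, so a slow handler delays the business pool rather than the polling of other channels.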

    The final destination when writing data to the socket:

    io.netty.channel.socket.nio.NioSocketChannel#doWrite
  • Original source: https://www.cnblogs.com/allenwas3/p/12327034.html