  • Analysis of the Netty reactor thread model

    The Netty 4 thread model

    ServerBootstrap HTTP example

            // Configure the server: a single boss thread accepts connections,
            // and the worker group (default size) handles I/O on accepted channels.
            // PORT and sslCtx are assumed to be defined elsewhere in the example class.
            EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
            EventLoopGroup workerGroup = new EpollEventLoopGroup();

            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(EpollServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 1024)
                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 // .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new HttpHelloWorldServerInitializer(sslCtx));

                // Bind, then block until the server channel is closed.
                Channel ch = b.bind(PORT).sync().channel();
                ch.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
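
    The example passes 1 to the boss group and nothing to the worker group. To my understanding of Netty 4, the no-argument constructor is treated as "0 threads requested", which resolves to twice the number of available processors (a system property can override this; the sketch ignores that). A minimal sketch of that rule, where DefaultLoopThreads and resolveThreads are hypothetical names and not Netty API:

```java
public class DefaultLoopThreads {
    // Hypothetical helper: 0 means "use the default", as with the no-arg constructor.
    // Assumption: the default is max(1, 2 * cores); the system property override is ignored here.
    static int resolveThreads(int requested, int availableProcessors) {
        if (requested < 0) {
            throw new IllegalArgumentException("requested must be >= 0");
        }
        return requested == 0 ? Math.max(1, availableProcessors * 2) : requested;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("boss threads:   " + resolveThreads(1, cores));
        System.out.println("worker threads: " + resolveThreads(0, cores));
    }
}
```

    On an 8-core machine this gives 1 boss thread and 16 worker threads.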

    The bind process:

        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.executor = channel.eventLoop();
                        }
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                });
                return promise;
            }
        }
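
    The shape of doBind can be reproduced with JDK classes alone: bind immediately if registration has already finished, otherwise attach a callback that binds once it does. The following is only a sketch using CompletableFuture; BindAfterRegister and its doBind are hypothetical stand-ins, and the failure handling is simplified compared with the Netty source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BindAfterRegister {
    // Records the order of the steps, for illustration only.
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for doBind(): the returned future completes once "bind" has run.
    static CompletableFuture<Void> doBind(CompletableFuture<Void> regFuture) {
        if (regFuture.isCompletedExceptionally()) {
            return regFuture; // registration already failed: hand the failure back
        }
        CompletableFuture<Void> promise = new CompletableFuture<>();
        // whenComplete runs right away if regFuture is already done,
        // otherwise when regFuture completes later.
        regFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause);
            } else {
                log.add("bind");
                promise.complete(null);
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<Void> bound = doBind(registration);
        System.out.println("bound before registration: " + bound.isDone());

        log.add("registered");
        registration.complete(null);
        System.out.println("bound after registration:  " + bound.isDone() + " " + log);
    }
}
```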

    The initialization process:

        final ChannelFuture initAndRegister() {
            final Channel channel = channelFactory().newChannel();
            try {
                init(channel);
            } catch (Throwable t) {
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
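
    The comment at the end of initAndRegister relies on one property: tasks handed to a single thread run in submission order, so a bind submitted after a register can never overtake it. A minimal sketch of that guarantee with a plain JDK single-thread executor (SameThreadOrdering is a hypothetical name, and the executor only stands in for an event loop):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SameThreadOrdering {
    // Submits "register" and then "bind" to one thread and returns the order they ran in.
    static List<String> runInOrder() {
        List<String> order = new CopyOnWriteArrayList<>();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        eventLoop.execute(() -> order.add("register"));
        eventLoop.execute(() -> order.add("bind"));
        eventLoop.shutdown(); // already queued tasks still run
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder()); // [register, bind]
    }
}
```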

    ServerBootstrap's initialization:

        @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }

    The acceptor, ServerBootstrapAcceptor:

            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                for (Entry<ChannelOption<?>, Object> e: childOptions) {
                    try {
                        if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                            logger.warn("Unknown channel option: " + e);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to set a channel option: " + child, t);
                    }
                }
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
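
    The acceptor's last step, childGroup.register(child), is where a new connection is assigned to a worker loop. For multithreaded groups such as the EpollEventLoopGroup in the example, my understanding is that the loop is picked round-robin. A sketch of that selection only, with RoundRobinHandoff as a hypothetical class that returns worker indices instead of real event loops:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinHandoff {
    private final int workers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinHandoff(int workers) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive");
        }
        this.workers = workers;
    }

    // Returns the index of the worker loop that should own the next accepted child.
    int nextWorker() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return Math.floorMod(next.getAndIncrement(), workers);
    }

    public static void main(String[] args) {
        RoundRobinHandoff group = new RoundRobinHandoff(3);
        List<Integer> owners = new ArrayList<>();
        for (int child = 0; child < 7; child++) {
            owners.add(group.nextWorker());
        }
        System.out.println(owners); // [0, 1, 2, 0, 1, 2, 0]
    }
}
```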

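    Registration as implemented by ThreadPerChannelEventLoopGroup (the EpollEventLoopGroup in the example above registers through MultithreadEventLoopGroup instead, which delegates to next().register(channel)):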

        @Override
        public ChannelFuture register(Channel channel) {
            if (channel == null) {
                throw new NullPointerException("channel");
            }
            try {
                EventLoop l = nextChild();
                return l.register(channel, new DefaultChannelPromise(channel, l));
            } catch (Throwable t) {
                return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
            }
        }

    Obtaining a child EventLoop:

        private EventLoop nextChild() throws Exception {
            if (shuttingDown) {
                throw new RejectedExecutionException("shutting down");
            }
    
            EventLoop loop = idleChildren.poll();
            if (loop == null) {
                if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
                    throw tooManyChannels;
                }
                loop = newChild(childArgs);
                loop.terminationFuture().addListener(childTerminationListener);
            }
            activeChildren.add(loop);
            return loop;
        }
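
    nextChild follows a simple pool discipline: reuse an idle loop if one exists, otherwise create one unless the cap has been reached. The sketch below mirrors that logic with integers standing in for event loops. LoopPool and release are hypothetical, the sketch is single-threaded, and it assumes a loop returns to the idle queue once its channel is gone.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class LoopPool {
    private final Queue<Integer> idle = new ArrayDeque<>();
    private final Set<Integer> active = new HashSet<>();
    private final int maxChannels; // 0 means unlimited, as in the source above
    private int created;

    LoopPool(int maxChannels) {
        this.maxChannels = maxChannels;
    }

    // Mirrors nextChild(): prefer an idle loop, otherwise create one if the cap allows.
    int next() {
        Integer loop = idle.poll();
        if (loop == null) {
            if (maxChannels > 0 && active.size() >= maxChannels) {
                throw new IllegalStateException("too many channels (max: " + maxChannels + ")");
            }
            loop = created++;
        }
        active.add(loop);
        return loop;
    }

    // Hypothetical counterpart: called when the loop's channel is gone.
    void release(int loop) {
        if (active.remove(loop)) {
            idle.add(loop);
        }
    }

    public static void main(String[] args) {
        LoopPool pool = new LoopPool(2);
        int a = pool.next();
        int b = pool.next();
        System.out.println("created loops " + a + " and " + b);
        pool.release(a);
        System.out.println("reused loop " + pool.next());
    }
}
```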

    Creating a new child EventLoop (SingleThreadEventExecutor.java):

        /**
         * Create a new instance
         *
         * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
         * @param executor          the {@link Executor} which will be used for executing
         * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
         *                          executor thread
         */
        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
            super(parent);
    
            if (executor == null) {
                throw new NullPointerException("executor");
            }
    
            this.addTaskWakesUp = addTaskWakesUp;
            this.executor = executor;
            taskQueue = newTaskQueue();
        }

    Its execute method (SingleThreadEventExecutor.java):

        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }

    Starting the processing thread (SingleThreadEventExecutor.java):

        private void startThread() {
            if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();
                }
            }
        }
    
        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                                    "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }
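
    Taken together, execute and startThread give a lazily started loop: the thread is created by the first task submitted from outside, and a compare-and-set ensures it is created only once. A stripped-down sketch of that idea follows. LazyLoop is hypothetical and leaves out shutdown, wake-up, and task error handling, all of which the real class deals with.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyLoop {
    private static final int NOT_STARTED = 0;
    private static final int STARTED = 1;

    private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    final AtomicInteger threadsStarted = new AtomicInteger();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // no-op after the first successful call
        }
        tasks.add(task);
    }

    private void startThread() {
        // Only the caller that wins the CAS creates the thread.
        if (state.compareAndSet(NOT_STARTED, STARTED)) {
            threadsStarted.incrementAndGet();
            Thread t = new Thread(() -> {
                thread = Thread.currentThread();
                try {
                    for (;;) {
                        tasks.take().run();
                    }
                } catch (InterruptedException e) {
                    // Interrupted: leave the loop.
                }
            });
            t.setDaemon(true); // the sketch has no shutdown, so do not block JVM exit
            t.start();
        }
    }

    // Submits n tasks from the calling thread and reports how many threads were started.
    static int demo(int n) {
        LazyLoop loop = new LazyLoop();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(done::countDown);
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return loop.threadsStarted.get();
    }

    public static void main(String[] args) {
        System.out.println("threads started for 5 tasks: " + demo(5)); // 1
    }
}
```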

    The run method is implemented differently by each subclass (DefaultEventLoop, EpollEventLoop, NioEventLoop, ThreadPerChannelEventLoop). Taking NioEventLoop as an example:

        @Override
        protected void run() {
            for (;;) {
                boolean oldWakenUp = wakenUp.getAndSet(false);
                try {
                    if (hasTasks()) {
                        selectNow();
                    } else {
                        select(oldWakenUp);
    
                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        processSelectedKeys();
                        runAllTasks();
                    } else {
                        final long ioStartTime = System.nanoTime();
    
                        processSelectedKeys();
    
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
    
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
    
                    // Prevent possible consecutive immediate failures that lead to
                    // excessive CPU consumption.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore.
                    }
                }
            }
        }
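
    The ioRatio branch in run() is plain arithmetic: the time just spent on I/O is scaled by (100 - ioRatio) / ioRatio to get the time budget for queued tasks. A small worked example of that formula, where IoRatioBudget is a hypothetical helper and the ioRatio == 100 case is excluded because the source handles it separately without a budget:

```java
public class IoRatioBudget {
    // Same formula as runAllTasks(ioTime * (100 - ioRatio) / ioRatio) in the loop above.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for this sketch");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 2_000_000L; // 2 ms spent in processSelectedKeys()
        System.out.println(taskBudgetNanos(ioTime, 50)); // 2000000: equal time for tasks
        System.out.println(taskBudgetNanos(ioTime, 80)); // 500000:  a quarter of the I/O time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 8000000: four times the I/O time
    }
}
```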

    Running all tasks (SingleThreadEventExecutor.java):

        /**
         * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
         * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
         */
        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("A task raised an exception.", t);
                }
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
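
    Because the deadline is checked only every 64 tasks, the loop can overshoot its budget by up to 63 tasks. The sketch below reproduces just that loop with an injectable clock so the effect is deterministic. BatchedDeadline and the fake clock are hypothetical, and scheduled tasks and exception logging are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

public class BatchedDeadline {
    // Runs queued tasks until the queue is empty or the deadline has passed,
    // reading the clock only once every 64 tasks. Returns the number of tasks run.
    static int runAll(Queue<Runnable> queue, long timeoutNanos, LongSupplier clock) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + timeoutNanos;
        int ran = 0;
        for (;;) {
            task.run();
            ran++;
            // (ran & 0x3F) == 0 is true for ran = 64, 128, 192, ...
            if ((ran & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return ran;
    }

    public static void main(String[] args) {
        long[] now = {0}; // fake clock, advanced by the tasks themselves
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            queue.add(() -> now[0] += 1); // each task "takes" 1 ns
        }
        int ran = runAll(queue, 10, () -> now[0]);
        // The 10 ns budget is used up after 10 tasks, but that is only noticed at task 64.
        System.out.println(ran + " run, " + queue.size() + " left"); // 64 run, 136 left
    }
}
```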

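    Summary

      Starting from a simple example program, this article walks through the Netty 4 thread model step by step, following the source from ServerBootstrapAcceptor to SingleThreadEventExecutor. Each piece links to the next, so the chain above can be used to understand how Netty processes an incoming request.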

  • Original article: https://www.cnblogs.com/davidwang456/p/5118802.html