zoukankan      html  css  js  c++  java
  • netty reactor线程模型分析

    netty4线程模型

    ServerBootstrap http示例

            // Configure the server.
            EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
            EventLoopGroup workerGroup = new EpollEventLoopGroup();
            
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.channel(EpollServerSocketChannel.class);
                b.option(ChannelOption.SO_BACKLOG, 1024);
                b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                b.group(bossGroup, workerGroup)
                // .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new HttpHelloWorldServerInitializer(sslCtx));
    
                Channel ch = b.bind(PORT).sync().channel();
    /*            System.err.println("Open your web browser and navigate to " +
                        (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');*/
                ch.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }

    绑定过程:

     private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.executor = channel.eventLoop();
                        }
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                });
                return promise;
            }
        }

    初始化过程:

    final ChannelFuture initAndRegister() {
            final Channel channel = channelFactory().newChannel();
            try {
                init(channel);
            } catch (Throwable t) {
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }

    ServerBootStrap的初始化过程:

    @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }

    接收器ServerBootstrapAcceptor

    @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                for (Entry<ChannelOption<?>, Object> e: childOptions) {
                    try {
                        if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                            logger.warn("Unknown channel option: " + e);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to set a channel option: " + child, t);
                    }
                }
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }

    ThreadPerChannelEventLoopGroup实现注册

        @Override
        public ChannelFuture register(Channel channel) {
            if (channel == null) {
                throw new NullPointerException("channel");
            }
            try {
                EventLoop l = nextChild();
                return l.register(channel, new DefaultChannelPromise(channel, l));
            } catch (Throwable t) {
                return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
            }
        }

    获取子eventLoop

        private EventLoop nextChild() throws Exception {
            if (shuttingDown) {
                throw new RejectedExecutionException("shutting down");
            }
    
            EventLoop loop = idleChildren.poll();
            if (loop == null) {
                if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
                    throw tooManyChannels;
                }
                loop = newChild(childArgs);
                loop.terminationFuture().addListener(childTerminationListener);
            }
            activeChildren.add(loop);
            return loop;
        }

    产生新子eventLoop(SingleThreadEventExecutor.java)

        /**
         * Create a new instance
         *
         * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
         * @param executor          the {@link Executor} which will be used for executing
         * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
         *                          executor thread
         */
        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
            super(parent);
    
            if (executor == null) {
                throw new NullPointerException("executor");
            }
    
            this.addTaskWakesUp = addTaskWakesUp;
            this.executor = executor;
            taskQueue = newTaskQueue();
        }

    其执行方法(SingleThreadEventExecutor.java):

    @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }

    启动处理线程(SingleThreadEventExecutor.java):

    private void startThread() {
            if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();
                }
            }
        }
    
        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                                    "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }

    其中的run方法由其子类(DefaultEventLoop,EpollEventLoop,NioEventLoop,ThreadPerChannelEventLoop)各种实现,以NioEventLoop为例:

    @Override
        protected void run() {
            for (;;) {
                boolean oldWakenUp = wakenUp.getAndSet(false);
                try {
                    if (hasTasks()) {
                        selectNow();
                    } else {
                        select(oldWakenUp);
    
                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        processSelectedKeys();
                        runAllTasks();
                    } else {
                        final long ioStartTime = System.nanoTime();
    
                        processSelectedKeys();
    
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
    
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
    
                    // Prevent possible consecutive immediate failures that lead to
                    // excessive CPU consumption.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore.
                    }
                }
            }
        }

    运行所有任务(SingleThreadEventExecutor.java)

     /**
         * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
         * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
         */
        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("A task raised an exception.", t);
                }
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }

    小结

      本文从一个简单的示例程序,一步步分析netty4的线程模型,从ServerBootstrapAcceptor到SingleThreadEventExecutor的源码,环环相扣,可以根据上面的分析链理解

    一个请求过来后,netty的处理流程。

  • 相关阅读:
    1.4.2.3. SETUP(Core Data 应用程序实践指南)
    1.4.2.2. PATHS(Core Data 应用程序实践指南)
    1.4.2.1. FILES(Core Data 应用程序实践指南)
    1.4.2. 实现 Core Data Helper 类(Core Data 应用程序实践指南)
    1.4.1. Core Data Helper 简介(Core Data 应用程序实践指南)
    1.4. 为现有的应用程序添加 Core Data 支持(Core Data 应用程序实践指南)
    1.3.2. App Icon 和 Launch Image(Core Data 应用程序实践指南)
    1.3.1. 新建Xcode项目并设置故事板(Core Data 应用程序实践指南)
    php验证邮箱是否合法
    如何使js函数异步执行
  • 原文地址:https://www.cnblogs.com/davidwang456/p/5118802.html
Copyright © 2011-2022 走看看