  • netty (3) --- NioEventLoop analysis

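    Questions:

    • What exactly does NioEventLoop do, and where is it used?
    • How does NioEventLoop relate to the ServerBootstrap and EventLoopGroup we created at the beginning?
    • How are a NioEventLoop and a NioChannel associated? (It seems reasonable that each channel should be assigned one NioEventLoop.)
    • How does NioEventLoop work?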

    Overview

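    To understand NioEventLoop we have to start with EventLoopGroup. Their class hierarchy is shown in the diagram below.

    [Figure: NioEventLoop.PNG, class hierarchy diagram]

    While analyzing how the netty server starts up, we saw a createChannel() step followed by a call to group().next(), which returns a NioEventLoop. Everything after that is handed to this NioEventLoop. In practice an EventLoopGroup holds an array of NioEventLoop instances, and they are the ones that actually carry out the work.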
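
    The relationship described above (a group that owns an array of loops, and a channel that is given one loop by next() and then stays with it) can be sketched with plain JDK executors. This is only an illustrative model under stated assumptions: LoopGroupSketch and runTwoTasksOnOneLoop are hypothetical names, and each single-threaded ExecutorService merely stands in for a NioEventLoop. It is not how netty itself is implemented.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an EventLoopGroup: a fixed array of single-threaded workers.
// None of these names are Netty classes.
final class LoopGroupSketch {
    private final ExecutorService[] loops;
    private final AtomicInteger index = new AtomicInteger();

    LoopGroupSketch(int nThreads) {
        loops = new ExecutorService[nThreads];
        for (int i = 0; i < nThreads; i++) {
            // Daemon threads so that a forgotten shutdown cannot keep the JVM alive.
            loops[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });
        }
    }

    // Hand out the workers one after another, wrapping around at the end of the array.
    ExecutorService next() {
        return loops[Math.floorMod(index.getAndIncrement(), loops.length)];
    }

    void shutdown() {
        for (ExecutorService loop : loops) {
            loop.shutdown();
        }
    }

    // Models one channel: the loop is chosen once, then every task of that
    // channel is submitted to the same loop. Returns the executing thread names.
    static String[] runTwoTasksOnOneLoop(LoopGroupSketch group) {
        ExecutorService loop = group.next();
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return new String[] {loop.submit(whoAmI).get(), loop.submit(whoAmI).get()};
        } catch (Exception e) {
            throw new IllegalStateException("task failed", e);
        }
    }

    public static void main(String[] args) {
        LoopGroupSketch group = new LoopGroupSketch(2);
        String[] names = runTwoTasksOnOneLoop(group);
        System.out.println(names[0] + " / " + names[1]);
        group.shutdown();
    }
}
```

    In this model both tasks of one "channel" report the same thread name, which is the property that lets per-channel handling run without extra locking.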

    Source code analysis

    MultithreadEventLoopGroup, the parent class of NioEventLoopGroup

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    
        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        // default number of event loop threads
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
        /**
         * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)}
         */
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    
        /**
         * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)}
         */
        protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
        }
    
        @Override
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
        }
    
        @Override
        public EventLoop next() {
            return (EventLoop) super.next();
        }
    
        @Override
        protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
    }
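
    The thread-count logic in the listing above can be isolated into a small sketch. It assumes that Integer.getInteger from the JDK is an acceptable stand-in for netty's SystemPropertyUtil.getInt, and DefaultThreadsSketch with its two methods is a hypothetical helper, not part of netty.

```java
// Hypothetical helper mirroring the static initializer and constructor logic quoted above.
final class DefaultThreadsSketch {

    // Stand-in for SystemPropertyUtil.getInt: read an int system property with a fallback,
    // then clamp the result to at least 1.
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    // nThreads == 0 means "use the default"; any other value is passed through unchanged.
    static int resolveThreads(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("default: " + resolveThreads(0));
        System.out.println("explicit: " + resolveThreads(4));
    }
}
```

    Note that only 0 selects the default. A negative value is passed through as-is and is then rejected by the nThreads <= 0 check in the MultithreadEventExecutorGroup constructor.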
    
    
    
    The MultithreadEventExecutorGroup class
    
    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    
        // tasks received by the group are handed to children for execution
        private final EventExecutor[] children;
        private final Set<EventExecutor> readonlyChildren;
        private final AtomicInteger childIndex = new AtomicInteger();
        private final AtomicInteger terminatedChildren = new AtomicInteger();
        private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    
        /**
         * Create a new instance.
         *
         * @param nThreads          the number of threads that will be used by this instance.
         * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
         * @param args              arguments which will be passed to each {@link #newChild(Executor, Object...)} call
         */
        protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
        }
    
        /**
         * Create a new instance.
         *
         * @param nThreads          the number of threads that will be used by this instance.
         * @param executor          the Executor to use, or {@code null} if the default should be used.
         * @param args              arguments which will be passed to each {@link #newChild(Executor, Object...)} call
         */
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass());
        }
    
        @Override
        public EventExecutor next() {
            return children[Math.abs(childIndex.getAndIncrement() % children.length)];
        }
    
        ...	
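
    The next() method in the listing selects a child in round-robin order. The sketch below reuses the same index expression on an array of strings to show that the result stays inside the array even after the int counter overflows. RoundRobinSketch and its startIndex parameter are hypothetical and exist only so that the overflow case can be exercised directly.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of the children array and its round-robin selection.
final class RoundRobinSketch {
    private final String[] children;
    private final AtomicInteger childIndex;

    // startIndex is an illustration-only knob for starting the counter near overflow.
    RoundRobinSketch(int nThreads, int startIndex) {
        children = new String[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = "loop-" + i;
        }
        childIndex = new AtomicInteger(startIndex);
    }

    // Same expression as the quoted next(): once the counter wraps to a negative
    // value the remainder is negative too, and Math.abs folds it back into range.
    String next() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch fresh = new RoundRobinSketch(3, 0);
        for (int i = 0; i < 4; i++) {
            System.out.println(fresh.next());
        }
        RoundRobinSketch nearOverflow = new RoundRobinSketch(3, Integer.MAX_VALUE);
        for (int i = 0; i < 4; i++) {
            System.out.println(nearOverflow.next());
        }
    }
}
```

    The remainder is taken before Math.abs is applied, so its magnitude is always smaller than the array length and the Math.abs(Integer.MIN_VALUE) corner case never arises.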
    
    

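    As the constructor shows, it simply creates multiple children (the executors that actually do the work). newChild is left to subclasses to implement, so let's look at how NioEventLoopGroup implements newChild.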

        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
        }
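
    The division of labor between the base class constructor and newChild is a template-method arrangement: the parent owns the loop that fills the array, and the subclass only decides what a single child is. A minimal sketch of that shape, using hypothetical GroupSketch and NioGroupSketch classes with strings in place of real event loops, might look like this:

```java
// Hypothetical base class: validates nThreads and fills the children array,
// delegating the creation of each child to the subclass.
abstract class GroupSketch {
    private final String[] children;

    protected GroupSketch(int nThreads, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(
                    String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        children = new String[nThreads];
        for (int i = 0; i < nThreads; i++) {
            // Called from the constructor, so implementations must not rely on
            // subclass fields; everything they need arrives through args.
            children[i] = newChild(args);
        }
    }

    protected abstract String newChild(Object... args);

    int size() {
        return children.length;
    }

    String child(int i) {
        return children[i];
    }
}

// Hypothetical subclass: the only thing it contributes is the kind of child.
final class NioGroupSketch extends GroupSketch {
    NioGroupSketch(int nThreads, String providerName) {
        super(nThreads, providerName);
    }

    @Override
    protected String newChild(Object... args) {
        // args[0] plays the role of the SelectorProvider in the quoted newChild.
        return "nio-loop[" + args[0] + "]";
    }

    public static void main(String[] args) {
        GroupSketch group = new NioGroupSketch(2, "provider");
        System.out.println(group.size() + " children, first = " + group.child(0));
    }
}
```

    Passing construction arguments through the varargs array, as the quoted code does with args[0], is what allows the subclass to build its children before its own constructor body has run.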
    

    At this point it is clear that the one that ultimately does the work is NioEventLoop.

  • Original article: https://www.cnblogs.com/Benjious/p/11613298.html