zoukankan      html  css  js  c++  java
  • Netty源码学习(二)NioEventLoopGroup

    0. NioEventLoopGroup简介

    NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。

    1. NioEventLoopGroup类图

    2. 构造方法

    new NioEventLoopGroup()方法会调用到MultithreadEventLoopGroup的构造方法:

       private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));//默认值为系统core数的两倍
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
        /**
         * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
         */
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);//如果采用无参的构造函数,传入的nThreads变量为0,此时线程数会被设置为系统core数*2
        }

    然后会调用MultithreadEventExecutorGroup的构造方法:

        /**
         * Create a new instance.
         *
         * @param nThreads          the number of threads that will be used by this instance.
         * @param executor          the Executor to use, or {@code null} if the default should be used.
         * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
         * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
         */
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];//设定线程池大小
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);//新建nThreads个子线程
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {//如果新建子线程的过程中出错,则关闭所有子线程
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);//设定新任务的分配策略
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {//注册一些回调函数用于清理工作
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }

    其中比较重要的地方有两处:调用子类实现的newChild方法设置子线程,以及设置新任务的分配策略

    先看一下NioEventLoopGroup中实现的newChild方法:

        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }

    很简单的新建一个NioEventLoop对象并返回,我们下一节会介绍NioEventLoop

    而chooserFactory.newChooser最终会跳转到DefaultEventExecutorChooserFactory里:

        @SuppressWarnings("unchecked")
        @Override
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
    
        private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
        }

    虽然分配策略都是round-robin,但是在子线程的数量为2的幂时,可以用位运算来加速,效率很高。

    Netty为了追求效率确实不择手段。

  • 相关阅读:
    【11_83】Remove Duplicates from Sorted List
    【10_169】Majority Element
    【09_242】Valid Anagram
    【08_238】Product of Array Except Self
    【07_226】Invert Binary Tree
    【6_100】Same Tree
    【5_283】Move Zeroes
    【4_237】Delete Node in a Linked List
    mysql性能优化-慢查询分析、优化索引和配置
    生成商品条形码代码事例
  • 原文地址:https://www.cnblogs.com/stevenczp/p/7581940.html
Copyright © 2011-2022 走看看