zoukankan      html  css  js  c++  java
  • Netty4 initAndRegister 解析

    我们从框架的应用层面来分析,NioEventLoopGroup在netty中的使用。

    这是我们需要配置的地方。

    紧接着我们进入netty的运行中。ServerBootstrap.bind(PORT);

    这是一个bind操作。我们来看一下NioEventLoopGroup在这个操作中的使用。

    ChannelFuture regFuture = config().group().register(channel);

     config()返回在ServerBootstrap中内部属性:private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

    注意这里传入的是this即ServerBootstrap对象。

    这个类主要是用来暴露我们的配置的,直接把我们配置的ServerBootstrap传递进来。

    我们继续ChannelFuture regFuture = config().group().register(channel);

    这里的config()返回一个ServerBootstrapConfig,然后调用他的group()。

     

    这个类是ServerBootstrapConfig的父类。调用了ServerBootstrap.group().方法。注意因为group方法是在弗雷AbstractBootstrap中定义的,所以这里进入父类的group()方法中来。

    这个方法直接返回EventLoopGroup对象。

     我们在回过头来看我们最初的配置,来发现这个group属性究竟是什么。

    我们配置了两个NioEventLoopGroup,一个一个线程的传递给了父类AbstractBootstrap.一个初始化给当前的ServerBootstrap的内部属性childGroup.

     我们在回到这里ChannelFuture regFuture = config().group().register(channel);

     现在通过AbstractBootstrap.group()方法返回了一个NioEventLoopGroup对象,即我们配置的第一个单线程的NioEventLoopGroup对象。

     现在进入他的register(channel)方法。由于这个方法是在他的父类中定义的,所以我们进入他的父类MultithreadEventLoopGroup()中。

    next()方法返回一个EventLoop对象。

    下面重点分析这个chooser。(在=======以内,然后接着分析next()方法)

     ====================================================================

    这个chooser是我们在初始化NioEventLoopGroup的时候初始化的。

    回到我们初始化NioEventLoopGroup的地方:

    还是这里。

    重点注意这下面的这个SelectorProvider.provider(),在后面的构造方法NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
    SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler)中会用到。

    下面就到了初始化chooser的地方了:

    /**
         * Create a new instance.
         *
         * @param nThreads          the number of threads that will be used by this instance.
         * @param executor          the Executor to use, or {@code null} if the default should be used.
         * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
         * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
         */
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }

    这个构造方法内容比较多。别的先不管,我们来看:chooser = chooserFactory.newChooser(children);

    这个EventExecutorChooserFactory chooserFactory是我们上个构造方法中传过来的:

    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);

     是一个DefaultEventExecutorChooserFactory.INSTANCE。

    然后我们看他的newChooser(EventExecutor[] executors)方法。

      @SuppressWarnings("unchecked")
        @Override
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }

     这里有个判断就是:判断是否为2的次方。

    private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
    }

    然后根据是否为2的次方分别进行两个构造方法。分别实现了EventExecutorChooser的next()方法。

        private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
        }
    
        private static final class GenericEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            GenericEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[Math.abs(idx.getAndIncrement() % executors.length)];
            }
        }

    这两个实现类只是对于next的方法有不同的实现。但是都是遍历每一个executors。

    为什么要这样呢?

    原因是位操作&  比 % 操作要高效。netty为了提高效率也是拼了。

    总的来说这个DefaultEventExecutorChooserFactory非常简单,就上面这些内容。现在我们也知道了chooser是什么了,就是一个时限了遍历所有EventExecutor的next()方法的对象。功能非常简单,只有一个方法next。遍历每一个EventExecutor

    =====================================================================

    好了到此为止我们分析玩了chooser,现在进入主逻辑。

    上面我们分析道了这里:

    同时我们通过====中的内容我们也知道了EventLoopGroup是如何初始化内部属性EventExecutors的:

    children[i] = newChild(executor, args);

    但是这个newChild(executor,args)方法是在子类NioEventLoopGroup中实现的:

        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }

    ==============这里插入分析一下NioEventLoop构造方法=====================================

    上面的selectorProvider就是在上面我们划了重点注意的地方:

    上面紧接着初始化了内部属性private Selector selector;这是一个java.nio.channels.Selector类型的内部属性。

    这个属性在后面注册channel到selector的时候将作为参数传入channel的注册方法中。

    所以我们接着看下这个openSelector()方法:

        private Selector openSelector() {
            try {
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return unwrappedSelector;
            }
    
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                //重点2-1。这里设置了SelectorImpl.
    return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return unwrappedSelector; } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return unwrappedSelector; } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet); }

     nio中大量使用了AccessController.doPrivileged可以关于这个的使用可以参考《java 的 AccessController.doPrivileged使用》。

    ======================================================================

    也就是说上面箭头所指的2处返回的是一个NioEventLoop对象。

    也就是说next().register(channel)调用的是NioEventLoop.register(channel);

    这就是netty的eventloop线程模型的应用了。每个EventLoop有一个Selector, boss用Selector处理accept, worker用Selector处理read,write等

    这个方法是在它的父类SingleThreadEventLoop中定义的:

    首先构造一个DefaultChannelPromise(channel, this)对象。

    然后进入register(new DefaultChannelPromise(channel, this))方法。

    promise.channel()返回的就是我们构造promise时传入的channel。还记得channel是怎么构造的吗:

    这里的channelFactory.newChannel()返回的是一个我们之前配置的channel类型的channel实例:(这块细节可以参考我的另一篇博客《ChannelFactory初始化》)

    因为NioServerSocketChannel没有定义unsafe()这个方法。所以我们一直找到了它的祖祖父类:

    这个NioMessageUnsafe是AbstractNioMessageChannel的内部类。

    知道了unsafe的类型后我们继续主流程:promise.channel().unsafe().register(this, promise);

    现在进入NioMessageUnsafe.register(this,promise);同样调用的是它的父类中的register方法:

    内容比较多我们用代码展示:

    下面体现了netty中channel与eventloop的一一对应关系。体现了eventloop线程处理channel工作的逻辑。

            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
           //这里体现了一个channel只能对应一个eventloop
    if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; }        //注册到channel AbstractChannel.this.eventLoop = eventLoop;        //判断当前线程是否为此eventloop线程 if (eventLoop.inEventLoop()) { register0(promise); } else {
              //如果当前线程不是此eventloop线程
    try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }

    上面就体现了eventloop的在netty中的应用。

    我们直接看这个方法好了:register0(promise);这个方法就定义在上面方法的下面:

            private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }

    下面分析下doRegister()然后回来继续分析

    ==========我们开个分区分析下doRegister()方法=================================

    eventloop中除了定义了一个线程外,它还包含了selector等操作。

    循环执行selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

    一直到成功为止。

    import java.nio.channels.SelectableChannel;
    public abstract SelectionKey register(Selector sel, int ops, Object att);这歌是个抽象方法。

     然后这歌ServerSocketChannelImpl中并没有定义register(Selector sel, int ops, Object att)方法,所以我们就在SelectableChannel的子类中找。在AbstractSelectableChannel中找到了这个方法的实现,通过debug的确也定为到了这个方法:

    java.nio.channels.spi.AbstractSelectableChannel;
        /**
         * Registers this channel with the given selector, returning a selection key.
         *
         * <p>  This method first verifies that this channel is open and that the
         * given initial interest set is valid.
         *
         * <p> If this channel is already registered with the given selector then
         * the selection key representing that registration is returned after
         * setting its interest set to the given value.
         *
         * <p> Otherwise this channel has not yet been registered with the given
         * selector, so the {@link AbstractSelector#register register} method of
         * the selector is invoked while holding the appropriate locks.  The
         * resulting key is added to this channel's key set before being returned.
         * </p>
         *
         * @throws  ClosedSelectorException {@inheritDoc}
         *
         * @throws  IllegalBlockingModeException {@inheritDoc}
         *
         * @throws  IllegalSelectorException {@inheritDoc}
         *
         * @throws  CancelledKeyException {@inheritDoc}
         *
         * @throws  IllegalArgumentException {@inheritDoc}
         */
        public final SelectionKey register(Selector sel, int ops,
                                           Object att)
            throws ClosedChannelException
        {
            synchronized (regLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                if ((ops & ~validOps()) != 0)
                    throw new IllegalArgumentException();
                if (blocking)
                    throw new IllegalBlockingModeException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.interestOps(ops);
                    k.attach(att);
                }
                if (k == null) {
                    // New registration
                    synchronized (keyLock) {
                        if (!isOpen())
                            throw new ClosedChannelException();
                        k = ((AbstractSelector)sel).register(this, ops, att);
                        addKey(k);
                    }
                }
                return k;
            }
        }

    该实现是通过spi的机制实现的(关于spi可参考《java中的SPI机制》)

    SelectableChannel的register(Selector selector, ...)和Selector的select()方法都会操作Selector对象的共享资源all-keys集合.

    根据这个方法的注释我们可以很直观的看到这个方法的作用:Registers this channel with the given selector, returning a selection key

    仔细看k = ((AbstractSelector)sel).register(this, ops, att);

    这个地方喝我们平时经常写的nio代码是一样的。

     分析完这个方法后我们再来分析下eventLoop().unwrappedSelector();

    这个东西我们在上面分析过了,直接返回NioEventLoop 的private Selector unwrappedSelector;属性。

    =============================================================

     然后我们继续回到register0(ChannelPromise promise)方法中来。上面我们分析完了doRegister();

    doRegister()主要完成了channel注册到eventloop中selector的操作。

    下面分析isActive()方法,这个方法定义在NioServerSocketChannel中:

      @Override
        public boolean isActive() {
            return javaChannel().socket().isBound();
        }

    这个方法Returns the binding state of the ServerSocket.

    如果是绑定状态并且是第一次注册则进入:

    pipeline.fireChannelActive();

    ((ChannelInboundHandler) handler()).channelInactive(this);

     这里穿进来this(AbstractChannelHandlerContext)。ChannelInboundHandler.channelInactive(this)中完成业务逻辑后还可以返回到这个方法。

    正如我所说,默认情况下所有的handler都没有具体的业务都返回到上层方法来。

    到此为止我们就分析完了AbstactBootstrap.doBind(..)方法中的initAndRegister()方法,并在这个过程中观察了NioEventLoopGroup的使用。

    结下来就到了bind环节。有兴趣的可以参考我的另一篇博客《netty4 ServerBootstrap.bind(port) debug》。

  • 相关阅读:
    vmware vcenter appliance dhcp 改为 静态IP导致web service认证失败
    pptp记录用户登陆日志
    MySQL内存使用分析
    mysql慢查日志分析工具 percona-toolkit
    my.cnf详解
    ios9 升级后 企业版app plist无法安装
    redmine发送邮件
    swap文件
    算法--合法序括号序列判断
    算法--空格替换
  • 原文地址:https://www.cnblogs.com/guazi/p/6612375.html
Copyright © 2011-2022 走看看