zoukankan      html  css  js  c++  java
  • Netty 4源码解析:服务端启动

    Netty 4源码解析:服务端启动

    1.基础知识

    1.1 Netty 4示例

    因为Netty 5还处于测试版,所以选择了目前比较稳定的Netty 4作为学习对象。而且5.0的变化也不像4.0这么大,好多网上的例子都已经过时了。

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.0.25.Final</version>
            </dependency>

    Netty 4服务端的典型用法如下面代码示例所示,核心组件就是EventLoopGroup、ServerBootstrap、Handler等。其中像EventLoopGroup、Channel等都是可以灵活调配的。这里以比较常用的“主从Reactor”+Nio非阻塞为例,分析代码的执行流程。如果没有接触过Netty的话,建议先简单了解一下Reactor模型等知识再学习源码,不然可能会一头雾水。

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new XXXHandler());
                        }
                    });
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync();
    
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    1.2 NIO示例

    不管用Netty还是其他网络框架,最终都绕不开JDK NIO提供的接口。那直接用NIO可以分为几步呢?

    1. Selector.open():创建当前平台的Selector。
    2. ServerSocketChannel.open():创建服务端的Channel。
    3. bind():绑定到某个端口上。
    4. register():注册Channel和关注的事件到Selector上。
    5. select():拿到已经就绪的事件。

    下面就是一段NIO的示例代码,用单线程和一个Selector监控两个Channel的事件。

        public static void main(String[] args) throws Exception {
            Selector selector = Selector.open();
    
            int[] ports = { 1234, 5678 };
            for (int port : ports) {
                ServerSocketChannel listenChannel = ServerSocketChannel.open();
                listenChannel.socket().bind(new InetSocketAddress("localhost", port));
                listenChannel.configureBlocking(false);
                listenChannel.register(selector, SelectionKey.OP_ACCEPT);
            }
    
            while (true) {
                if (selector.select(3000) == 0) {
                    System.out.print(".");
                    continue;
                }
    
                Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
                while (keyIter.hasNext()) {
                    SelectionKey key = keyIter.next();
    
                    if (key.isAcceptable()) {
                        SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(32));
                    }
    
                    if (key.isReadable()) {
                        SocketChannel clientChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        long bytesRead = clientChannel.read(buffer);
                        // ...
                    }
    
                    keyIter.remove();
                }
            }
        }

    既然Netty也肯定使用NIO,那么下面分析代码流程时也着重看一下Netty是在哪、如何使用NIO的API。

    2.EventLoopGroup预准备

    在主流程开始之前,EventLoopGroup构造方法里做了一些预准备的工作。

    2.1 创建EventLoop组

    NioEventLoopGroup继承自MultithreadEventLoopGroup和更上层的MultithreadEventExecutorGroup。其中,EventLoopGroup中指定使用的EventExecutor是NioEventLoop,而MultithreadEventLoopGroup指定了线程数(CPU数*2)和使用的线程工厂是DefaultThreadFactory。

    注意:SelectorProvider.provider()始终返回第一次调用创建的SelectorProvider,所以这里调用provider()与后面NioServerSocketChannel中再次调用并不冲突。

    // NioEventLoopGroup
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
            this(nThreads, threadFactory, SelectorProvider.provider());
        }
    
        @Override
        protected EventExecutor newChild(
                ThreadFactory threadFactory, Object... args) throws Exception {
            return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
        }
    
    // MultithreadEventLoopGroup
        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
        }
    
        protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
        }
    
        @Override
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
        }

    利用这两个子类提供的信息,父类MultithreadEventExecutorGroup创建出NioEventLoop组和EventExecutorChooser。

    // MultithreadEventExecutorGroup
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            if (threadFactory == null) {
                threadFactory = newDefaultThreadFactory();
            }
    
            children = new SingleThreadEventExecutor[nThreads];
            if (isPowerOfTwo(children.length)) {
                chooser = new PowerOfTwoEventExecutorChooser();
            } else {
                chooser = new GenericEventExecutorChooser();
            }
    
            for (int i = 0; i < nThreads; i ++) {
                try {
                    children[i] = newChild(threadFactory, args);
                } catch (Exception e) {
                    throw new IllegalStateException("failed to create a child event loop", e);
                }
            }
        }

    2.2 EventLoop线程启动

    NioEventLoop也是在构造方法中做了很多工作。它的父类SingleThreadEventExecutor会调用刚才NioEventLoopGroup中的线程工厂创建一个线程,并调用NioEventLoop覆写的run()方法。而run()方法中就是最为关键的事件循环代码,它对NioEventLoop构造方法创建的Selector不断的select()出就绪的事件。

    // SingleThreadEventExecutor
        protected SingleThreadEventExecutor(
                EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
            thread = threadFactory.newThread(new Runnable() {
                @Override
                public void run() {
                    try {
                        SingleThreadEventExecutor.this.run();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    }
                }
            });
    
            taskQueue = newTaskQueue();
        }
    
    // NioEventLoop
        NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
            super(parent, threadFactory, false);
            provider = selectorProvider;
            selector = openSelector();
        }
    
        private Selector openSelector() {
            final Selector selector;
            try {
                selector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
            return selector;
        }
    
        @Override
        protected void run() {
            for (;;) {
                try {
                    if (hasTasks()) {
                        selectNow();
                    } else {
                        select(oldWakenUp);
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
                }
            }
        }

    3.ServerBootstrap主流程

    粗看前面的Netty 4代码示例,好像看不出哪里是框架的起点。实际上,当我们调用bind()方法时,这就是整个Netty框架的起点。具体来说,可以分为三步:

    1. 创建Channel:创建NioServerSocketChannel以及底层NIO的Channel。
    2. 初始化Channel:初始化Channel和ChannelPipeline。
    3. 注册事件:绑定一个EventLoop到Channel上,并将Channel和关注的SelectionKey注册到Selector上。
    4. 绑定端口:绑定到某个监听端口上。
    // AbstractBootstrap
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
    
            if (regFuture.isDone()) {
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            }
        }
    
        final ChannelFuture initAndRegister() {
            final Channel channel = channelFactory().newChannel();
            try {
                init(channel);
            } catch (Throwable t) {
                channel.unsafe().closeForcibly();
            }
    
            ChannelFuture regFuture = group().register(channel);
            return regFuture;
        }

    3.1 创建Channel

    ServerBoostrap根据我们传入channel()方法的NioServerSocketChannel.class,通过反射创建出Channel对象。注意:NioServerSocketChannel是Netty的包装类。真正的NIO Channel是在其构造方法中通过SelectorProvider创建的。

    这里Netty没有用之前我们的NIO示例代码中的ServerSocketChannel.open()方法创建Channel,而是使用SelectorProvider。注释里写道是为了避免多个Channel同时创建时open()方法中的竞争条件。

    // AbstractBootstrap
        public B channel(Class<? extends C> channelClass) {
            return channelFactory(new BootstrapChannelFactory<C>(channelClass));
        }
    
        private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
            @Override
            public T newChannel() {
                try {
                    return clazz.newInstance();
                } catch (Throwable t) {
                    throw new ChannelException("Unable to create Channel from class " + clazz, t);
                }
            }
        }
    
    // NioServerSocketChannel
        private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    
        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
    
        private static ServerSocketChannel newSocket(SelectorProvider provider) {
            try {
                /**
                 *  Use the SelectorProvider to open SocketChannel and so remove *  condition in SelectorProvider#provider() which is called by
                 *  each ServerSocketChannel.open() otherwise.
                 */
                return provider.openServerSocketChannel();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }

    传入父类AbstractNioChannel的构造方法后,父类负责设置成了非阻塞模式。

    // AbstractNioChannel
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }

    3.2 初始化Channel

    创建完Channel后就可以为其做一些配置了。ServerBootstrap的init()方法会配置Channel的参数和属性,并创建ServerBootstrapAcceptor,它真正地持有workerGroup(childGroup)和我们定制的Handler。

    // ServerBootstrap
        @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
            if (handler() != null) {
                p.addLast(handler());
            }
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }

    默认情况下,ChannelPipeline里只有head和tail两个默认的Handler,tail是InBoundHandler,head是OutBoundHandler。真正完成主从Reactor交互的自然就是这里加入到Pipeline的ServerBootstrapAcceptor。

    // AbstractChannel
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            unsafe = newUnsafe();
            pipeline = new DefaultChannelPipeline(this);
        }
    
    // DefaultChannelPipeline
        public DefaultChannelPipeline(AbstractChannel channel) {
            this.channel = channel;
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }

    3.3 注册事件

    完成了Channel和ChannelPipeline的初始化后,就要为Channel注册我们感兴趣的I/O事件了。尽管NIO的API很简单,但Netty中的注册流程还是比较复杂的:

    1. 以bossGroup的NioEventLoopGroup.register(Channel)方法为源头
    2. 经过由Chooser选取出的NioEventLoop的register(Channel)
    3. 最终才委托给Channel的unsafe().register(EventLoop)

    首先,NioEventLoopGroup.register()方法会使用next(),借助EventExecutorChooser从EventExecutor数组中选出一个NioEventLoop,并调用其register()方法。

    // MultithreadEventExecutorGroup
        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
    
    // MultithreadEventExecutorGroup
        @Override
        public EventExecutor next() {
            return chooser.next();
        }
    
        private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            @Override
            public EventExecutor next() {
                return children[childIndex.getAndIncrement() & children.length - 1];
            }
        }

    NioEventLoop继承自SingleThreadEventLoop,它的register()方法会调用NioServerSocketChannel的unsafe工具进行注册。

    // SingleThreadEventLoop
        @Override
        public ChannelFuture register(Channel channel) {
            return register(channel, new DefaultChannelPromise(channel, this));
        }
    
        @Override
        public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
            channel.unsafe().register(this, promise);
            return promise;
        }

    AbstractChannel中Unsafe匿名类会将传入的NioEventLoop绑定到当前Channel,最终触发doRegister()子方法完成注册工作。同时在注册完成后,Netty会向ChannelPipeline中发送channelRegistered和channelActive通知,这就是我们获得到的Channel通知的源头。

    // AbstractChannel.AbstractUnsafe
        protected abstract class AbstractUnsafe implements Unsafe {
            /** true if the channel has never been registered, false otherwise */
            private boolean neverRegistered = true;
    
            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                AbstractChannel.this.eventLoop = eventLoop;
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                }
            }
    
            private void register0(ChannelPromise promise) {
                try {
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
    
                    pipeline.fireChannelRegistered();
    
                    if (firstRegistration && isActive()) {
                        pipeline.fireChannelActive();
                    }
                } catch (Throwable t) {
                    closeForcibly();
                    closeFuture.setClosed();
                }
            }
        }

    最终终于到了真正实现注册的地方:AbstractNioChannel.doRegister()会将底层JDK的ServerSocketChannel注册到当前绑定的eventLoop持有的Selector上。

    // AbstractNioChannel
        @Override
        protected void doRegister() throws Exception {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                // ...
            }
        }

    3.4 端口绑定

    绑定流程与注册类似,最终都是调用Channel的unsafe()工具类来完成。但区别是注册是从EventLoopGroup开始最终直接调用到Channel,而绑定是从Channel开始,经过了Pipeline中tail和head的处理才调用到Channel的。

    // AbstractBootstrap
        private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    }
                }
            });
        }
    
    // AbstractChannelHandlerContext(TailContext)
        @Override
        public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
            final AbstractChannelHandlerContext next = findContextOutbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeBind(localAddress, promise);
            }
            return promise;
        }
    
    // DefaultChannelPipeline.HeadContext
        static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                    throws Exception {
                unsafe.bind(localAddress, promise);
            }
        }
    
    // AbstractChannel.AbstractUnsafe
        protected abstract class AbstractUnsafe implements Unsafe {
            @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                boolean wasActive = isActive();
                try {
                    doBind(localAddress);
                } catch (Throwable t) {
                    closeIfClosed();
                    return;
                }
            }
        }
    
    // NioServerSocketChannel
        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }

    4.总结梳理

    至此,Netty服务就算是启动完毕,它已经开始监听端口上的请求了。现在就总结一下整个代码流程比较关键的地方。其实这一大片代码看下来,会发现ServerBootstrap和EventLoopGroup都是在互相配合,真正的核心是它们创建出NioEventLoop组和NioServerSocketChannel。每个NioEventLoop对应一个线程和一个Selector,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。

  • 相关阅读:
    openh264 动态调整码率
    ffmpeg的avcodec_encode_video2延迟
    深入浅出c++协程
    asio的异步与线程模型解析
    libco分析
    《深入理解kafka》阅读笔记
    记一次shm_open返回EINVAL的错误排查
    css 圆形脉冲动画
    animate.css VUE 使用
    python 装饰器
  • 原文地址:https://www.cnblogs.com/xiaomaohai/p/6157603.html
Copyright © 2011-2022 走看看