zoukankan      html  css  js  c++  java
  • Netty源码跟踪 一:服务端的启动

    初始化对象

        public void run() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap(); // (2)
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class) // (3)
                 .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                    	 
                    	 // 添加ChannelHandler到ChannelPipeline
                         ch.pipeline().addLast(new DiscardServerHandler());
                     }
                 })
                 .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
                // 绑定端口,开始接收进来的连接
                ChannelFuture f = b.bind(port).sync(); // (7)
    
                System.out.println("DiscardServer已启动,端口:" + port);
                
                // 等待服务器  socket 关闭 。
                // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    

    由跟踪源码可知:
    Netty的关键元素有:启动类 BootStrap,对应的服务端启动类为ServerBootStrap,客户端为BootStrap;
    在声明启动类以前,我们要创建线程组,对于服务端:要创建两个线程组EventLoopGroup,分别为:bossGroup和workerGroup;boosGroup线程池在服务端主要为acceptor提供执行线程,acceptor处理客户端的连接请求,在客户端则为连接服务端提供执行线程。通常bossGroup的数量为1即可。workerGroup在服务端用于处理已建立连接的客户端。

    分配线程池后,我们需要建立Channel,来建立通信。在服务端,生成监听客户端连接的acceptorChannel,即ServerSocketChannel,对应socket编程中的ServerSocket,通常使用的为NioServerSocketChannel,在b.channel()方法中创建了一个构造函数字段为NioServerSocketChannel类型的ChannelFactory;客户端则生产发起与服务端连接的channel,即NioSocketChannel。

    建立通信信道后,创建处理channel事件的事件处理器,ChannelHandler,用来处理channel的读写请求,其中inbound类型是处理读,OutBound处理写,Duplex处理读写,并在handler内部自定义处理逻辑。在服务端则是处理客户端的连接请求,同时在扩展类ServerBootStrap中还定义了一个childChannelhandler,这个是与已建立连接的客户端对应的channel的事件处理器,可以通过实现ChannelInitializer的initChannel方法来添加更多的handler来处理;在客户端则是处理客户端channel的读写请求。

    创建好ChannelHandler之后,需要为channel设置一些基本选项值,如buffer的分配、tcp的连接时长、是否延迟;监听地址和连接地址等。

    绑定阶段doBind

    配置好启动类需要的对象后,调用bind方法,启动监听服务,并绑定监听端口。客户端:UCP连接时才会调用bind方法绑定本地接口;如果是TCP连接,则使用扩展类BootStrap提供的connect方法连接服务端;服务端:创建ServerSocketChannel,为ServerSocketChannel绑定监听端口,监听客户端的连接请求。

    public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
    }
    

    initAndRegister

    首先调用了initAndRegister初始化和注册方法,完成channel 的创建,此处调用ChannelFactory的newChannel方法,返回一个NioServerSocketChannel,通过init方法完成初始化,参考init(channel)信道初始化,通过group().register(channel),从eventLoopGroup获取eventLoop线程,由该eventLoop处理整个生命周期的IO请求。

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
    
    

    init(channel)信道初始化

    ServerBootStrap为AbstractBootStrap的实现类,服务端启动类,ServerSocketChannel接收客户端连接,SocketChannel处理已成功建立连接的客户端后续的请求。这两个功能类似于socket编程中的ServerSocket和Socket,即接收连接的channel为parent channel,而由他接收和建立连接生成的channel为child channel。为了拓展性,即提供区分对待这两种channel的拓展性,在ServerBootStrap中,定义了childHandler, childGroup, childOptions, childAttrs用于处理child channel。

    调用pipeline的addLast方法为ServerSocketChannel添加handlers,其中最后一个handler是ServerBootstrapAcceptor,acceptor handler创建,构造childGroup、childHandler作为参数,创建ServerBootstrapAcceptor,ServerBootstrapAcceptor用来accept客户端的连接请求,然后创建对应的socketChannel,该socketChannel用于处理客户端连接后序的读写请求。

        void init(Channel channel) {
            setChannelOptions(channel, newOptionsArray(), logger);
            setAttributes(channel, newAttributesArray());
    
            ChannelPipeline p = channel.pipeline();
            // 针对ServerBootStrap创建的已建立连接的channel的扩展对象
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    ServerBootstrapAcceptor服务端接收请求Handler

    ServerBootstrapAcceptor 为ChannelnBoundHandlerAdapter的实现类,在服务端的监听channel,即ServerSocketChannel,的pipeline中,为第二个或第一个channelInBoundHandler,在有新的客户端connect请求到来时,调用channelRead方法处理,创建和初始化类型SocketChannel的child channel,然后从childEventLoopGroup中获取一个eventLoop线程,该线程为IO线程,由该线程负责处理该child channel后续的IO请求。源码如下:
    childGroup和监听channel可为同一个线程池,也可以是不同的两个线程池,由用户代码指定,即如果调用group(group)为同一个,调用group(parentGroup, childGroup)分别指定两个不同的线程池。childGroup为该child channel分配一个线程,该child channel的整个生命周期的IO事件均在这个线程中处理,而不会切换到其他线程,所以没有线程安全问题,不需要使用到线程同步。

    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
        private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
            private final EventLoopGroup childGroup;
            private final ChannelHandler childHandler;
            private final Entry<ChannelOption<?>, Object>[] childOptions;
            private final Entry<AttributeKey<?>, Object>[] childAttrs;
            private final Runnable enableAutoReadTask;
    
            ServerBootstrapAcceptor(
                    final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                    Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
                this.childGroup = childGroup;
                this.childHandler = childHandler;
                this.childOptions = childOptions;
                this.childAttrs = childAttrs;
    
                // Task which is scheduled to re-enable auto-read.
                // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
                // not be able to load the class because of the file limit it already reached.
                //
                // See https://github.com/netty/netty/issues/1328
                enableAutoReadTask = new Runnable() {
                    @Override
                    public void run() {
                        channel.config().setAutoRead(true);
                    }
                };
            }
    
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                // 处理读请求,将接收到的msg强转为Channel,即SocketChannel,然后获取channel的pipeline,并绑定childhandler,并将当前channel添加到childGroup绑定某个eventLoop,
                // 添加结果监听器,在处理失败的情况下,调用forceClose方法,强制关闭连接
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
                // 设置options,attrs等
                setChannelOptions(child, childOptions, logger);
                setAttributes(child, childAttrs);
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    
            private static void forceClose(Channel child, Throwable t) {
                child.unsafe().closeForcibly();
                logger.warn("Failed to register an accepted channel: {}", child, t);
            }
        }
    }
    

    register(channel)eventLoop注册绑定channel

    channel绑定NioEventLoop线程,其实就是绑定到NioEventLoop线程的selector。通过调用register注册方法绑定。EventLoopGroup从自身所管理的eventLoop线程池中获取一个eventLoop线程,然后将channel绑定到这个eventLoop线程的。

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
    
        @Override
        public ChannelFuture register(ChannelPromise promise) {
            return next().register(promise);
        }
    }
    

    具体实现在SingleThreadEventLoop中,从promise中获取到channel,获取到对应的unsafe对象,然后调用unsafe的register方法

    public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
        public ChannelFuture register(Channel channel) {
            return register(new DefaultChannelPromise(channel, this));
        }
    
        @Override
        public ChannelFuture register(final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            promise.channel().unsafe().register(this, promise);
            return promise;
        }
    
    }
    

    unsafe().register&Channel的内部类注册功能

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        protected abstract class AbstractUnsafe implements Unsafe {
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ObjectUtil.checkNotNull(eventLoop, "eventLoop");
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
                // 对channel的eventLoop进行赋值
                AbstractChannel.this.eventLoop = eventLoop;
    
                // 然后看当前执行线程是否就是eventLoop线程,是则直接执行register0
                // 不是则使用eventLoop.execute -> register0
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
        }
    
            private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    // 将该channel注册到eventLoop的selector中,
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
        protected void doRegister() throws Exception {
            // NOOP
        }
    }
    

    具体调用register0方法,register0方法内部主要分三步骤运行:
    1、doRegister:将该channel注册到eventLoop的selector中,具体为在doRegister方法完成channel到selector的注册,doRegister为抽象方法,由具体实现类,即selector的类型,实现。这个方法在注册失败时,抛异常,则后面步骤不执行。
    2、pipeline.fireChannelRegistered():产生注册registered事件放到pipeline,使得pipeline中的channelHandlers按需处理该事件,即在channelRegistered方法定义处理逻辑;
    3、isActive:对应SocketChannel来说,是ch.isOpen() && ch.isConnected(),即channel已经connected成功,可以处理IO事件了:新的channel则在pipeline中传播active事件,重新注册registered的,则beginRead,继续读取,具体为在selector中注册监听OP_READ事件。

    doRegister
    public abstract class AbstractNioChannel extends AbstractChannel {
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    // 调用 channel的register方法,返回一个注册成功之后的selectionKey,此处传入的op = 0,代表注册
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    
    }
    
  • 相关阅读:
    「AtCoder AGC023F」01 on Tree
    「Wallace 笔记」平面最近点对 解法汇总
    「Codeforces 1181E」A Story of One Country (Easy & Hard)
    「NOI2018」「LOJ #2720」「Luogu P4770」 你的名字
    IdentityServer4设置RefreshTokenExpiration=Sliding不生效的原因
    【知识点】IQueryable.SumAsync方法的NULL异常
    Beyond Compare 4 密钥被吊销
    【知识点】Uri对象的完整地址
    git文件夹大小写问题
    .Net Core学习资料
  • 原文地址:https://www.cnblogs.com/nangonghui/p/15766090.html
Copyright © 2011-2022 走看看