zoukankan      html  css  js  c++  java
  • netty4.0.x源码分析—bootstrap

    Bootstrap的意思就是引导,辅助的意思,在编写服务端或客户端程序时,我们都需要先new一个bootstrap,然后基于这个bootstrap调用函数,添加eventloop和handler,可见对bootstrap进行分析还是有必要的。

    1、bootstrap结构图

    bootstrap的结构比较简单,涉及的类和接口很少,如下图所示,其中Bootstrap则是客户端程序用的引导类,ServerBootstrap是服务端程序用的引导类。

    2、serverbootstrap分析

    这部分,专门对serverbootstrap进行分析,bootstrap过程大同小异就不作详细的分析了。下面是我们编写服务端代码的一般化过程,整个分析过程将基于下面这段代码中用到的函数进行。

    // Configure the bootstrap.
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new HexDumpProxyInitializer(remoteHost, remotePort))
                 .childOption(ChannelOption.AUTO_READ, false)
                 .bind(localPort).sync().channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }

    先看关键代码(注意这里面的部分函数是在AbstractBootstrap中定义的)

        private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
        private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
        private volatile EventLoopGroup childGroup;
        private volatile ChannelHandler childHandler;
    
        /**
         * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
         * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and
         * {@link Channel}'s.
         */
        public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            super.group(parentGroup);
            if (childGroup == null) {
                throw new NullPointerException("childGroup");
            }
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = childGroup;
            return this;
        }

    属性值ChildGroup,ChildHandler,是用来处理accpt的Channel的。group函数其实就是将parentGroup和ChildGroup进行赋值,其中parentGroup用于处理accept事件,ChildGroup用于处理accpt的Channel的IO事件。

        //channel函数的实现定义在抽象父类中,其实就是通过newInstance函数生成一个具体的channel对象。
    <pre name="code" class="java">    /**
         * The {@link Class} which is used to create {@link Channel} instances from.
         * You either use this or {@link #channelFactory(ChannelFactory)} if your
         * {@link Channel} implementation has no no-args constructor.
         */
        public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new BootstrapChannelFactory<C>(channelClass));
        }
    
        /**
         * {@link ChannelFactory} which is used to create {@link Channel} instances from
         * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
         * is not working for you because of some more complex needs. If your {@link Channel} implementation
         * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
         * simplify your code.
         */
        @SuppressWarnings("unchecked")
        public B channelFactory(ChannelFactory<? extends C> channelFactory) {
            if (channelFactory == null) {
                throw new NullPointerException("channelFactory");
            }
            if (this.channelFactory != null) {
                throw new IllegalStateException("channelFactory set already");
            }
    
            this.channelFactory = channelFactory;
            return (B) this;
        }<pre name="code" class="java">    private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
            private final Class<? extends T> clazz;
    
            BootstrapChannelFactory(Class<? extends T> clazz) {
                this.clazz = clazz;
            }
    
            @Override
            public T newChannel() {
                try {
                    return clazz.newInstance();
                } catch (Throwable t) {
                    throw new ChannelException("Unable to create Channel from class " + clazz, t);
                }
            }
    
            @Override
            public String toString() {
                return clazz.getSimpleName() + ".class";
            }
        }
    
    
    
    

    Channel函数比较简单,其实就是通过newInstance函数,生成一个具体的Channel对象,例如服务端的NioServerSocketChannel。

        /**
         * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
         */
        public ServerBootstrap childHandler(ChannelHandler childHandler) {
            if (childHandler == null) {
                throw new NullPointerException("childHandler");
            }
            this.childHandler = childHandler;
            return this;
        }

    上面的函数即给serverbootstrap的childHandler赋值。

        /**
         * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
         * (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
         * {@link ChannelOption}.
         */
        public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
            if (childOption == null) {
                throw new NullPointerException("childOption");
            }
            if (value == null) {
                synchronized (childOptions) {
                    childOptions.remove(childOption);
                }
            } else {
                synchronized (childOptions) {
                    childOptions.put(childOption, value);
                }
            }
            return this;
        }

    上面的函数是指定accpt的channel的属性,channel有很多属性,比如SO_TIMEOUT时间,Buf长度等等。

        /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind() {
            validate();
            SocketAddress localAddress = this.localAddress;
            if (localAddress == null) {
                throw new IllegalStateException("localAddress not set");
            }
            return doBind(localAddress);
        }
    
         /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(int inetPort) {
            return bind(new InetSocketAddress(inetPort));
        }
    
        /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(String inetHost, int inetPort) {
            return bind(new InetSocketAddress(inetHost, inetPort));
        }
    
    <pre name="code" class="java">    /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(SocketAddress localAddress) {
            validate();
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            return doBind(localAddress);
        }
    
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regPromise = initAndRegister();
            final Channel channel = regPromise.channel();
            final ChannelPromise promise = channel.newPromise();
            if (regPromise.isDone()) {
                doBind0(regPromise, channel, localAddress, promise);
            } else {
                regPromise.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        doBind0(future, channel, localAddress, promise);
                    }
                });
            }
    
            return promise;
        }<pre name="code" class="java">    private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
    
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    
    
    
    

    Bind函数层层调用过来之后,最后就调用Channel的bind函数了,下面再看channel的bind函数是如何处理的。定义在AbstractChannel中:

        @Override
        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
            return pipeline.bind(localAddress, promise);
        }

    channel的bind函数,最终就是调用pipeline的bind,而pipeline的bind实际上就是调用contexthandler的bind,之个之前分析write和flush的时候说过了。所以这里直接看contexthandler的bind函数。下面是定义:

        @Override
        public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            validatePromise(promise, false);
    
            final DefaultChannelHandlerContext next = findContextOutbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeBind(localAddress, promise);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeBind(localAddress, promise);
                    }
                });
            }
    
            return promise;
        }<pre name="code" class="java">    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
    
    

    最终调用Handler的bind函数,还记得之前说的outbound类型的事件吗,这类事件提供了默认的实现方法,HeadHandler的bind函数,下面是它的定义:

            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                    throws Exception {
                unsafe.bind(localAddress, promise);
            }

    我们又看到了unsafe这个苦力了,最终的操作还是得由它来完成啊,赶紧去看看这个bind函数吧,

            @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                if (!ensureOpen(promise)) {
                    return;
                }
    
                // See: https://github.com/netty/netty/issues/576
                if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
                    Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                    localAddress instanceof InetSocketAddress &&
                    !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
                    // Warn a user about the fact that a non-root user can't receive a
                    // broadcast packet on *nix if the socket is bound on non-wildcard address.
                    logger.warn(
                            "A non-root user can't receive a broadcast packet if the socket " +
                            "is not bound to a wildcard address; binding to a non-wildcard " +
                            "address (" + localAddress + ") anyway as requested.");
                }
    
                boolean wasActive = isActive();
                try {
                    doBind(localAddress);
                } catch (Throwable t) {
                    closeIfClosed();
                    promise.setFailure(t);
                    return;
                }
                if (!wasActive && isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelActive();
                        }
                    });
                }
                promise.setSuccess();
            }

    上面的代码最终调用了Channel的doBind函数,这里我们的Channel是NioServerSocketChannel,所以最终就是调用它的bind函数了,代码如下

        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }

    其实它最终也是调用了JDK的Channel的socket bind函数。

    看到这里,你是否会觉得有点怪异,为什么没有注册accpt事件啊,一般的我们的server socket都是要注册accpt事件到selector,用于监听连接。如果你发现了这个问题,说明你是理解socket的编程的,^_^。实际上是前面在分析bind的时候我们漏掉了一个重要的函数,initAndRegister,下面再来看看它的定义:

        final ChannelFuture initAndRegister() {
            final Channel channel = channelFactory().newChannel();
            try {
                init(channel);
            } catch (Throwable t) {
                channel.unsafe().closeForcibly();
                return channel.newFailedFuture(t);
            }
    
            ChannelPromise regPromise = channel.newPromise();
            group().register(channel, regPromise);
            if (regPromise.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regPromise;
        }

    在这里,我们看到了我们之前介绍event时说的register函数,它就是用于将Channel注册到eventloop中去的。eventloop经过层层调用,最终调用了SingleThreadEventLoop类中的register函数,下面是它的定义:

        @Override
        public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
            if (channel == null) {
                throw new NullPointerException("channel");
            }
            if (promise == null) {
                throw new NullPointerException("promise");
            }
    
            channel.unsafe().register(this, promise);
            return promise;
        }

    还是逃离不了unsafe对象的调用,前面说了那么多的unsafe,这个函数猜都可以猜测出执行过程了,这里就不细细的列举代码了。

    还有一个init函数,这里需要说明一下,代码如下:

        @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
            if (handler() != null) {
                p.addLast(handler());
            }
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }

    它就是用来处理channel 的pipeline,并添加一个ServerBootstrapAcceptor的handler,继续看看这个handler的定义,我们就会明白它的意图。

        private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    
            private final EventLoopGroup childGroup;
            private final ChannelHandler childHandler;
            private final Entry<ChannelOption<?>, Object>[] childOptions;
            private final Entry<AttributeKey<?>, Object>[] childAttrs;
    
            @SuppressWarnings("unchecked")
            ServerBootstrapAcceptor(
                    EventLoopGroup childGroup, ChannelHandler childHandler,
                    Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
                this.childGroup = childGroup;
                this.childHandler = childHandler;
                this.childOptions = childOptions;
                this.childAttrs = childAttrs;
            }
    
            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                for (Entry<ChannelOption<?>, Object> e: childOptions) {
                    try {
                        if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                            logger.warn("Unknown channel option: " + e);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to set a channel option: " + child, t);
                    }
                }
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    childGroup.register(child);
                } catch (Throwable t) {
                    child.unsafe().closeForcibly();
                    logger.warn("Failed to register an accepted channel: " + child, t);
                }
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                final ChannelConfig config = ctx.channel().config();
                if (config.isAutoRead()) {
                    // stop accept new connections for 1 second to allow the channel to recover
                    // See https://github.com/netty/netty/issues/1328
                    config.setAutoRead(false);
                    ctx.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                           config.setAutoRead(true);
                        }
                    }, 1, TimeUnit.SECONDS);
                }
                // still let the exceptionCaught event flow through the pipeline to give the user
                // a chance to do something with it
                ctx.fireExceptionCaught(cause);
            }
        }

    上面就是这个handler的全部代码,它重写了ChannelRead函数,它的目的其实是想将server端accept的channel注册到ChildGroup的eventloop中,这样就可以理解,服务端代码workerGroup这个eventloop的作用了,它终于在这里体现出了它的作用了。

    3、总结

    这篇文章主要是分析了serverbootstrap的全过程,通过对这个的分析,我们清晰的看到了平时编写socket服务端代码时对bind,register事件,以及accept channel等的处理。

    http://blog.csdn.net/pingnanlee/article/details/11973769

  • 相关阅读:
    如何使用CCS创建一个DSP新工程
    ECAN模块学习
    C语言如何延时显示中文字符串
    C语言开发模式
    doxygen的使用教程
    WebConfig配置文件有哪些不为人知的秘密?
    ASP.NET Web.config的某些行为习惯约束
    ASP.NET 关于MD5如何加密的流程
    位,字节,字的小XX
    MR.ROBOT’s Feeling
  • 原文地址:https://www.cnblogs.com/xd502djj/p/6095287.html
Copyright © 2011-2022 走看看