zoukankan      html  css  js  c++  java
  • Netty4.x 源码实战系列(一): 深入理解ServerBootstrap 与 Bootstrap (1)

      从Java1.4开始, Java引入了non-blocking IO,简称NIO。NIO与传统socket最大的不同就是引入了Channel和多路复用selector的概念。传统的socket是基于stream的,它是单向的,有InputStream表示read和OutputStream表示写。而Channel是双工的,既支持读也支持写,channel的读/写都是面向Buffer。 NIO中引入的多路复用Selector机制(如果是linux系统,则应用的epoll事件通知机制)可使一个线程同时监听多个Channel上发生的事件。 虽然Java NIO相比于以往确实是一个大的突破,但是如果要真正上手进行开发,且想要开发出好的一个服务端网络程序,那么你得要花费一点功夫了,毕竟Java NIO只是提供了一大堆的API而已,对于一般的软件开发人员来说只能呵呵了。因此,社区中就涌现了很多基于Java NIO的网络应用框架,其中以Apache的Mina,以及Netty最为出名,从本篇开始我们将深入的分析一下Netty的内部实现细节

      本系列是基于Netty4.1.18这个版本。

      在分析源码之前,我们还是先看看Netty官方的样例代码,了解一下Netty一般是如何进行服务端及客户端开发的。

      Netty服务端示例:

    EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try
    { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup)  // (3) .channel(NioServerSocketChannel.class) // (4)
    .handler(new LoggingHandler()) // (5)
    .childHandler(new ChannelInitializer<SocketChannel>() { // (6) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (7) .childOption(ChannelOption.SO_KEEPALIVE, true); // (8) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (9) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }

    上面这段代码展示了服务端的一个基本步骤:

    (1)、 初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池";
    (2)、 初始化ServerBootstrap实例, 此实例是netty服务端应用开发的入口,也是本篇介绍的重点, 下面我们会深入分析;
    (3)、 通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池";
    (4)、 指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel;
    (5)、 设置ServerSocketChannel的处理器(此处不详述,后面的系列会进行深入分析)
    (6)、 设置子通道也就是SocketChannel的处理器, 其内部是实际业务开发的"主战场"(此处不详述,后面的系列会进行深入分析)
    (7)、 配置ServerSocketChannel的选项
    (8)、 配置子通道也就是SocketChannel的选项
    (9)、 绑定并侦听某个端口

    接着,我们再看看客户端是如何开发的:

    Netty客户端示例:

    public class TimeClient {
        public static void main(String[] args) throws Exception {
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); // (1)
            
            try {
                Bootstrap b = new Bootstrap(); // (2)
                b.group(workerGroup); // (3)
                b.channel(NioSocketChannel.class); // (4)
                b.option(ChannelOption.SO_KEEPALIVE, true); // (5)
                b.handler(new ChannelInitializer<SocketChannel>() { // (6)
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
                
                // Start the client.
                ChannelFuture f = b.connect(host, port).sync(); // (7)
    
                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }

     客户端的开发步骤和服务端都差不多:

    (1)、 初始化用于连接及I/O工作的"线程池";
    (2)、 初始化Bootstrap实例, 此实例是netty客户端应用开发的入口,也是本篇介绍的重点, 下面我们会深入分析;
    (3)、 通过Bootstrap的group方法,设置(1)中初始化的"线程池";
    (4)、 指定通道channel的类型,由于是客户端,故而是NioSocketChannel;
    (5)、 设置SocketChannel的选项(此处不详述,后面的系列会进行深入分析);
    (6)、 设置SocketChannel的处理器, 其内部是实际业务开发的"主战场"(此处不详述,后面的系列会进行深入分析);
    (7)、 连接指定的服务地址;

    通过对上面服务端及客户端代码分析,Bootstrap是Netty应用开发的入口,如果想要理解Netty内部的实现细节,那么有必要先了解一下Bootstrap内部的实现机制。

    首先我们先看一下ServerBootstrap及Bootstrap的类继承结构图:

    通过类图我们知道AbstractBootstrap类是ServerBootstrap及Bootstrap的基类,我们先看一下AbstractBootstrap类的主要代码:

    public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    
        volatile EventLoopGroup group;
        private volatile ChannelFactory<? extends C> channelFactory;
        private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
        private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
        private volatile ChannelHandler handler;
    
        
        public B group(EventLoopGroup group) {
            if (group == null) {
                throw new NullPointerException("group");
            }
            if (this.group != null) {
                throw new IllegalStateException("group set already");
            }
            this.group = group;
            return self();
        }
    
        private B self() {
            return (B) this;
        }
    
        public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    
        @Deprecated
        public B channelFactory(ChannelFactory<? extends C> channelFactory) {
            if (channelFactory == null) {
                throw new NullPointerException("channelFactory");
            }
            if (this.channelFactory != null) {
                throw new IllegalStateException("channelFactory set already");
            }
    
            this.channelFactory = channelFactory;
            return self();
        }
    
        public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
            return channelFactory((ChannelFactory<C>) channelFactory);
        }
    
        public <T> B option(ChannelOption<T> option, T value) {
            if (option == null) {
                throw new NullPointerException("option");
            }
            if (value == null) {
                synchronized (options) {
                    options.remove(option);
                }
            } else {
                synchronized (options) {
                    options.put(option, value);
                }
            }
            return self();
        }
    
        public <T> B attr(AttributeKey<T> key, T value) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            if (value == null) {
                synchronized (attrs) {
                    attrs.remove(key);
                }
            } else {
                synchronized (attrs) {
                    attrs.put(key, value);
                }
            }
            return self();
        }
    
        public B validate() {
            if (group == null) {
                throw new IllegalStateException("group not set");
            }
            if (channelFactory == null) {
                throw new IllegalStateException("channel or channelFactory not set");
            }
            return self();
        }
    
        public ChannelFuture bind(int inetPort) {
            return bind(new InetSocketAddress(inetPort));
        }
    
        public ChannelFuture bind(SocketAddress localAddress) {
            validate();
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            return doBind(localAddress);
        }
    
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            return regFuture;
        }
    
        abstract void init(Channel channel) throws Exception;
    
        private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    
        public B handler(ChannelHandler handler) {
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            this.handler = handler;
            return self();
        }public abstract AbstractBootstrapConfig<B, C> config();
    
    }

    现在我们以示例代码为出发点,来详细分析一下引导类内部实现细节:

    1、 首先看看服务端的b.group(bossGroup, workerGroup):

        调用ServerBootstrap的group方法,设置react模式的主线程池 以及 IO 操作线程池,ServerBootstrap中的group代码如下:

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            super.group(parentGroup);
            if (childGroup == null) {
                throw new NullPointerException("childGroup");
            }
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = childGroup;
            return this;
        }

    在group方法中,会继续调用父类的group方法,而通过类继承图我们知道,super.group(parentGroup)其实调用的就是AbstractBootstrap的group方法。AbstractBootstrap中group代码如下:

    public B group(EventLoopGroup group) {
            if (group == null) {
                throw new NullPointerException("group");
            }
            if (this.group != null) {
                throw new IllegalStateException("group set already");
            }
            this.group = group;
            return self();
        }

    通过以上分析,我们知道了AbstractBootstrap中定义了主线程池group的引用,而子线程池childGroup的引用是定义在ServerBootstrap中。

    当我们查看客户端Bootstrap的group方法时,我们发现,其是直接调用的父类AbstractBoostrap的group方法。

    2、示例代码中的 channel()方法

    无论是服务端还是客户端,channel调用的都是基类的channel方法,其实现细节如下:

    public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
            if (channelFactory == null) {
                throw new NullPointerException("channelFactory");
            }
            if (this.channelFactory != null) {
                throw new IllegalStateException("channelFactory set already");
            }
    
            this.channelFactory = channelFactory;
            return self();
        }

    我们发现,其实channel方法内部,只是初始化了一个用于生产指定channel类型的工厂实例。

    3、option / handler / attr 方法

         option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;

      handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来处理Acceptor的操作;

          对于客户端的SocketChannel,主要是用来处理 业务操作;

        attr: 设置通道的属性;

     option / handler / attr方法都定义在AbstractBootstrap中, 所以服务端和客户端的引导类方法调用都是调用的父类的对应方法。

    4、childHandler / childOption / childAttr 方法(只有服务端ServerBootstrap才有child类型的方法)

       对于服务端而言,有两种通道需要处理, 一种是ServerSocketChannel:用于处理用户连接的accept操作, 另一种是SocketChannel,表示对应客户端连接。而对于客户端,一般都只有一种channel,也就是SocketChannel。

       因此以child开头的方法,都定义在ServerBootstrap中,表示处理或配置服务端接收到的对应客户端连接的SocketChannel通道。

      childHandler / childOption / childAttr 在ServerBootstrap中的对应代码如下:

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
            if (childHandler == null) {
                throw new NullPointerException("childHandler");
            }
            this.childHandler = childHandler;
            return this;
        }
    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
            if (childOption == null) {
                throw new NullPointerException("childOption");
            }
            if (value == null) {
                synchronized (childOptions) {
                    childOptions.remove(childOption);
                }
            } else {
                synchronized (childOptions) {
                    childOptions.put(childOption, value);
                }
            }
            return this;
        }
    public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {
            if (childKey == null) {
                throw new NullPointerException("childKey");
            }
            if (value == null) {
                childAttrs.remove(childKey);
            } else {
                childAttrs.put(childKey, value);
            }
            return this;
        }

    至此,引导类的属性配置都设置完毕了。

    本篇总结:

    1、服务端由两种线程池,用于Acceptor的React主线程和用于I/O操作的React从线程池; 客户端只有用于连接及IO操作的React的主线程池;

    2、ServerBootstrap中定义了服务端React的"从线程池"对应的相关配置,都是以child开头的属性。 而用于"主线程池"channel的属性都定义在AbstractBootstrap中;

    本篇只是简单介绍了一下引导类的配置属性, 下一篇我将详细介绍服务端引导类的Bind过程分析。



  • 相关阅读:
    机房收费系统——视图的运用
    POJ 3278: Catch That Cow
    LeetCode 66 Plus One(加一)(vector)
    iOS定位服务CoreLocation
    Python 多线程
    LuaStudio编辑调试软件
    高仿快递100--实战之RadioGroup和RadioButton应用
    HDU
    MVC项目中怎样用JS导出EasyUI DataGrid为Excel
    调用getChildFragmentManager时出现的Bug
  • 原文地址:https://www.cnblogs.com/itdriver/p/8149913.html
Copyright © 2011-2022 走看看