zoukankan      html  css  js  c++  java
  • Netty章节十八:Netty Server Start 源码分析

    Netty Server Start 源码分析

    针对程序

     public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
         	//or  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
         
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                        .childHandler(new MyServerInitializer());
                
                ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
                channelFuture.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    

    NioEventLoopGroup()

    它是基于NIO 选择器(Selector)的Channel对象

    无参构造

    new NioEventLoopGroup(),创建一个新的实例并且使用默认的线程数,使用默认的ThreadFactory(线程工程),和SelectorProvider并且这是由SelectorProvider是由SelectorProvider.provider()静态方法方法提供的

    public NioEventLoopGroup() {
        this(0);
    }
    //一直向里调用直到父类的MultithreadEventLoopGroup()构造函数,这个是决定线程数是多少的核心方法
    
    //MultithreadEventLoopGroup类的静态代码块
    static {
       /*
         SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)
         如果第一个系统属性(需要自己设置)的值不存在则返回第二个参数的值(可用的处理器/系统核心数 * 2)
       */
       DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
              "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
       if (logger.isDebugEnabled()) {
           logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
       }
    }
    
    //调到这个方法时,参数都是
    //(0,null,SelectorProvider.provider(),DefaultSelectStrategyFactory.INSTANCE,RejectedExecutionHandlers.reject())
    //后面三个参数都是对应的返回值,这里写的是这个参数如何来的,以便理解
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    
    //最后做初始化的构造方法
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
     	....   
    }
    

    此时的nTreads为24是由 Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));获取的,因为我们没有配置系统属性io.netty.eventLoopThreads所以返回的是 (可用的处理器/系统核心数 * 2) 为24。最后会构造出24个线程为这个EventLoopGroup(事件循环组)工作

    带参构造

    NioEventLoopGroup(int) 如果创建的时候传入一个int值,那么它将使用这个int值个线程
    不设置构造参数的话使用默认个线程(如果没有设置"io.netty.eventLoopThreads"系统属性那么他就会使用系统核心数*2的核心数)

    为什么很多人设置为1,因为它是异步的只需要一个线程来不断的监听事件循环,当事件发生的时候获取到事件循环本身,然后将事件相应的处理工作丢给workerGroup

    ServerBootstrap对象的方法

    group()方法

    group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法,设置这个parentGroup(bossGroup(接收远端发来的连接,将处理工作交给childGroup/child))与 child(workerGroup(与客户端打交道))。
    这些EventLoopGroup 是用于处理针对于ServerChannel与Channel的所有的events(事件)以及IO(输入输出)
    param:
    parentGroup就是bossGroup childGroup是workerGroup

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            //如果AbstractBootstrap中的group属性没有被设置,则将parentGroup赋给group
            super.group(parentGroup);
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
        	//如果当前这个类的childGroup为null的话,将传进来的childGroup设置为当前这个类的childGroup
            this.childGroup设置值 = ObjectUtil.checkNotNull(childGroup, "childGroup");
            return this;
        }
    

    channel()方法

    /*
       通过channelClass的class对象来创建Channel的实例。如果你的Channel实现了无无参数的构造函数
       则可以使用this或者使用channelFactory()
    
    */
    public B channel(Class<? extends C> channelClass) {
        //将ReflectiveChannelFactory对象赋值给,成员属性channelFactory
        return channelFactory(new ReflectiveChannelFactory<C>(
              ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
    
    /*
        一个ChannelFactory,通过反射的形式调用其默认构造函数来实例化新的Channel。
     */
    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
        //建设者
        private final Constructor<? extends T> constructor;
    
        public ReflectiveChannelFactory(Class<? extends T> clazz) {
           //检查clazz是否为空,如果为空抛出NullPointerException异常内容是第二个参数,如果不为空返回第一个参数
           ObjectUtil.checkNotNull(clazz, "clazz");
           try {
               //获取这个参数的构造函数赋给constructor,以便后面使用反射创建这个对象
               this.constructor = clazz.getConstructor();
           } catch (NoSuchMethodException e) {
               throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                            " does not have a public non-arg constructor", e);
           }
        }
    }
    

    handler()方法

    添加针对bossGroup发挥作用的Handler处理器

    childHandler()方法

    添加针对workerGroup发挥作用的Handler处理器

    /*
       设置用于为通道的请求提供服务的ChannelHandler。
    */
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
       //childHandler参数不为空,则赋给 this.childHandler
       this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
            return this;
        }
    

    核心方法bind()

    /*
       创建一个新的Channel并将其绑定。
    */
    public ChannelFuture bind(int inetPort) {
            return bind(new InetSocketAddress(inetPort));
    }
    
    /--------------------------------------------------------
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    /--------------------------------------------------------
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
       //初始化并注册channel
       final ChannelFuture regFuture = initAndRegister();
       final Channel channel = regFuture.channel();
       if (regFuture.cause() != null) {
           return regFuture;
       }
    
       if (regFuture.isDone()) {
           // At this point we know that the registration was complete and successful.
           ChannelPromise promise = channel.newPromise();
           doBind0(regFuture, channel, localAddress, promise);
           return promise;
       } else {
           // Registration future is almost always fulfilled already, but just in case it's not.
           final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
           regFuture.addListener(new ChannelFutureListener() {
               @Override
               public void operationComplete(ChannelFuture future) throws Exception {
                   Throwable cause = future.cause();
                   if (cause != null) {
                       // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                       // IllegalStateException once we try to access the EventLoop of the Channel.
                       promise.setFailure(cause);
                   } else {
                       // Registration was successful, so set the correct executor to use.
                       // See https://github.com/netty/netty/issues/2586
                       promise.registered();
    
                       doBind0(regFuture, channel, localAddress, promise);
                   }
               }
           });
           return promise;
       }
    }
    

    initAndRegister()

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            /*使用channel()方法添加的ReflectiveChannelFactory工厂反射的创建channel对象(这个channel对象是NioServerSocketChannel),
            使用反射创建NioServerSocketChannel的时候会调用AbstractChannel父类的构造方法创建
            与这个Channel所关联的ChannelPipeline对象(实际类型是DefaultChannelPipeline)
            */
            channel = channelFactory.newChannel();
    		//初始化channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // 如果newChannel崩溃,则channel可以为null(例如SocketException(“打开的文件太多”))
                channel.unsafe().closeForcibly();
                // 由于尚未注册频道,因此我们需要强制使用GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // 由于尚未注册频道,因此我们需要强制使用GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
    
        /*
        	正式开始注册
        	config()  返回当前ServerBootstrap的ServerBootstrapConfig对象
        	group()	  返回当前ServerBootstrap父类AbstractBootstrap里面维护的group对象,就是我们调用group()方法设置的EventLoopGroup,
        		在本例中group()返回的是bossGroup
        */
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
    
        /*
            如果我们在这里,承诺没有失败,那就是下列情况之一:
            1)如果尝试从事件循环注册,则此时注册已完成。也就是说,现在尝试bind()或connect()是安全的,因为通道已经注册。
            2)如果我们尝试从另一个线程注册,则注册请求已成功添加到事件循环的任务队列以供以后执行。也就是说,现在尝试bind()或connect()是安全的:
                因为bind()或connect()将在执行计划的注册任务之后执行
                因为register()、bind()和connect()都绑定到同一个线程。
         */
        return regFuture;
    }
    
    init()
    /*
        完成Options与Attributes相关的设定,
     */
    @Override
    void init(Channel channel) {
        //ChannelOption是用于配置与channel相关的特别是ChannelConfig里面的这些网络层的基本的配置
        //Option初始化的值可以在serverBootstrap初始化的时候使用.option()方法进行设置,不设置netty则使用底层给根据不同情况设定好的值
        setChannelOptions(channel, newOptionsArray(), logger);
        /*
        Attribute/AttributeKey主要维护业务数据可以在程序运行过程中,动态的向里面添加key value对象,然后在后面用到的地方在取出来(类似rquest作用域)
        实现了业务数据随着netty调用流程流转,实现数据共享(类似工作流引擎当中的jBPM、Activiti,在某个流程当中可以设置一些数据,然后在
        后续的流程节点当中将数据取出来,实现了数据随着流程流转,可以在后面再取出来)
        */
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    
        //在调用init()方法之前的channelFactory.newChannel();的时候以及创建好了与这个Channel所关联的ChannelPipeline对象,所以可以直接使用
        ChannelPipeline p = channel.pipeline();
    
        //currentChildGroup就是我们创建的workerGroup
        final EventLoopGroup currentChildGroup = childGroup;
        //currentChildHandler是我们调用childHandler()方法设置的Handler处理器,这里是MyServerInitializer
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    
        //这里只是将ChannelInitializer对象添加到管道当中,initChannel()方法并不会执行,而是后续的某一个时刻会被调用
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                //将生成的Channel对象对应的pipeline拿到
                final ChannelPipeline pipeline = ch.pipeline();
                //如果之前调用了handler()方法则将添加的对象addLast()到这个ChannelPipeline当中
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
    
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        //向Channel中添加一个接收器
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    
    config().group().register(channel)
    //MultithreadEventLoopGroup类的方法
    @Override
    public ChannelFuture register(Channel channel) {
       //这个register()方法调的是SingleThreadEventLoop类中的
       return next().register(channel);
    }
    
    @Override
    public EventLoop next() {
       //返回一个
       return (EventLoop) super.next();
    }
    
    @Override
    public EventExecutor next() {
        return chooser.next();
    }
    
    //DefaultEventExecutorChooserFactory类中的GenericEventExecutorChooser内部类的方法
    
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
      private final AtomicInteger idx = new AtomicInteger();
      //保存的就是这个事件循环组的所以线程,本程序中就是一个一个的NioEventLoop。就是new NioEventLoopGroup()的时候创建的那些线程对象
      private final EventExecutor[] executors;
    
      GenericEventExecutorChooser(EventExecutor[] executors) {
          this.executors = executors;
      }
    /*返回的是这个事件循环组中的某一个事件执行器(EventExecutor)根据Math.abs(idx.getAndIncrement() % executors.length)计算结果选择其中一个*/
      @Override
      public EventExecutor next() {
          return executors[Math.abs(idx.getAndIncrement() % executors.length)];
      }
    }
    
    SingleThreadEventLoop
    @Override
    public ChannelFuture register(Channel channel) {
       //创建一个Promise,传入要注册的Channel与事当前的件循环组
       return register(new DefaultChannelPromise(channel, this));
    }
    
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
       ObjectUtil.checkNotNull(promise, "promise");
       //调用cahnnel()得到刚刚创建promise对象传进去的Channel对象,然后调用它(AbstractNioChannel)的unsafe()方法得到一个NioUnsafe对象,这个Unsafe对象来自于它的父类AbstractChannel,AbstractNioChannel做了一个向下类型转换,最后调用register()方法
       promise.channel().unsafe().register(this, promise);
       return promise;
    }
    
    //register()方法是AbstractChannel中的-----------------------------------
    
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(eventLoop, "eventLoop");
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
        AbstractChannel.this.eventLoop = eventLoop;
        /*
        	核心代码段
        	判断当前正在执行这行代码的线程是不是SingleThreadEventExecutor中维护的thread
        	如果是则直接调用register0()注册
        	如果不是则将这注册任务以一个任务的形式提交给eventLoop(SingleThreadEventExecutor)当中维护的那个线程对象,让它去执行解决了多线程并发的问题
        */
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
    
    private void register0(ChannelPromise promise) {
         try {
             // check if the channel is still open as it could be closed in the mean time when the register
             // call was outside of the eventLoop
             if (!promise.setUncancellable() || !ensureOpen(promise)) {
                 return;
             }
             boolean firstRegistration = neverRegistered;
             //注册的最底层实现,完成注册
             doRegister();
             neverRegistered = false;
             registered = true;
    
             // 确保在实际通知诺言之前,先调用handlerAdded(...)。这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件
             pipeline.invokeHandlerAddedIfNeeded();
    
             safeSetSuccess(promise);
             pipeline.fireChannelRegistered();
             //如果从未注册过频道,则仅触发channelActive。如果通道已注销并重新注册,则可以防止激活多个通道。
             if (isActive()) {
                 if (firstRegistration) {
                     pipeline.fireChannelActive();
                 } else if (config().isAutoRead()) {
                     // 此通道之前已注册,并且已设置autoRead()。这意味着我们需要再次开始读取,以便处理入站数据。
                     //
                     // 可以接收客户端消息了
                     beginRead();
                 }
             }
          } catch (Throwable t) {
              // Close the channel directly to avoid FD leak.
              closeForcibly();
              closeFuture.setClosed();
              safeSetFailure(promise, t);
          }
    }
    
    
    //doRegister()方法是由子类AbstractNioChannel实现的
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                //javaChannel()返回的是SelectableChannel(ServerSocketChannelImpl),然后将这个channel注册到eventLoop().unwrappedSelector()返回的Selector上,0是感兴趣的事件
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    

    重要的类说明

    ChannelOption<T>

    ChannelOption允许以类型安全的方式配置一个ChannelConfig
    到底支持哪一种ChannelOption取决于ChannelConfig的实际实现也可能依赖于它所属的运输层的本质。

    用于存储这个channel与TCP/IP相关的一些基本的配置信息,它是不存储值的值在其它地方存储,它里面存放的是
    值的类型(ChannelOption本身并不维护选项的值的信息,它只维护这个选项本身/这个名字本身)

    param:
    泛型 T ChannelOption值的类型(某一个选项/某一个设置项的类型)

    ChannelConfig

    对于channel的一套配置属性
    下转换到更具体的配置类型,如SocketChannelConfig或使用setOptions(Map)设置与传输具体相关的属性:
    Channel ch = ...;
    SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
    cfg.setTcpNoDelay(false);

    Option map
    一个Option map属性是动态的只能写的属性,可以进行Channel的配置而无需向下类型转换。

    名称 相关setter方法
    ChannelOption.CONNECT_TIMEOUT_MILLIS setConnectTimeoutMillis(int)
    ChannelOption.WRITE_SPIN_COUNT setWriteSpinCount(int)
    ChannelOption.WRITE_BUFFER_WATER_MARK setWriteBufferWaterMark(WriteBufferWaterMark)
    ChannelOption.ALLOCATOR setAllocator(ByteBufAllocator)
    ChannelOption.AUTO_READ setAutoRead(boolean)

    还有很多选项都位于ChannelConfig的子类当中,比如说你可以配置一些参数特定于TCP/IP scoket 参数

    AttributeKey<T>

    属性键可以用于在AttributeMap外面访问Attribute,请注意相同的名字不可能有多个键(keys)

    param:
    T Attribute类型,其可以通过该访问AttributeKey 。

    Channel与ChannelContext作用域

    直接向Channel上面附加的属性与向ChannelContext附加的属性,它们的作用域有什么不同

    1. Netty4.0
      对于整个channel来说它有一个map,用于维护它的属性和值的映射关系。而针对每一个ChannelHandlerContext也拥有自己的一个map,在Netty的组件当中只要有一个handler就会有一个与之相关和对应的ChannelHandlerContext。如果有10个handler那么Netty就会创建10ChannelHandlerContext同时在10个ChannelHandlerContext当中就会拥有10个不同的map用于分别存放这个handler在自己的作用域中所拥有的key value值,而channel本身又有一个独立的map映射信息。

      这种做法有两个问题:

      1. 当你在A handler当中set的值,在B handler当中是拿不到的,或者说你在channel当中设置的值,在handler中也是拿不到的
      2. 浪费内存,创建了太多个map每个ChannelHandlerContext都会有一个
    2. Netty4.1之后
      只会有一个map对象,而这个map对象会被channel以及这个channel上的所有handler所共享,而且key是不会重复的

  • 相关阅读:
    Django U2 模型
    Django U0 使用Django
    Django H2 文档查看
    python模块--time模块/os模块/sys模块
    python模块-logging和collections以及random模块
    python-hashlib模块configparser模块logging模块
    python模块--序列化
    python面向对象的特殊方法和单例模式
    python类属性和类方法以及静态方法和反射
    python面向对象的三个特性
  • 原文地址:https://www.cnblogs.com/mikisakura/p/13177482.html
Copyright © 2011-2022 走看看