zoukankan      html  css  js  c++  java
  • Netty服务端的启动源码分析

    ServerBootstrap的构造:

     1 public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
     2     private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
     3     private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap();
     4     private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap();
     5     private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
     6     private volatile EventLoopGroup childGroup;
     7     private volatile ChannelHandler childHandler;
     8 
     9     public ServerBootstrap() {
    10     }
    11     ......
    12 }

    隐式地执行了父类的无参构造:

     1 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
     2     volatile EventLoopGroup group;
     3     private volatile ChannelFactory<? extends C> channelFactory;
     4     private volatile SocketAddress localAddress;
     5     private final Map<ChannelOption<?>, Object> options = new LinkedHashMap();
     6     private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap();
     7     private volatile ChannelHandler handler;
     8 
     9     AbstractBootstrap() {
    10     }
    11     ......
    12 }

    只是初始化了几个容器成员

    在ServerBootstrap创建后,需要调用group方法,绑定EventLoopGroup,有关EventLoopGroup的创建在我之前博客中写过:Netty中NioEventLoopGroup的创建源码分析


    ServerBootstrap的group方法:

     1 public ServerBootstrap group(EventLoopGroup group) {
     2     return this.group(group, group);
     3 }
     4 
     5 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
     6     super.group(parentGroup);
     7     if (childGroup == null) {
     8         throw new NullPointerException("childGroup");
     9     } else if (this.childGroup != null) {
    10         throw new IllegalStateException("childGroup set already");
    11     } else {
    12         this.childGroup = childGroup;
    13         return this;
    14     }
    15 }

    首先调用父类的group方法绑定parentGroup:

     1 public B group(EventLoopGroup group) {
     2     if (group == null) {
     3         throw new NullPointerException("group");
     4     } else if (this.group != null) {
     5         throw new IllegalStateException("group set already");
     6     } else {
     7         this.group = group;
     8         return this.self();
     9     }
    10 }
    11 
    12 private B self() {
    13     return this;
    14 }

    将传入的parentGroup绑定给AbstractBootstrap的group成员,将childGroup绑定给ServerBootstrap的childGroup成员。
    group的绑定仅仅是交给了成员保存。

    再来看看ServerBootstrap的channel方法,,是在AbstractBootstrap中实现的:

    1 public B channel(Class<? extends C> channelClass) {
    2     if (channelClass == null) {
    3         throw new NullPointerException("channelClass");
    4     } else {
    5         return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
    6     }
    7 }

    使用channelClass构建了一个ReflectiveChannelFactory对象:

     1 public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
     2     private final Class<? extends T> clazz;
     3 
     4     public ReflectiveChannelFactory(Class<? extends T> clazz) {
     5         if (clazz == null) {
     6             throw new NullPointerException("clazz");
     7         } else {
     8             this.clazz = clazz;
     9         }
    10     }
    11 
    12     public T newChannel() {
    13         try {
    14             return (Channel)this.clazz.getConstructor().newInstance();
    15         } catch (Throwable var2) {
    16             throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
    17         }
    18     }
    19 
    20     public String toString() {
    21         return StringUtil.simpleClassName(this.clazz) + ".class";
    22     }
    23 }

    可以看到ReflectiveChannelFactory的作用就是通过反射机制,产生clazz的实例(这里以NioServerSocketChannel为例)。

    在创建完ReflectiveChannelFactory对象后, 调用channelFactory方法:

     1 public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
     2     return this.channelFactory((ChannelFactory)channelFactory);
     3 }
     4 
     5 public B channelFactory(ChannelFactory<? extends C> channelFactory) {
     6     if (channelFactory == null) {
     7         throw new NullPointerException("channelFactory");
     8     } else if (this.channelFactory != null) {
     9         throw new IllegalStateException("channelFactory set already");
    10     } else {
    11         this.channelFactory = channelFactory;
    12         return this.self();
    13     }
    14 }

    将刚才创建的ReflectiveChannelFactory对象交给channelFactory成员,用于后续服务端NioServerSocketChannel的创建。

    再来看ServerBootstrap的childHandler方法:

    1 public ServerBootstrap childHandler(ChannelHandler childHandler) {
    2     if (childHandler == null) {
    3         throw new NullPointerException("childHandler");
    4     } else {
    5         this.childHandler = childHandler;
    6         return this;
    7     }
    8 }

    还是交给了childHandler成员保存,可以看到上述这一系列的操作,都是为了填充ServerBootstrap,而ServerBootstrap真正的启动是在bind时:
    ServerBootstrap的bind方法,在AbstractBootstrap中实现:

     1 public ChannelFuture bind(int inetPort) {
     2     return this.bind(new InetSocketAddress(inetPort));
     3 }
     4 
     5 public ChannelFuture bind(String inetHost, int inetPort) {
     6 return this.bind(SocketUtils.socketAddress(inetHost, inetPort));
     7 }
     8 
     9 public ChannelFuture bind(InetAddress inetHost, int inetPort) {
    10     return this.bind(new InetSocketAddress(inetHost, inetPort));
    11 }
    12 
    13 public ChannelFuture bind(SocketAddress localAddress) {
    14     this.validate();
    15     if (localAddress == null) {
    16         throw new NullPointerException("localAddress");
    17     } else {
    18         return this.doBind(localAddress);
    19     }
    20 }

    可以看到首先调用了ServerBootstrap的validate方法,:

     1 public ServerBootstrap validate() {
     2     super.validate();
     3     if (this.childHandler == null) {
     4         throw new IllegalStateException("childHandler not set");
     5     } else {
     6         if (this.childGroup == null) {
     7             logger.warn("childGroup is not set. Using parentGroup instead.");
     8             this.childGroup = this.config.group();
     9         }
    10     
    11         return this;
    12     }
    13 }

    先调用了AbstractBootstrap的validate方法:

    1 public B validate() {
    2     if (this.group == null) {
    3         throw new IllegalStateException("group not set");
    4     } else if (this.channelFactory == null) {
    5         throw new IllegalStateException("channel or channelFactory not set");
    6     } else {
    7         return this.self();
    8     }
    9 }


    这个方法就是用来检查是否绑定了group和channel以及childHandler,所以在执行bind方法前,无论如何都要执行group、channel和childHandler方法。

    实际的bind交给了doBind来完成:

     1 private ChannelFuture doBind(final SocketAddress localAddress) {
     2     final ChannelFuture regFuture = this.initAndRegister();
     3     final Channel channel = regFuture.channel();
     4     if (regFuture.cause() != null) {
     5         return regFuture;
     6     } else if (regFuture.isDone()) {
     7         ChannelPromise promise = channel.newPromise();
     8         doBind0(regFuture, channel, localAddress, promise);
     9         return promise;
    10     } else {
    11         final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
    12         regFuture.addListener(new ChannelFutureListener() {
    13             public void operationComplete(ChannelFuture future) throws Exception {
    14                 Throwable cause = future.cause();
    15                 if (cause != null) {
    16                     promise.setFailure(cause);
    17                 } else {
    18                     promise.registered();
    19                     AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
    20                 }
    21             }
    22         });
    23         return promise;
    24     }
    25 }

    首先调用initAndRegister,完成ServerSocketChannel的创建以及注册:

     1 final ChannelFuture initAndRegister() {
     2     Channel channel = null;
     3 
     4     try {
     5         channel = this.channelFactory.newChannel();
     6         this.init(channel);
     7     } catch (Throwable var3) {
     8         if (channel != null) {
     9             channel.unsafe().closeForcibly();
    10             return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
    11         }
    12 
    13         return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    14     }
    15 
    16     ChannelFuture regFuture = this.config().group().register(channel);
    17     if (regFuture.cause() != null) {
    18         if (channel.isRegistered()) {
    19             channel.close();
    20         } else {
    21             channel.unsafe().closeForcibly();
    22         }
    23     }
    24 
    25     return regFuture;
    26 }

    首先调用channelFactory的newChannel通过反射机制构建Channel实例,也就是NioServerSocketChannel,


    NioServerSocketChannel的无参构造:

    1 public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
    2     private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    3     
    4     public NioServerSocketChannel() {
    5         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    6     }
    7     ......
    8 }

    SelectorProvider 是JDK的,关于SelectorProvider在我之前的博客中有介绍:【Java】NIO中Selector的创建源码分析

    在Windows系统下默认产生WindowsSelectorProvider,即DEFAULT_SELECTOR_PROVIDER,再来看看newSocket方法:

    1 private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
    2     try {
    3         return provider.openServerSocketChannel();
    4     } catch (IOException var2) {
    5         throw new ChannelException("Failed to open a server socket.", var2);
    6     }
    7 }

    使用WindowsSelectorProvider创建了一个ServerSocketChannelImpl,其实看到这里就明白了,NioServerSocketChannel是为了封装JDK的ServerSocketChannel

    接着调用另一个重载的构造:

    1 public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
    2     super((Channel)null, channel, 16);
    3     this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
    4 }

    首先调用父类的三参构造,其中16对应的是JDK中SelectionKey的ACCEPT状态:

    1 public static final int OP_ACCEPT = 1 << 4;

    其父类的构造处于一条继承链上:

    AbstractNioMessageChannel:

    1 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    2     super(parent, ch, readInterestOp);
    3 }

    AbstractNioChannel:

     1 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
     2     super(parent);
     3     this.ch = ch;
     4     this.readInterestOp = readInterestOp;
     5 
     6     try {
     7         ch.configureBlocking(false);
     8     } catch (IOException var7) {
     9         try {
    10             ch.close();
    11         } catch (IOException var6) {
    12             if (logger.isWarnEnabled()) {
    13                 logger.warn("Failed to close a partially initialized socket.", var6);
    14             }
    15         }
    16 
    17         throw new ChannelException("Failed to enter non-blocking mode.", var7);
    18     }
    19 }

    AbstractChannel:

     1 private final ChannelId id;
     2 private final Channel parent;
     3 private final Unsafe unsafe;
     4 private final DefaultChannelPipeline pipeline;
     5 
     6 protected AbstractChannel(Channel parent) {
     7     this.parent = parent;
     8     this.id = this.newId();
     9     this.unsafe = this.newUnsafe();
    10     this.pipeline = this.newChannelPipeline();
    11 }

    在AbstractChannel中使用newUnsafe和newChannelPipeline分别创建了一个Unsafe和一个DefaultChannelPipeline对象,
    在前面的博客介绍NioEventLoopGroup时候,在NioEventLoop的run方法中,每次轮询完调用processSelectedKeys方法时,都是通过这个unsafe根据SelectedKey来完成数据的读或写,unsafe是处理基础的数据读写
    (unsafe在NioServerSocketChannel创建时,产生NioMessageUnsafe实例,在NioSocketChannel创建时产生NioSocketChannelUnsafe实例)

    而pipeline的实现是一条双向责任链,负责处理unsafe提供的数据,进而进行用户的业务逻辑 (Netty中的ChannelPipeline源码分析

    在AbstractNioChannel中调用configureBlocking方法给JDK的ServerSocketChannel设置为非阻塞模式,且让readInterestOp成员赋值为16用于未来注册ACCEPT事件。

    在调用完继承链后回到NioServerSocketChannel构造,调用了javaChannel方法:

    1 protected java.nio.channels.ServerSocketChannel javaChannel() {
    2     return (java.nio.channels.ServerSocketChannel)super.javaChannel();
    3 }

    其实这个javaChannel就是刚才出传入到AbstractNioChannel中的ch成员:

    1 protected SelectableChannel javaChannel() {
    2     return this.ch;
    3 }

    也就是刚才创建的JDK的ServerSocketChannelImpl,用其socket方法,得到一个ServerSocket对象,然后产生了一个NioServerSocketChannelConfig对象,用于封装相关信息。

    NioServerSocketChannel构建完毕,回到initAndRegister方法,使用刚创建的NioServerSocketChannel调用init方法,这个方法是在ServerBootstrap中实现的:

     1 void init(Channel channel) throws Exception {
     2     Map<ChannelOption<?>, Object> options = this.options0();
     3     synchronized(options) {
     4         setChannelOptions(channel, options, logger);
     5     }
     6 
     7     Map<AttributeKey<?>, Object> attrs = this.attrs0();
     8     synchronized(attrs) {
     9         Iterator var5 = attrs.entrySet().iterator();
    10 
    11         while(true) {
    12             if (!var5.hasNext()) {
    13                 break;
    14             }
    15 
    16             Entry<AttributeKey<?>, Object> e = (Entry)var5.next();
    17             AttributeKey<Object> key = (AttributeKey)e.getKey();
    18             channel.attr(key).set(e.getValue());
    19         }
    20     }
    21 
    22     ChannelPipeline p = channel.pipeline();
    23     final EventLoopGroup currentChildGroup = this.childGroup;
    24     final ChannelHandler currentChildHandler = this.childHandler;
    25     Map var9 = this.childOptions;
    26     final Entry[] currentChildOptions;
    27     synchronized(this.childOptions) {
    28         currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(0));
    29     }
    30 
    31     var9 = this.childAttrs;
    32     final Entry[] currentChildAttrs;
    33     synchronized(this.childAttrs) {
    34         currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(0));
    35     }
    36 
    37     p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
    38         public void initChannel(final Channel ch) throws Exception {
    39             final ChannelPipeline pipeline = ch.pipeline();
    40             ChannelHandler handler = ServerBootstrap.this.config.handler();
    41             if (handler != null) {
    42                 pipeline.addLast(new ChannelHandler[]{handler});
    43             }
    44 
    45             ch.eventLoop().execute(new Runnable() {
    46                 public void run() {
    47                     pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
    48                 }
    49             });
    50         }
    51     }});
    52 }

    首先对attrs和options这两个成员进行了填充属性配置,这不是重点,然后获取刚才创建的NioServerSocketChannel的责任链pipeline,通过addLast将ChannelInitializer加入责任链,在ChannelInitializer中重写了initChannel方法,首先根据handler是否是null(这个handler是ServerBootstrap调用handler方法添加的,和childHandler方法不一样),若是handler不是null,将handler加入责任链,无论如何,都会异步将一个ServerBootstrapAcceptor对象加入责任链(后面会说为什么是异步)

    这个ChannelInitializer的initChannel方法的执行需要等到后面注册时才会被调用,在后面pipeline处理channelRegistered请求时,此initChannel方法才会被执行 (Netty中的ChannelPipeline源码分析

    ChannelInitializer的channelRegistered方法:

    1 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    2     if (initChannel(ctx)) {
    3         ctx.pipeline().fireChannelRegistered();
    4     } else {
    5         ctx.fireChannelRegistered();
    6     }
    7 }

    首先调用initChannel方法(和上面的initChannel不是一个):

     1 private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
     2     if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 
     3         try {
     4             initChannel((C) ctx.channel());
     5         } catch (Throwable cause) {
     6             exceptionCaught(ctx, cause);
     7         } finally {
     8             remove(ctx);
     9         }
    10         return true;
    11     }
    12     return false;
    13 }

    可以看到,这个ChannelInitializer只会在pipeline中初始化一次,仅用于Channel的注册,在完成注册后,会调用remove方法将其从pipeline中移除:
    remove方法:

     1 private void remove(ChannelHandlerContext ctx) {
     2     try {
     3         ChannelPipeline pipeline = ctx.pipeline();
     4         if (pipeline.context(this) != null) {
     5             pipeline.remove(this);
     6         }
     7     } finally {
     8         initMap.remove(ctx);
     9     }
    10 }

    在移除前,就会回调用刚才覆盖的initChannel方法,异步向pipeline添加了ServerBootstrapAcceptor,用于后续的NioServerSocketChannel侦听到客户端连接后,完成在服务端的NioSocketChannel的注册。

    回到initAndRegister,在对NioServerSocketChannel初始化完毕,接下来就是注册逻辑:

    1 ChannelFuture regFuture = this.config().group().register(channel);

    首先调用config().group(),这个就得到了一开始在ServerBootstrap的group方法传入的parentGroup,调用parentGroup的register方法,parentGroup是NioEventLoopGroup,这个方法是在子类MultithreadEventLoopGroup中实现的:

    1 public ChannelFuture register(Channel channel) {
    2     return this.next().register(channel);
    3 }

    首先调用next方法:

    1 public EventLoop next() {
    2     return (EventLoop)super.next();
    3 }

    实际上调用父类MultithreadEventExecutorGroup的next方法:

    1 public EventExecutor next() {
    2     return this.chooser.next();
    3 }

    关于chooser在我之前博客:Netty中NioEventLoopGroup的创建源码分析 介绍过,在NioEventLoopGroup创建时,默认会根据cpu个数创建二倍个NioEventLoop,而chooser就负责通过取模,每次选择一个NioEventLoop使用

    所以在MultithreadEventLoopGroup的register方法实际调用了NioEventLoop的register方法:

    NioEventLoop的register方法在子类SingleThreadEventLoop中实现:

    1 public ChannelFuture register(Channel channel) {
    2     return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
    3 }
    4 
    5 public ChannelFuture register(ChannelPromise promise) {
    6    ObjectUtil.checkNotNull(promise, "promise");
    7     promise.channel().unsafe().register(this, promise);
    8     return promise;
    9 }

    先把channel包装成ChannelPromise,默认是DefaultChannelPromise (Netty中的ChannelFuture和ChannelPromise),用于处理异步操作

    调用重载方法,而在重载方法里,可以看到,实际上的register操作交给了channel的unsafe来实现:

    unsafe的register方法在AbstractUnsafe中实现:

     1 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     2     if (eventLoop == null) {
     3         throw new NullPointerException("eventLoop");
     4     } else if (AbstractChannel.this.isRegistered()) {
     5         promise.setFailure(new IllegalStateException("registered to an event loop already"));
     6     } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
     7         promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
     8     } else {
     9         AbstractChannel.this.eventLoop = eventLoop;
    10         if (eventLoop.inEventLoop()) {
    11             this.register0(promise);
    12         } else {
    13             try {
    14                 eventLoop.execute(new Runnable() {
    15                     public void run() {
    16                         AbstractUnsafe.this.register0(promise);
    17                     }
    18                 });
    19             } catch (Throwable var4) {
    20                 AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
    21                 this.closeForcibly();
    22                 AbstractChannel.this.closeFuture.setClosed();
    23                 this.safeSetFailure(promise, var4);
    24             }
    25         }
    26 
    27     }
    28 }

    前面的判断做了一些检查就不细说了,直接看到else块
    首先给当前Channel绑定了eventLoop,即通过刚才chooser选择的eventLoop,该Channel也就是NioServerSocketChannel
    由于Unsafe的操作是在轮询线程中异步执行的,所里,这里需要判断inEventLoop是否处于轮询中
    在之前介绍NioEventLoopGroup的时候说过,NioEventLoop在没有调用doStartThread方法时并没有启动轮询的,所以inEventLoop判断不成立

    那么就调用eventLoop的execute方法,实际上的注册方法可以看到调用了AbstractUnsafe的register0方法,而将这个方法封装为Runnable交给eventLoop作为一个task去异步执行
    先来看eventLoop的execute方法实现,是在NioEventLoop的子类SingleThreadEventExecutor中实现的:

     1 public void execute(Runnable task) {
     2     if (task == null) {
     3         throw new NullPointerException("task");
     4     } else {
     5         boolean inEventLoop = this.inEventLoop();
     6         this.addTask(task);
     7         if (!inEventLoop) {
     8             this.startThread();
     9             if (this.isShutdown() && this.removeTask(task)) {
    10                 reject();
    11             }
    12         }
    13 
    14         if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
    15             this.wakeup(inEventLoop);
    16         }
    17 
    18     }
    19 }

    这里首先将task,即刚才的注册事件放入阻塞任务队列中,然后调用startThread方法:

     1 private void startThread() {
     2     if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
     3         try {
     4             this.doStartThread();
     5         } catch (Throwable var2) {
     6             STATE_UPDATER.set(this, 1);
     7             PlatformDependent.throwException(var2);
     8         }
     9     }
    10 
    11 }

    NioEventLoop此时还没有轮询,所以状态是1,对应ST_NOT_STARTED,此时利用CAS操作,将状态修改为2,即ST_STARTED ,标志着NioEventLoop要启动轮询了,果然,接下来就调用了doStartThread开启轮询线程:

      1 private void doStartThread() {
      2     assert this.thread == null;
      3 
      4     this.executor.execute(new Runnable() {
      5         public void run() {
      6             SingleThreadEventExecutor.this.thread = Thread.currentThread();
      7             if (SingleThreadEventExecutor.this.interrupted) {
      8                 SingleThreadEventExecutor.this.thread.interrupt();
      9             }
     10 
     11             boolean success = false;
     12             SingleThreadEventExecutor.this.updateLastExecutionTime();
     13             boolean var112 = false;
     14 
     15             int oldState;
     16             label1907: {
     17                 try {
     18                     var112 = true;
     19                     SingleThreadEventExecutor.this.run();
     20                     success = true;
     21                     var112 = false;
     22                     break label1907;
     23                 } catch (Throwable var119) {
     24                     SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119);
     25                     var112 = false;
     26                 } finally {
     27                     if (var112) {
     28                         int oldStatex;
     29                         do {
     30                             oldStatex = SingleThreadEventExecutor.this.state;
     31                         } while(oldStatex < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldStatex, 3));
     32 
     33                         if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) {
     34                             SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
     35                         }
     36 
     37                         try {
     38                             while(!SingleThreadEventExecutor.this.confirmShutdown()) {
     39                                 ;
     40                             }
     41                         } finally {
     42                             try {
     43                                 SingleThreadEventExecutor.this.cleanup();
     44                             } finally {
     45                                 SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
     46                                 SingleThreadEventExecutor.this.threadLock.release();
     47                                 if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) {
     48                                     SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
     49                                 }
     50 
     51                                 SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
     52                             }
     53                         }
     54 
     55                     }
     56                 }
     57 
     58                 do {
     59                     oldState = SingleThreadEventExecutor.this.state;
     60                 } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));
     61 
     62                 if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) {
     63                     SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
     64                 }
     65 
     66                 try {
     67                     while(!SingleThreadEventExecutor.this.confirmShutdown()) {
     68                         ;
     69                     }
     70 
     71                     return;
     72                 } finally {
     73                     try {
     74                         SingleThreadEventExecutor.this.cleanup();
     75                     } finally {
     76                         SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
     77                         SingleThreadEventExecutor.this.threadLock.release();
     78                         if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) {
     79                             SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
     80                         }
     81 
     82                         SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
     83                     }
     84                 }
     85             }
     86 
     87             do {
     88                 oldState = SingleThreadEventExecutor.this.state;
     89             } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));
     90 
     91             if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) {
     92                 SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
     93             }
     94 
     95             try {
     96                 while(!SingleThreadEventExecutor.this.confirmShutdown()) {
     97                     ;
     98                 }
     99             } finally {
    100                 try {
    101                     SingleThreadEventExecutor.this.cleanup();
    102                 } finally {
    103                     SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
    104                     SingleThreadEventExecutor.this.threadLock.release();
    105                     if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) {
    106                         SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
    107                     }
    108 
    109                     SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
    110                 }
    111             }
    112 
    113         }
    114     });
    115 }

    关于doStartThread方法,我在 Netty中NioEventLoopGroup的创建源码分析 中已经说的很细了,这里就不再一步一步分析了

    因为此时还没真正意义上的启动轮询,所以thread等于null成立的,然后调用executor的execute方法,这里的executor是一个线程池,在之前说过的,所以里面的run方法是处于一个线程里面的,然后给thread成员赋值为当前线程,表明正式进入了轮询。
    而SingleThreadEventExecutor.this.run()才是真正的轮询逻辑,这在之前也说过,这个run的实现是在父类NioEventLoop中:

     1 protected void run() {
     2     while(true) {
     3         while(true) {
     4             try {
     5                 switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) {
     6                 case -2:
     7                     continue;
     8                 case -1:
     9                     this.select(this.wakenUp.getAndSet(false));
    10                     if (this.wakenUp.get()) {
    11                         this.selector.wakeup();
    12                     }
    13                 default:
    14                     this.cancelledKeys = 0;
    15                     this.needsToSelectAgain = false;
    16                     int ioRatio = this.ioRatio;
    17                     if (ioRatio == 100) {
    18                         try {
    19                             this.processSelectedKeys();
    20                         } finally {
    21                             this.runAllTasks();
    22                         }
    23                     } else {
    24                         long ioStartTime = System.nanoTime();
    25                         boolean var13 = false;
    26 
    27                         try {
    28                             var13 = true;
    29                             this.processSelectedKeys();
    30                             var13 = false;
    31                         } finally {
    32                             if (var13) {
    33                                 long ioTime = System.nanoTime() - ioStartTime;
    34                                 this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
    35                             }
    36                         }
    37 
    38                         long ioTime = System.nanoTime() - ioStartTime;
    39                         this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
    40                     }
    41                 }
    42             } catch (Throwable var21) {
    43                 handleLoopException(var21);
    44             }
    45 
    46             try {
    47                 if (this.isShuttingDown()) {
    48                     this.closeAll();
    49                     if (this.confirmShutdown()) {
    50                         return;
    51                     }
    52                 }
    53             } catch (Throwable var18) {
    54                 handleLoopException(var18);
    55             }
    56         }
    57     }
    58 }

    首先由于task已经有一个了,就是刚才的注册事件,所以选择策略calculateStrategy最终调用selectNow(也是之前说过的):

     1 private final IntSupplier selectNowSupplier = new IntSupplier() {
     2     public int get() throws Exception {
     3         return NioEventLoop.this.selectNow();
     4     }
     5 };
     6 
     7 int selectNow() throws IOException {
     8     int var1;
     9     try {
    10         var1 = this.selector.selectNow();
    11     } finally {
    12         if (this.wakenUp.get()) {
    13             this.selector.wakeup();
    14         }
    15 
    16     }
    17 
    18     return var1;
    19 }

    使用JDK原生Selector进行selectNow,由于此时没有任何Channel的注册,所以selectNow会立刻返回0,此时就进入default逻辑,由于没有任何注册,processSelectedKeys方法也做不了什么,所以在这一次的轮询实质上只进行了runAllTasks方法,此方法会执行阻塞队列中的task的run方法(还是在之前博客中介绍过),由于轮询是在线程池中的一个线程中运行的,所以task的执行是一个异步操作。(在执行完task,将task移除阻塞对立,线程继续轮询)

    这时就可以回到AbstractChannel的register方法中了,由上面可以知道task实际上异步执行了:

    1 AbstractUnsafe.this.register0(promise);

    register0方法:

     1 private void register0(ChannelPromise promise) {
     2     try {
     3         if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
     4             return;
     5         }
     6 
     7         boolean firstRegistration = this.neverRegistered;
     8         AbstractChannel.this.doRegister();
     9         this.neverRegistered = false;
    10         AbstractChannel.this.registered = true;
    11         AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
    12         this.safeSetSuccess(promise);
    13         AbstractChannel.this.pipeline.fireChannelRegistered();
    14         if (AbstractChannel.this.isActive()) {
    15             if (firstRegistration) {
    16                 AbstractChannel.this.pipeline.fireChannelActive();
    17             } else if (AbstractChannel.this.config().isAutoRead()) {
    18                 this.beginRead();
    19             }
    20         }
    21     } catch (Throwable var3) {
    22         this.closeForcibly();
    23         AbstractChannel.this.closeFuture.setClosed();
    24         this.safeSetFailure(promise, var3);
    25     }
    26 
    27 }

    可以看到实际上的注册逻辑又交给了AbstractChannel的doRegister,而这个方法在AbstractNioChannel中实现:

     1 protected void doRegister() throws Exception {
     2     boolean selected = false;
     3 
     4     while(true) {
     5         try {
     6             this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
     7             return;
     8         } catch (CancelledKeyException var3) {
     9             if (selected) {
    10                 throw var3;
    11             }
    12 
    13             this.eventLoop().selectNow();
    14             selected = true;
    15         }
    16     }
    17 }

    javaChannel就是之前产生的JDK的ServerSocketChannel,unwrappedSelector在之前说过,就是未经修改的JDK原生Selector,这个Selector和eventLoop是一对一绑定的,可以看到调用JDK原生的注册方法,完成了对ServerSocketChannel的注册,但是注册的是一个0状态(缺省值),而传入的this,即AbstractNioChannel对象作为了一个附件,用于以后processSelectedKeys方法从SelectionKey中得到对应的Netty的Channel(还是之前博客说过)
    关于缺省值,是由于AbstractNioChannel不仅用于NioServerSocketChannel的注册,还用于NioSocketChannel的注册,只有都使用缺省值注册才不会产生异常  【Java】NIO中Channel的注册源码分析 ,并且,在以后processSelectedKeys方法会对0状态判断,再使用unsafe进行相应的逻辑处理。

    在完成JDK的注册后,调用pipeline的invokeHandlerAddedIfNeeded方法(Netty中的ChannelPipeline源码分析),处理ChannelHandler的handlerAdded的回调,即调用用户添加的ChannelHandler的handlerAdded方法。
    调用safeSetSuccess,标志异步操作完成:

    1 protected final void safeSetSuccess(ChannelPromise promise) {
    2     if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
    3         logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
    4     }
    5 }

    关于异步操作我在之前的博客中说的很清楚了:Netty中的ChannelFuture和ChannelPromise


    接着调用pipeline的fireChannelRegistered方法,也就是在责任链上调用channelRegistered方法,这时,就会调用之在ServerBootstrap中向pipeline添加的ChannelInitializer的channelRegistered,进而回调initChannel方法,完成对ServerBootstrapAcceptor的添加。

    回到register0方法,在处理完pipeline的责任链后,根据当前AbstractChannel即NioServerSocketChannel的isActive:

    1 public boolean isActive() {
    2     return this.javaChannel().socket().isBound();
    3 }

    获得NioServerSocketChannel绑定的JDK的ServerSocketChannel,进而获取ServerSocket,判断isBound:

    1 public boolean isBound() {
    2    // Before 1.3 ServerSockets were always bound during creation
    3     return bound || oldImpl;
    4 }

    这里实际上就是判断ServerSocket是否调用了bind方法,前面说过register0方法是一个异步操作,在多线程环境下不能保证执行顺序,若是此时已经完成了ServerSocket的bind,根据firstRegistration判断是否需要pipeline传递channelActive请求,首先会执行pipeline的head即HeadContext的channelActive方法:

    1 @Override
    2 public void channelActive(ChannelHandlerContext ctx) throws Exception {
    3     ctx.fireChannelActive();
    4 
    5     readIfIsAutoRead();
    6 }

    在HeadContext通过ChannelHandlerContext 传递完channelActive请求后,会调用readIfIsAutoRead方法:

    1 private void readIfIsAutoRead() {
    2     if (channel.config().isAutoRead()) {
    3         channel.read();
    4     }
    5 }

    此时调用AbstractChannel的read方法:

    1 public Channel read() {
    2     pipeline.read();
    3     return this;
    4 }

    最终在请求链由HeadContext执行read方法:

    1 public void read(ChannelHandlerContext ctx) {
    2     unsafe.beginRead();
    3 }

    终于可以看到此时调用unsafe的beginRead方法:

     1 public final void beginRead() {
     2     assertEventLoop();
     3 
     4     if (!isActive()) {
     5         return;
     6     }
     7 
     8     try {
     9         doBeginRead();
    10     } catch (final Exception e) {
    11         invokeLater(new Runnable() {
    12             @Override
    13             public void run() {
    14                 pipeline.fireExceptionCaught(e);
    15             }
    16         });
    17         close(voidPromise());
    18     }
    19 }

    最终执行了doBeginRead方法,由AbstractNioChannel实现:

     1 protected void doBeginRead() throws Exception {
     2     final SelectionKey selectionKey = this.selectionKey;
     3     if (!selectionKey.isValid()) {
     4         return;
     5     }
     6     
     7     readPending = true;
     8     
     9     final int interestOps = selectionKey.interestOps();
    10     if ((interestOps & readInterestOp) == 0) {
    11         selectionKey.interestOps(interestOps | readInterestOp);
    12     }
    13 }

    这里,就完成了向Selector注册readInterestOp事件,从前面来看就是ACCEPT事件

    回到AbstractBootstrap的doBind方法,在initAndRegister逻辑结束后,由上面可以知道,实际上的register注册逻辑是一个异步操作,在register0中完成
    根据ChannelFuture来判断异步操作是否完成,如果isDone,则表明异步操作先完成,即完成了safeSetSuccess方法,
    然后调用newPromise方法:

    1 public ChannelPromise newPromise() {
    2     return pipeline.newPromise();
    3 }

    给channel的pipeline绑定异步操作ChannelPromise
    然后调用doBind0方法完成ServerSocket的绑定,若是register0这个异步操作还没完成,就需要给ChannelFuture产生一个异步操作的侦听ChannelFutureListener对象,等到register0方法调用safeSetSuccess时,在promise的trySuccess中会回调ChannelFutureListener的operationComplete方法,进而调用doBind0方法

    doBind0方法:

     1 private static void doBind0(
     2         final ChannelFuture regFuture, final Channel channel,
     3         final SocketAddress localAddress, final ChannelPromise promise) {
     4     channel.eventLoop().execute(new Runnable() {
     5         @Override
     6         public void run() {
     7             if (regFuture.isSuccess()) {
     8                 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
     9             } else {
    10                 promise.setFailure(regFuture.cause());
    11             }
    12         }
    13     });
    14 }

    向轮询线程提交了一个任务,异步处理bind,可以看到,只有在regFuture异步操作成功结束后,调用channel的bind方法:

    1 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    2    return pipeline.bind(localAddress, promise);
    3 }

    实际上的bind又交给pipeline,去完成,pipeline中就会交给责任链去完成,最终会交给HeadContext完成:

    1 public void bind(
    2                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
    3                 throws Exception {
    4     unsafe.bind(localAddress, promise);
    5 }

    可以看到,绕了一大圈,交给了unsafe完成:

     1 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
     2     assertEventLoop();
     3 
     4     if (!promise.setUncancellable() || !ensureOpen(promise)) {
     5         return;
     6     }
     7     
     8     if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
     9         localAddress instanceof InetSocketAddress &&
    10         !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
    11         !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
    12         logger.warn(
    13                 "A non-root user can't receive a broadcast packet if the socket " +
    14                 "is not bound to a wildcard address; binding to a non-wildcard " +
    15                 "address (" + localAddress + ") anyway as requested.");
    16     }
    17 
    18     boolean wasActive = isActive();
    19     try {
    20         doBind(localAddress);
    21     } catch (Throwable t) {
    22         safeSetFailure(promise, t);
    23         closeIfClosed();
    24         return;
    25     }
    26 
    27     if (!wasActive && isActive()) {
    28         invokeLater(new Runnable() {
    29             @Override
    30             public void run() {
    31                 pipeline.fireChannelActive();
    32             }
    33         });
    34     }
    35 
    36     safeSetSuccess(promise);
    37 }

    然而,真正的bind还是回调了doBind方法,最终是由NioServerSocketChannel来实现:

    1 @Override
    2 protected void doBind(SocketAddress localAddress) throws Exception {
    3     if (PlatformDependent.javaVersion() >= 7) {
    4         javaChannel().bind(localAddress, config.getBacklog());
    5     } else {
    6         javaChannel().socket().bind(localAddress, config.getBacklog());
    7     }
    8 }

    在这里终于完成了对JDK的ServerSocketChannel的bind操作


    在上面的

    1 if (!wasActive && isActive()) {
    2     invokeLater(new Runnable() {
    3         @Override
    4         public void run() {
    5             pipeline.fireChannelActive();
    6         }
    7     });
    8 }

    这个判断,就是确保在register0中isActive时,还没完成绑定,也就没有beginRead操作来向Selector注册ACCEPT事件,那么就在这里进行注册,进而让ServerSocket去侦听客户端的连接


    在服务端ACCEPT到客户端的连接后,在NioEventLoop轮询中,就会调用processSelectedKeys处理ACCEPT的事件就绪,然后交给unsafe的read去处理  Netty中NioEventLoopGroup的创建源码分析

    在服务端,由NioMessageUnsafe实现:

     1 public void read() {
     2         assert eventLoop().inEventLoop();
     3         final ChannelConfig config = config();
     4         final ChannelPipeline pipeline = pipeline();
     5         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
     6         allocHandle.reset(config);
     7 
     8         boolean closed = false;
     9         Throwable exception = null;
    10         try {
    11             try {
    12                 do {
    13                     int localRead = doReadMessages(readBuf);
    14                     if (localRead == 0) {
    15                         break;
    16                     }
    17                     if (localRead < 0) {
    18                         closed = true;
    19                         break;
    20                     }
    21 
    22                     allocHandle.incMessagesRead(localRead);
    23                 } while (allocHandle.continueReading());
    24             } catch (Throwable t) {
    25                 exception = t;
    26             }
    27 
    28             int size = readBuf.size();
    29             for (int i = 0; i < size; i ++) {
    30                 readPending = false;
    31                 pipeline.fireChannelRead(readBuf.get(i));
    32             }
    33             readBuf.clear();
    34             allocHandle.readComplete();
    35             pipeline.fireChannelReadComplete();
    36 
    37             if (exception != null) {
    38                 closed = closeOnReadError(exception);
    39 
    40                 pipeline.fireExceptionCaught(exception);
    41             }
    42 
    43             if (closed) {
    44                 inputShutdown = true;
    45                 if (isOpen()) {
    46                     close(voidPromise());
    47                 }
    48             }
    49         } finally {
    50             if (!readPending && !config.isAutoRead()) {
    51                 removeReadOp();
    52             }
    53         }
    54     }
    55 }

    核心在doReadMessages方法,由NioServerSocketChannel实现:

     1 protected int doReadMessages(List<Object> buf) throws Exception {
     2     SocketChannel ch = SocketUtils.accept(javaChannel());
     3 
     4     try {
     5         if (ch != null) {
     6             buf.add(new NioSocketChannel(this, ch));
     7             return 1;
     8         }
     9     } catch (Throwable t) {
    10         logger.warn("Failed to create a new channel from an accepted socket.", t);
    11 
    12         try {
    13             ch.close();
    14         } catch (Throwable t2) {
    15             logger.warn("Failed to close a socket.", t2);
    16         }
    17     }
    18 
    19     return 0;
    20 }

    SocketUtils的accept方法其实就是用来调用JDK中ServerSocketChannel原生的accept方法,来得到一个JDK的SocketChannel对象,然后通过这个SocketChannel对象,将其包装成NioSocketChannel对象添加在buf这个List中

    由此可以看到doReadMessages用来侦听所有就绪的连接,包装成NioSocketChannel将其放在List中
    然后遍历这个List,调用 NioServerSocketChannel的pipeline的fireChannelRead方法,传递channelRead请求,、
    在前面向pipeline中添加了ServerBootstrapAcceptor这个ChannelHandler,此时,它也会响应这个请求,回调channelRead方法:

     1 public void channelRead(ChannelHandlerContext ctx, Object msg) {
     2     final Channel child = (Channel) msg;
     3 
     4     child.pipeline().addLast(childHandler);
     5 
     6     setChannelOptions(child, childOptions, logger);
     7 
     8     for (Entry<AttributeKey<?>, Object> e: childAttrs) {
     9         child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    10     }
    11 
    12     try {
    13         childGroup.register(child).addListener(new ChannelFutureListener() {
    14             @Override
    15             public void operationComplete(ChannelFuture future) throws Exception {
    16                 if (!future.isSuccess()) {
    17                     forceClose(child, future.cause());
    18                 }
    19             }
    20         });
    21     } catch (Throwable t) {
    22         forceClose(child, t);
    23     }
    24 }

    msg就是侦听到的NioSocketChannel对象,给该对象的pipeline添加childHandler,也就是我们在ServerBootstrap中通过childHandler方法添加的
    然后通过register方法完成对NioSocketChannel的注册(和NioServerSocketChannel注册逻辑一样)


    至此Netty服务端的启动结束。

  • 相关阅读:
    微信分享链接出现config:invalid signature错误的解决方法
    微信开发,分享部分出现的问题
    thinkphp 3.2 去除调试模式后报错,怎么解决
    MySQL添加新用户、为用户创建数据库、为新用户分配权限
    xshell工具source导入几个G的数据库
    thinkphp5引入公共部分header、footer等
    用样本估计总体
    随机抽样
    平面几何相关定理
    直线和曲线相切,曲线和曲线相切
  • 原文地址:https://www.cnblogs.com/a526583280/p/10969275.html
Copyright © 2011-2022 走看看