zoukankan      html  css  js  c++  java
  • Netty源码学习系列之4-ServerBootstrap的bind方法

    前言

        今天研究ServerBootstrap的bind方法,该方法可以说是netty的重中之重、核心中的核心。前两节的NioEventLoopGroup和ServerBootstrap的初始化就是为bind做准备。照例粘贴一下这个三朝元老的demo,开始本文内容。

     1 public class NettyDemo1 {
     2     // netty服务端的一般性写法
     3     public static void main(String[] args) {
     4         EventLoopGroup boss = new NioEventLoopGroup(1);
     5         EventLoopGroup worker = new NioEventLoopGroup();
     6         try {
     7             ServerBootstrap bootstrap = new ServerBootstrap();
     8             bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
     9                     .option(ChannelOption.SO_BACKLOG, 100)
    10                     .handler(new NettyServerHandler())
    11                     .childHandler(new ChannelInitializer<SocketChannel>() {
    12                         @Override
    13                         protected void initChannel(SocketChannel socketChannel) throws Exception {
    14                             ChannelPipeline pipeline = socketChannel.pipeline();
    15                             pipeline.addLast(new StringDecoder());
    16                             pipeline.addLast(new StringEncoder());
    17                             pipeline.addLast(new NettyServerHandler());
    18                         }
    19                     });
    20             ChannelFuture channelFuture = bootstrap.bind(90);
    21             channelFuture.channel().closeFuture().sync();
    22         } catch (Exception e) {
    23             e.printStackTrace();
    24         } finally {
    25             boss.shutdownGracefully();
    26             worker.shutdownGracefully();
    27         }
    28     }
    29 }

    一、bind及doBind方法

    1.ServerBootstrap.bind方法

        该方法有多个重载方法,但核心作用只有一个,就是将参数转为InetSocketAddress对象传给 --->

    1 public ChannelFuture bind(int inetPort) {
    2         return bind(new InetSocketAddress(inetPort));
    3     }
    1 public ChannelFuture bind(String inetHost, int inetPort) {
    2         return bind(SocketUtils.socketAddress(inetHost, inetPort));
    3     }
    1 public ChannelFuture bind(InetAddress inetHost, int inetPort) {
    2         return bind(new InetSocketAddress(inetHost, inetPort));
    3     }

        下面这个bind方法,在该方法中调用了doBind方法。

    1 public ChannelFuture bind(SocketAddress localAddress) {
    2         validate();
    3         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    4     }

    2、ServerBootstrap的doBind方法

        doBind方法位于父类AbstractBootstrap中,它有两大功能,均在下面代码中标识了出来,它们分别对应通过原生nio进行server端初始化时的两个功能,第1步对应将channel注册到selector上;第2步对应将server地址绑定到channel上。

     1 private ChannelFuture doBind(final SocketAddress localAddress) {
     2         final ChannelFuture regFuture = initAndRegister(); // 1)、初始化和注册,重要***
     3         final Channel channel = regFuture.channel();
     4         if (regFuture.cause() != null) {
     5             return regFuture;
     6         }
     7 
     8         if (regFuture.isDone()) {
     9             // At this point we know that the registration was complete and successful.
    10             ChannelPromise promise = channel.newPromise();
    11             doBind0(regFuture, channel, localAddress, promise); // 2)、将SocketAddress和channel绑定起来,最终执行的是nio中的功能,重要**
    12             return promise;
    13         } else {
    14             // 省略异常判断、添加监听器和异步调用doBind0方法
    15         }
    16     }

        为方便关联对照,下面再粘贴一个简单的原生NIO编程的服务端初始化方法,其实doBind方法的逻辑基本就是对下面这个方法的封装,只是增加了很多附加功能。

        因为上述两步都有些复杂,所以此处分两部分进行追踪。

    二、AbstractBootstrap的initAndRegister方法

         该方法代码如下所示,一共有三个核心方法,逻辑比较清晰,将channel new出来,初始化它,然后注册到selector上。下面我们各个击破。

     1 final ChannelFuture initAndRegister() {
     2         Channel channel = null;
     3         try { // 1)、实例化channel,作为服务端初始化的是NioServerSocketChannel
     4             channel = channelFactory.newChannel();
     5             init(channel); // 2)、初始化channel,即给channel中的属性赋值
     6         } catch (Throwable t) {
     7             if (channel != null) {
     8                 channel.unsafe().closeForcibly();
     9                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    10             }
    11             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    12         }
    13         // 3)、注册,即最终是将channel 注册到selector上
    14         ChannelFuture regFuture = config().group().register(channel);
    15         if (regFuture.cause() != null) {
    16             if (channel.isRegistered()) {
    17                 channel.close();
    18             } else {
    19                 channel.unsafe().closeForcibly();
    20             }
    21         }
    22         return regFuture;
    23     }

    1、channelFactory.newChannel()方法

    1 @Override
    2     public T newChannel() {
    3         try {
    4             return constructor.newInstance();
    5         } catch (Throwable t) {
    6             throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    7         }
    8     }

        该方法完成了channel的实例化,channelFactory的赋值可参见上一篇博文【Netty源码学习系列之3-ServerBootstrap的初始化】(地址 https://www.cnblogs.com/zzq6032010/p/13027161.html),对服务端来说,这里channelFactory值为ReflectiveChannelFactory,且其内部的constructor是NioServerSocketChannel的无参构造器,下面追踪NioServerSocketChannel的无参构造方法。

    1.1)、new NioServerSocketChannel()

    1 public NioServerSocketChannel() {
    2         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    3     }
     1 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
     2 
     3 private static ServerSocketChannel newSocket(SelectorProvider provider) {
     4         try {
     5             return provider.openServerSocketChannel();
     6         } catch (IOException e) {
     7             throw new ChannelException(
     8                     "Failed to open a server socket.", e);
     9         }
    10     }

        可见,它先通过newSocket方法获取nio原生的ServerSocketChannel,然后传给了重载构造器,如下,其中第三行是对NioServerSocketChannelConfig  config进行了赋值,逻辑比较简单,下面主要看对父类构造方法的调用。

    1 public NioServerSocketChannel(ServerSocketChannel channel) {
    2         super(null, channel, SelectionKey.OP_ACCEPT);
    3         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    4     }

    1.2)、对NioServerSocketChannel父类构造方法的调用

     1 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
     2         super(parent);
     3         this.ch = ch;
     4         this.readInterestOp = readInterestOp;
     5         try {
     6             ch.configureBlocking(false);
     7         } catch (IOException e) {
     8             try {
     9                 ch.close();
    10             } catch (IOException e2) {
    11                 if (logger.isWarnEnabled()) {
    12                     logger.warn(
    13                             "Failed to close a partially initialized socket.", e2);
    14                 }
    15             }
    16 
    17             throw new ChannelException("Failed to enter non-blocking mode.", e);
    18         }
    19     }

        中间经过了AbstractNioMessageChannel,然后调到下面AbstractNioChannel的构造方法。此时parent为null,ch为上面获取到的nio原生ServerSocketChannel,readInterestOp为SelectionKey的Accept事件(值为16)。可以看到,将原生渠道ch赋值、感兴趣的事件readInterestOp赋值、设置非阻塞。然后重点看对父类构造器的调用。

    1.3)、AbstractChannel构造器

    1 protected AbstractChannel(Channel parent) {
    2         this.parent = parent;
    3         id = newId();
    4         unsafe = newUnsafe();
    5         pipeline = newChannelPipeline();
    6     }

        可以看到,此构造方法只是给四个属性进行了赋值,我们挨个看下这四个属性。

        第一个属性是this.parent,类型为io.netty.channel.Channel,但此时值为null;

        第二个属性id类型为io.netty.channel.ChannelId,就是一个id生成器,值为new DefaultChannelId();

        第三个属性unsafe类型为io.netty.channel.Channel.Unsafe,该属性很重要,封装了对事件的处理逻辑,最终调用的是AbstractNioMessageChannel中的newUnsafe方法,赋的值为new NioMessageUnsafe();

        第四个属性pipeline类型为io.netty.channel.DefaultChannelPipeline,该属性很重要,封装了handler处理器的逻辑,赋的值为 new DefaultChannelPipeline(this)  this即当前的NioServerSocketChannel对象。

        其中DefaultChannelPipeline的构造器需要额外看一下,如下,将NioServerSocketChannel对象存入channel属性,然后初始化了tail、head两个成员变量,且对应的前后指针指向对方。TailContext和HeadContext都继承了AbstractChannelHandlerContext,在这个父类里面维护了next和prev两个双向指针,看到这里有经验的园友应该一下子就能看出来,DefaultChannelPipeline内部维护了一个双向链表。

     1 protected DefaultChannelPipeline(Channel channel) {
     2         this.channel = ObjectUtil.checkNotNull(channel, "channel");
     3         succeededFuture = new SucceededChannelFuture(channel, null);
     4         voidPromise =  new VoidChannelPromise(channel, true);
     5 
     6         tail = new TailContext(this);
     7         head = new HeadContext(this);
     8 
     9         head.next = tail;
    10         tail.prev = head;
    11     }

         至此,完成了上面initAndRegister方法中的第一个功能:channel的实例化。此时NioServerSocketChannel的几个父类属性快照图如下所示:

    2、init(channel)方法

        init(channel)方法位于ServerBootstrap中(因为这里是通过ServerBootstrap过来的,如果是通过Bootstrap进入的这里则调用的就是Bootstrap中的init方法),主要功能如下注释所示。本质都是针对channel进行初始化,初始化channel中的option、attr和pipeline。

     1 void init(Channel channel) throws Exception {
     2         // 1、获取AbstractBootstrap中的options属性,与channel进行关联
     3         final Map<ChannelOption<?>, Object> options = options0();
     4         synchronized (options) {
     5             setChannelOptions(channel, options, logger);
     6         }
     7         // 2、获取AbstractBootstrap中的attr属性,与channel关联起来
     8         final Map<AttributeKey<?>, Object> attrs = attrs0();
     9         synchronized (attrs) {
    10             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
    11                 @SuppressWarnings("unchecked")
    12                 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
    13                 channel.attr(key).set(e.getValue());
    14             }
    15         }
    16         // 3、获取pipeline,并将一个匿名handler对象添加进去,重要***
    17         ChannelPipeline p = channel.pipeline();
    18         final EventLoopGroup currentChildGroup = childGroup;
    19         final ChannelHandler currentChildHandler = childHandler;
    20         final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    21         final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    22         synchronized (childOptions) {
    23             currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    24         }
    25         synchronized (childAttrs) {
    26             currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    27         }
    28         p.addLast(new ChannelInitializer<Channel>() {
    29             @Override
    30             public void initChannel(final Channel ch) throws Exception {
    31                 final ChannelPipeline pipeline = ch.pipeline();
    32                 ChannelHandler handler = config.handler();
    33                 if (handler != null) {
    34                     pipeline.addLast(handler);
    35                 }
    36 
    37                 ch.eventLoop().execute(new Runnable() {
    38                     @Override
    39                     public void run() {
    40                         pipeline.addLast(new ServerBootstrapAcceptor(
    41                                 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    42                     }
    43                 });
    44             }
    45         });
    46     }

        1跟2的功能都比较容易理解,功能3是init的核心,虽然代码不少但很容易理解,它就是往channel中的pipeline里添加了一个匿名handler对象,其initChannel方法只有在有客户端连接接入时才会调用,initChannel方法的功能是什么呢?可以看到,它就是往入参channel中的eventLoop里添加了一个任务,这个任务的功能就是往pipeline中再添加一个handler,最后添加的这个handler就不是匿名的了,它是ServerBootstrapAcceptor对象。因为这里的initChannel方法和后面的run方法都是有客户端接入时才会调用的,所以这里只是提一下,后面会详述。至此完成init方法,下面进入register。

    3、config().group().register(channel)方法

     3.1)、config().group()方法

        由前面可以知道,config().group().register(channel)这行代码位于AbstractBootstrap类中的initAndRegister方法中,但由于当前对象是ServerBootstrap,故此处config()方法实际调用的都是ServerBootstrap中重写的方法,得到了ServerBootstrapConfig。

        ServerBootstrapConfig的group方法如下,调用的是它的父类AbstractBootstrapConfig中的方法。通过类名就能知道,ServerBootstrapConfig中的方法是获取ServerBootstrap中的属性,而AbstractBootstrapConfig中的方法是获取AbstractBootstrap中的属性,两两对应。故此处获取的EventLoopGroup就是AbstractBootstrap中存放的group,即文章开头demo中的boss对象。

    1 public final EventLoopGroup group() {
    2         return bootstrap.group();
    3     }

        获取到了名叫boss的这个NioEventLoopGroup对象,下面追踪NioEventLoopGroup.register(channel)方法

    3.2)、 NioEventLoopGroup.register(channel)方法

        该方法是对之前初始化属性的应用,需结合NioEventLoopGroup的初始化流程看,详见【Netty源码学习系列之2-NioEventLoopGroup的初始化】(链接【https://www.cnblogs.com/zzq6032010/p/12872989.html】)一文,此处就不赘述了,下面把该类的继承类图粘贴出来,以便有个整体认识。

    3.2.1)、next()方法 

        下面的register方法位于MultithreadEventLoopGroup类中,是NioEventLoopGroup的直接父类,如下:

    1 public ChannelFuture register(Channel channel) {
    2         return next().register(channel);
    3     }

        next方法如下,调用了父类的next方法,下面的就是父类MultithreadEventExecutorGroup中的next实现,可以看到调用的是chooser的next方法。通过初始化流程可知,此处boss的线程数是1,是2的n次方,所以chooser就是PowerOfTwoEventExecutorChooser,通过next方法从EventExecutor[]中选择一个对象。需要注意的是chooser.next()通过轮询的方式选择的对象。

    1 public EventLoop next() {
    2         return (EventLoop) super.next();
    3     }
    1 public EventExecutor next() {
    2         return chooser.next();
    3     }

    3.2.2)、NioEventLoop.register方法

        next之后是register方法,中间将NioServerSocketChannel和当前的NioEventLoop封装成一个DefaultChannelPromise对象往下传递,在下面第二个register方法中可以看到,实际上调用的是NioServerSocketChannel中的unsafe属性的register方法。

    1 public ChannelFuture register(Channel channel) {
    2         return register(new DefaultChannelPromise(channel, this));
    3     }
    1 public ChannelFuture register(final ChannelPromise promise) {
    2         ObjectUtil.checkNotNull(promise, "promise");
    3         promise.channel().unsafe().register(this, promise);
    4         return promise;
    5     }

    3.2.3)、NioMessageUnsafe的register方法

        通过本文第一部分中第1步中的1.3)可以知道,NioServerSocketChannel中的unsafe是NioMessageUnsafe对象,下面继续追踪其register方法:

     1 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     2             if (eventLoop == null) {// 判断非空
     3                 throw new NullPointerException("eventLoop");
     4             }
     5             if (isRegistered()) {// 判断是否注册
     6                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
     7                 return;
     8             }
     9             if (!isCompatible(eventLoop)) {// 判断eventLoop类型是否匹配
    10                 promise.setFailure(
    11                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    12                 return;
    13             }
    14        // 完成eventLoop属性的赋值
    15             AbstractChannel.this.eventLoop = eventLoop;
    16             // 判断eventLoop中的Reactor线程是不是当前线程 ***重要1
    17             if (eventLoop.inEventLoop()) {
    18                 register0(promise); // 进行注册
    19             } else {
    20                 try {// 不是当前线程则将register0任务放入eventLoop队列中让Reactor线程执行(如果Reactor线程未初始化还要将其初始化) ***重要2
    21                     eventLoop.execute(new Runnable() {
    22                         @Override
    23                         public void run() {
    24                             register0(promise);// 注册逻辑 ***重要3
    25                         }
    26                     });
    27                 } catch (Throwable t) {
    28                     // 省略异常处理
    29                 }
    30             }
    31         }

        该方法位于io.netty.channel.AbstractChannel.AbstractUnsafe中(它是NioMessageUnsafe的父类),根据注释能了解每一步做了什么,但如果要理解代码逻辑意图则需要结合netty的串行无锁化(串行无锁化参见博主的netty系列第一篇文章https://www.cnblogs.com/zzq6032010/p/12872993.html)。它实际就是让每一个NioEventLoop对象的thread属性记录一条线程,用来循环执行NioEventLoop的run方法,后续这个channel上的所有事件都由这一条线程来执行,如果当前线程不是Reactor线程,则会将任务放入队列中,Reactor线程会不断从队列中获取任务执行。这样以来,所有事件都由一条线程顺序处理,线程安全,也就不需要加锁了。

        说完整体思路,再来结合代码看看。上述代码中标识【***重要1】的地方就是通过inEventLoop方法判断eventLoop中的thread属性记录的线程是不是当前线程:

        先调到父类AbstractEventExecutor中,获取了当前线程:

    1 public boolean inEventLoop() {
    2         return inEventLoop(Thread.currentThread());
    3     }

        然后调到SingleThreadEventExecutor类中的方法,如下,比对thread与当前线程是否是同一个:

    1 public boolean inEventLoop(Thread thread) {
    2         return thread == this.thread;
    3     }

        此时thread未初始化,所以肯定返回false,则进入【***重点2】的逻辑,将register放入run方法中封装成一个Runnable任务,然后执行execute方法,如下,该方法位于SingleThreadEventExecutor中:

     1 public void execute(Runnable task) {
     2         if (task == null) {
     3             throw new NullPointerException("task");
     4         }
     5 
     6         boolean inEventLoop = inEventLoop();
     7         addTask(task); //将任务放入队列中 ***重要a
     8         if (!inEventLoop) {
     9             startThread(); //判断当前线程不是thread线程,则调用该方法 ***重要b
    10             if (isShutdown()) {
    11                 boolean reject = false;
    12                 try {
    13                     if (removeTask(task)) {
    14                         reject = true;
    15                     }
    16                 } catch (UnsupportedOperationException e) {
    17                     // 省略注释
    18                 }
    19                 if (reject) {
    20                     reject();
    21                 }
    22             }
    23         }
    24 
    25         if (!addTaskWakesUp && wakesUpForTask(task)) {
    26             wakeup(inEventLoop);
    27         }
    28     }

        有两个重要的逻辑,已经在上面代码中标出,先看看【***重要a】,如下,可见最终就是往SingleThreadEventExecutor的taskQueue队列中添加了一个任务,如果添加失败则调reject方法执行拒绝策略,通过前文分析可以知道,此处的拒绝策略就是直接抛错。

    1 protected void addTask(Runnable task) {
    2         if (task == null) {
    3             throw new NullPointerException("task");
    4         }
    5         if (!offerTask(task)) {
    6             reject(task);
    7         }
    8     }
    1 final boolean offerTask(Runnable task) {
    2         if (isShutdown()) {
    3             reject();
    4         }
    5         return taskQueue.offer(task);
    6     }

        然后在看【***重要b】,如下,该方法虽然叫startThread,但内部有控制,不能无脑开启线程,因为调这个方法的时候会有两种情况:1).thread变量为空;2).thread不为空且不是当前线程。第一种情况需要开启新的线程,但第二种情况就不能直接创建线程了。所以看下面代码可以发现,它内部通过CAS+volatile(state属性加了volatile修饰)实现的开启线程的原子控制,保证多线程情况下也只会有一个线程进入doStartThread()方法。

     1 private void startThread() {
     2         if (state == ST_NOT_STARTED) {
     3             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
     4                 boolean success = false;
     5                 try {
     6                     doStartThread();
     7                     success = true;
     8                 } finally {
     9                     if (!success) {
    10                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
    11                     }
    12                 }
    13             }
    14         }
    15     }

        继续往下看一下doStartThread()的方法逻辑:

     1 private void doStartThread() {
     2         assert thread == null;
     3         executor.execute(new Runnable() { //此处的executor内部执行的就是ThreadPerTaskExecutor的execute逻辑,创建一个新线程运行下面的run方法
     4             @Override
     5             public void run() {
     6                 thread = Thread.currentThread(); //将Reactor线程记录到thread变量中,保证一个NioEventLoop只有一个主线程在运行
     7                 if (interrupted) {
     8                     thread.interrupt();
     9                 }
    10 
    11                 boolean success = false;
    12                 updateLastExecutionTime();
    13                 try {
    14                     SingleThreadEventExecutor.this.run(); //调用当前对象的run方法,该run方法就是Reactor线程的核心逻辑方法,后面会重点研究
    15                     success = true;
    16                 } catch (Throwable t) {
    17                     logger.warn("Unexpected exception from an event executor: ", t);
    18                 } finally {
    19                    // 省略无关逻辑
    20                 }
    21             }
    22         });
    23     }

        可以看到,在上面的方法中完成了Reactor线程thread的赋值和核心逻辑NioEventLoop中run方法的启动。这个run方法启动后,第一步做的事情是什么?让我们往前回溯,回到3.2.3),当然是执行当初封装了 register0方法的那个run方法的任务,即执行register0方法,下面填之前埋得坑,对【***重要3】进行追踪:

     1 private void register0(ChannelPromise promise) {
     2             try {
     3                 // 省略判断逻辑
     4                 boolean firstRegistration = neverRegistered;
     5                 doRegister();// 执行注册逻辑
     6                 neverRegistered = false;
     7                 registered = true;
     8                 pipeline.invokeHandlerAddedIfNeeded();// 调用pipeline的逻辑
     9 
    10                 safeSetSuccess(promise);
    11                 pipeline.fireChannelRegistered();
    12                 // 省略无关逻辑
    13             } catch (Throwable t) {
    14                 // 省略异常处理
    15             }
    16         }

        doRegister()方法的实现在AbstractNioChannel中,如下,就是完成了nio中的注册,将nio的ServerSocketChannel注册到selector上:

     1 protected void doRegister() throws Exception {
     2         boolean selected = false;
     3         for (;;) {
     4             try {
     5                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
     6                 return;
     7             } catch (CancelledKeyException e) {
     8                // 省略异常处理
     9             }
    10         }
    11     }

        再看pipeline.invokeHandlerAddedIfNeeded()方法,该方法调用链路比较长,此处就不详细粘贴了,只是说一下流程。回顾下上面第二部分的第2步,在里面最后addLast了一个匿名的内部对象,重写了initChannel方法,此处通过pipeline.invokeHandlerAddedIfNeeded()方法就会调用到这个匿名对象的initChannel方法(只有第一次注册时才会调),该方法往pipeline中又添加了一个ServerBootstrapAcceptor对象。执行完方法后,netty会在finally中将之前那个匿名内部对象给remove掉,这时pipeline中的handler如下所示:

         至此,算是基本完成了initAndRegister方法的逻辑,当然限于篇幅(本篇已经够长了),其中还有很多细节性的处理未提及。

    三、AbstractBootstrap的doBind0方法

         doBind0方法逻辑如下所示,new了一个Runnable任务交给Reactor线程执行,execute执行过程已经分析过了,此处不再赘述,集中下所剩无几的精力看下run方法中的bind逻辑。

     1 private static void doBind0(
     2             final ChannelFuture regFuture, final Channel channel,
     3             final SocketAddress localAddress, final ChannelPromise promise) {
     4 
     5         channel.eventLoop().execute(new Runnable() {
     6             @Override
     7             public void run() {
     8                 if (regFuture.isSuccess()) {
     9                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    10                 } else {
    11                     promise.setFailure(regFuture.cause());
    12                 }
    13             }
    14         });
    15     }

        channel.bind方法,如下:

    1 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    2         return pipeline.bind(localAddress, promise);
    3     }

        调用了pipeline的bind方法:

    1 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    2         return tail.bind(localAddress, promise);
    3     }

        tail.bind方法:

     1 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
     2         if (localAddress == null) {
     3             throw new NullPointerException("localAddress");
     4         }
     5         if (isNotValidPromise(promise, false)) {
     6             // cancelled
     7             return promise;
     8         }
     9         // 从tail开始往前,找到第一个outbond的handler,这时只有head满足要求,故此处next是head
    10         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    11         EventExecutor executor = next.executor();
    12         if (executor.inEventLoop()) {// 因为当前线程就是executor中的Reactor线程,所以直接进入invokeBind方法
    13             next.invokeBind(localAddress, promise);
    14         } else {
    15             safeExecute(executor, new Runnable() {
    16                 @Override
    17                 public void run() {
    18                     next.invokeBind(localAddress, promise);
    19                 }
    20             }, promise, null);
    21         }
    22         return promise;
    23     }

        下面进入head.invokeBind方法:

     1 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
     2         if (invokeHandler()) {
     3             try {
     4                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
     5             } catch (Throwable t) {
     6                 notifyOutboundHandlerException(t, promise);
     7             }
     8         } else {
     9             bind(localAddress, promise);
    10         }
    11     }

        核心逻辑就是handler.bind方法,继续追踪:

    1 public void bind(
    2                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    3             unsafe.bind(localAddress, promise);
    4         }

        此处的unsafe是NioMessageUnsafe,继续追踪会看到在bind方法中又调用了NioServerSocketChannel中的doBind方法,最终在这里完成了nio原生ServerSocketChannel和address的绑定:

    1 protected void doBind(SocketAddress localAddress) throws Exception {
    2         if (PlatformDependent.javaVersion() >= 7) {
    3             javaChannel().bind(localAddress, config.getBacklog());
    4         } else {
    5             javaChannel().socket().bind(localAddress, config.getBacklog());
    6         }
    7     }

        至此,ServerBootstrap的bind方法完成。

    小结

        本文从头到尾追溯了ServerBootstrap中bind方法的逻辑,将前面netty系列中的二、三两篇初始化给串联了起来,是承上启下的一个位置。后面的netty系列将围绕本文中启动的NioEventLoop.run方法展开,可以这么说,本文跟前面的三篇只是为run方法的出现做的一个铺垫,run方法才是核心功能的逻辑所在。

        本文断断续续更新了一周,今天才完成,也没想到会这么长,就这样吧,后面继续netty run方法的学习。

  • 相关阅读:
    Windows Azure Storage (17) Azure Storage读取访问地域冗余(Read Access – Geo Redundant Storage, RA-GRS)
    SQL Azure (15) SQL Azure 新的规格
    Azure China (5) 管理Azure China Powershell
    Azure China (4) 管理Azure China Storage Account
    Azure China (3) 使用Visual Studio 2013证书发布Cloud Service至Azure China
    Azure China (2) Azure China管理界面初探
    Azure China (1) Azure公有云落地中国
    SQL Azure (14) 将云端SQL Azure中的数据库备份到本地SQL Server
    [New Portal]Windows Azure Virtual Machine (23) 使用Storage Space,提高Virtual Machine磁盘的IOPS
    Android数据库升级、降级、创建(onCreate() onUpgrade() onDowngrade())的注意点
  • 原文地址:https://www.cnblogs.com/zzq6032010/p/13034608.html
Copyright © 2011-2022 走看看