zoukankan      html  css  js  c++  java
  • Netty源码学习系列之4-ServerBootstrap的bind方法

    前言

        今天研究ServerBootstrap的bind方法,该方法可以说是netty的重中之重、核心中的核心。前两节的NioEventLoopGroup和ServerBootstrap的初始化就是为bind做准备。照例粘贴一下这个三朝元老的demo,开始本文内容。

     1 public class NettyDemo1 {
     2     // netty服务端的一般性写法
     3     public static void main(String[] args) {
     4         EventLoopGroup boss = new NioEventLoopGroup(1);
     5         EventLoopGroup worker = new NioEventLoopGroup();
     6         try {
     7             ServerBootstrap bootstrap = new ServerBootstrap();
     8             bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
     9                     .option(ChannelOption.SO_BACKLOG, 100)
    10                     .handler(new NettyServerHandler())
    11                     .childHandler(new ChannelInitializer<SocketChannel>() {
    12                         @Override
    13                         protected void initChannel(SocketChannel socketChannel) throws Exception {
    14                             ChannelPipeline pipeline = socketChannel.pipeline();
    15                             pipeline.addLast(new StringDecoder());
    16                             pipeline.addLast(new StringEncoder());
    17                             pipeline.addLast(new NettyServerHandler());
    18                         }
    19                     });
    20             ChannelFuture channelFuture = bootstrap.bind(90);
    21             channelFuture.channel().closeFuture().sync();
    22         } catch (Exception e) {
    23             e.printStackTrace();
    24         } finally {
    25             boss.shutdownGracefully();
    26             worker.shutdownGracefully();
    27         }
    28     }
    29 }

    一、bind及doBind方法

    1.ServerBootstrap.bind方法

        该方法有多个重载方法,但核心作用只有一个,就是将参数转为InetSocketAddress对象传给 --->

    1 public ChannelFuture bind(int inetPort) {
    2         return bind(new InetSocketAddress(inetPort));
    3     }
    1 public ChannelFuture bind(String inetHost, int inetPort) {
    2         return bind(SocketUtils.socketAddress(inetHost, inetPort));
    3     }
    1 public ChannelFuture bind(InetAddress inetHost, int inetPort) {
    2         return bind(new InetSocketAddress(inetHost, inetPort));
    3     }

        下面这个bind方法,在该方法中调用了doBind方法。

    1 public ChannelFuture bind(SocketAddress localAddress) {
    2         validate();
    3         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    4     }

    2、ServerBootstrap的doBind方法

        doBind方法位于父类AbstractBootstrap中,它有两大功能,均在下面代码中标识了出来,它们分别对应通过原生nio进行server端初始化时的两个功能,第1步对应将channel注册到selector上;第2步对应将server地址绑定到channel上。

     1 private ChannelFuture doBind(final SocketAddress localAddress) {
     2         final ChannelFuture regFuture = initAndRegister(); // 1)、初始化和注册,重要***
     3         final Channel channel = regFuture.channel();
     4         if (regFuture.cause() != null) {
     5             return regFuture;
     6         }
     7 
     8         if (regFuture.isDone()) {
     9             // At this point we know that the registration was complete and successful.
    10             ChannelPromise promise = channel.newPromise();
    11             doBind0(regFuture, channel, localAddress, promise); // 2)、将SocketAddress和channel绑定起来,最终执行的是nio中的功能,重要**
    12             return promise;
    13         } else {
    14             // 省略异常判断、添加监听器和异步调用doBind0方法
    15         }
    16     }

        为方便关联对照,下面再粘贴一个简单的原生NIO编程的服务端初始化方法,其实doBind方法的逻辑基本就是对下面这个方法的封装,只是增加了很多附加功能。

        因为上述两步都有些复杂,所以此处分两部分进行追踪。

    二、AbstractBootstrap的initAndRegister方法

         该方法代码如下所示,一共有三个核心方法,逻辑比较清晰,将channel new出来,初始化它,然后注册到selector上。下面我们各个击破。

     1 final ChannelFuture initAndRegister() {
     2         Channel channel = null;
     3         try { // 1)、实例化channel,作为服务端初始化的是NioServerSocketChannel
     4             channel = channelFactory.newChannel();
     5             init(channel); // 2)、初始化channel,即给channel中的属性赋值
     6         } catch (Throwable t) {
     7             if (channel != null) {
     8                 channel.unsafe().closeForcibly();
     9                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    10             }
    11             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    12         }
    13         // 3)、注册,即最终是将channel 注册到selector上
    14         ChannelFuture regFuture = config().group().register(channel);
    15         if (regFuture.cause() != null) {
    16             if (channel.isRegistered()) {
    17                 channel.close();
    18             } else {
    19                 channel.unsafe().closeForcibly();
    20             }
    21         }
    22         return regFuture;
    23     }

    1、channelFactory.newChannel()方法

    1 @Override
    2     public T newChannel() {
    3         try {
    4             return constructor.newInstance();
    5         } catch (Throwable t) {
    6             throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    7         }
    8     }

        该方法完成了channel的实例化,channelFactory的赋值可参见上一篇博文【Netty源码学习系列之3-ServerBootstrap的初始化】(地址 https://www.cnblogs.com/zzq6032010/p/13027161.html),对服务端来说,这里channelFactory值为ReflectiveChannelFactory,且其内部的constructor是NioServerSocketChannel的无参构造器,下面追踪NioServerSocketChannel的无参构造方法。

    1.1)、new NioServerSocketChannel()

    1 public NioServerSocketChannel() {
    2         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    3     }
     1 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
     2 
     3 private static ServerSocketChannel newSocket(SelectorProvider provider) {
     4         try {
     5             return provider.openServerSocketChannel();
     6         } catch (IOException e) {
     7             throw new ChannelException(
     8                     "Failed to open a server socket.", e);
     9         }
    10     }

        可见,它先通过newSocket方法获取nio原生的ServerSocketChannel,然后传给了重载构造器,如下,其中第三行是对NioServerSocketChannelConfig  config进行了赋值,逻辑比较简单,下面主要看对父类构造方法的调用。

    1 public NioServerSocketChannel(ServerSocketChannel channel) {
    2         super(null, channel, SelectionKey.OP_ACCEPT);
    3         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    4     }

    1.2)、对NioServerSocketChannel父类构造方法的调用

     1 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
     2         super(parent);
     3         this.ch = ch;
     4         this.readInterestOp = readInterestOp;
     5         try {
     6             ch.configureBlocking(false);
     7         } catch (IOException e) {
     8             try {
     9                 ch.close();
    10             } catch (IOException e2) {
    11                 if (logger.isWarnEnabled()) {
    12                     logger.warn(
    13                             "Failed to close a partially initialized socket.", e2);
    14                 }
    15             }
    16 
    17             throw new ChannelException("Failed to enter non-blocking mode.", e);
    18         }
    19     }

        中间经过了AbstractNioMessageChannel,然后调到下面AbstractNioChannel的构造方法。此时parent为null,ch为上面获取到的nio原生ServerSocketChannel,readInterestOp为SelectionKey的Accept事件(值为16)。可以看到,将原生渠道ch赋值、感兴趣的事件readInterestOp赋值、设置非阻塞。然后重点看对父类构造器的调用。

    1.3)、AbstractChannel构造器

    1 protected AbstractChannel(Channel parent) {
    2         this.parent = parent;
    3         id = newId();
    4         unsafe = newUnsafe();
    5         pipeline = newChannelPipeline();
    6     }

        可以看到,此构造方法只是给四个属性进行了赋值,我们挨个看下这四个属性。

        第一个属性是this.parent,类型为io.netty.channel.Channel,但此时值为null;

        第二个属性id类型为io.netty.channel.ChannelId,就是一个id生成器,值为new DefaultChannelId();

        第三个属性unsafe类型为io.netty.channel.Channel.Unsafe,该属性很重要,封装了对事件的处理逻辑,最终调用的是AbstractNioMessageChannel中的newUnsafe方法,赋的值为new NioMessageUnsafe();

        第四个属性pipeline类型为io.netty.channel.DefaultChannelPipeline,该属性很重要,封装了handler处理器的逻辑,赋的值为 new DefaultChannelPipeline(this)  this即当前的NioServerSocketChannel对象。

        其中DefaultChannelPipeline的构造器需要额外看一下,如下,将NioServerSocketChannel对象存入channel属性,然后初始化了tail、head两个成员变量,且对应的前后指针指向对方。TailContext和HeadContext都继承了AbstractChannelHandlerContext,在这个父类里面维护了next和prev两个双向指针,看到这里有经验的园友应该一下子就能看出来,DefaultChannelPipeline内部维护了一个双向链表。

     1 protected DefaultChannelPipeline(Channel channel) {
     2         this.channel = ObjectUtil.checkNotNull(channel, "channel");
     3         succeededFuture = new SucceededChannelFuture(channel, null);
     4         voidPromise =  new VoidChannelPromise(channel, true);
     5 
     6         tail = new TailContext(this);
     7         head = new HeadContext(this);
     8 
     9         head.next = tail;
    10         tail.prev = head;
    11     }

         至此,完成了上面initAndRegister方法中的第一个功能:channel的实例化。此时NioServerSocketChannel的几个父类属性快照图如下所示:

    2、init(channel)方法

        init(channel)方法位于ServerBootstrap中(因为这里是通过ServerBootstrap过来的,如果是通过Bootstrap进入的这里则调用的就是Bootstrap中的init方法),主要功能如下注释所示。本质都是针对channel进行初始化,初始化channel中的option、attr和pipeline。

     1 void init(Channel channel) throws Exception {
     2         // 1、获取AbstractBootstrap中的options属性,与channel进行关联
     3         final Map<ChannelOption<?>, Object> options = options0();
     4         synchronized (options) {
     5             setChannelOptions(channel, options, logger);
     6         }
     7         // 2、获取AbstractBootstrap中的attr属性,与channel关联起来
     8         final Map<AttributeKey<?>, Object> attrs = attrs0();
     9         synchronized (attrs) {
    10             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
    11                 @SuppressWarnings("unchecked")
    12                 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
    13                 channel.attr(key).set(e.getValue());
    14             }
    15         }
    16         // 3、获取pipeline,并将一个匿名handler对象添加进去,重要***
    17         ChannelPipeline p = channel.pipeline();
    18         final EventLoopGroup currentChildGroup = childGroup;
    19         final ChannelHandler currentChildHandler = childHandler;
    20         final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    21         final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    22         synchronized (childOptions) {
    23             currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    24         }
    25         synchronized (childAttrs) {
    26             currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    27         }
    28         p.addLast(new ChannelInitializer<Channel>() {
    29             @Override
    30             public void initChannel(final Channel ch) throws Exception {
    31                 final ChannelPipeline pipeline = ch.pipeline();
    32                 ChannelHandler handler = config.handler();
    33                 if (handler != null) {
    34                     pipeline.addLast(handler);
    35                 }
    36 
    37                 ch.eventLoop().execute(new Runnable() {
    38                     @Override
    39                     public void run() {
    40                         pipeline.addLast(new ServerBootstrapAcceptor(
    41                                 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    42                     }
    43                 });
    44             }
    45         });
    46     }

        1跟2的功能都比较容易理解,功能3是init的核心,虽然代码不少但很容易理解,它就是往channel中的pipeline里添加了一个匿名handler对象,其initChannel方法只有在有客户端连接接入时才会调用,initChannel方法的功能是什么呢?可以看到,它就是往入参channel中的eventLoop里添加了一个任务,这个任务的功能就是往pipeline中再添加一个handler,最后添加的这个handler就不是匿名的了,它是ServerBootstrapAcceptor对象。因为这里的initChannel方法和后面的run方法都是有客户端接入时才会调用的,所以这里只是提一下,后面会详述。至此完成init方法,下面进入register。

    3、config().group().register(channel)方法

     3.1)、config().group()方法

        由前面可以知道,config().group().register(channel)这行代码位于AbstractBootstrap类中的initAndRegister方法中,但由于当前对象是ServerBootstrap,故此处config()方法实际调用的都是ServerBootstrap中重写的方法,得到了ServerBootstrapConfig。

        ServerBootstrapConfig的group方法如下,调用的是它的父类AbstractBootstrapConfig中的方法。通过类名就能知道,ServerBootstrapConfig中的方法是获取ServerBootstrap中的属性,而AbstractBootstrapConfig中的方法是获取AbstractBootstrap中的属性,两两对应。故此处获取的EventLoopGroup就是AbstractBootstrap中存放的group,即文章开头demo中的boss对象。

    1 public final EventLoopGroup group() {
    2         return bootstrap.group();
    3     }

        获取到了名叫boss的这个NioEventLoopGroup对象,下面追踪NioEventLoopGroup.register(channel)方法

    3.2)、 NioEventLoopGroup.register(channel)方法

        该方法是对之前初始化属性的应用,需结合NioEventLoopGroup的初始化流程看,详见【Netty源码学习系列之2-NioEventLoopGroup的初始化】(链接【https://www.cnblogs.com/zzq6032010/p/12872989.html】)一文,此处就不赘述了,下面把该类的继承类图粘贴出来,以便有个整体认识。

    3.2.1)、next()方法 

        下面的register方法位于MultithreadEventLoopGroup类中,是NioEventLoopGroup的直接父类,如下:

    1 public ChannelFuture register(Channel channel) {
    2         return next().register(channel);
    3     }

        next方法如下,调用了父类的next方法,下面的就是父类MultithreadEventExecutorGroup中的next实现,可以看到调用的是chooser的next方法。通过初始化流程可知,此处boss的线程数是1,是2的n次方,所以chooser就是PowerOfTwoEventExecutorChooser,通过next方法从EventExecutor[]中选择一个对象。需要注意的是chooser.next()通过轮询的方式选择的对象。

    1 public EventLoop next() {
    2         return (EventLoop) super.next();
    3     }
    1 public EventExecutor next() {
    2         return chooser.next();
    3     }

    3.2.2)、NioEventLoop.register方法

        next之后是register方法,中间将NioServerSocketChannel和当前的NioEventLoop封装成一个DefaultChannelPromise对象往下传递,在下面第二个register方法中可以看到,实际上调用的是NioServerSocketChannel中的unsafe属性的register方法。

    1 public ChannelFuture register(Channel channel) {
    2         return register(new DefaultChannelPromise(channel, this));
    3     }
    1 public ChannelFuture register(final ChannelPromise promise) {
    2         ObjectUtil.checkNotNull(promise, "promise");
    3         promise.channel().unsafe().register(this, promise);
    4         return promise;
    5     }

    3.2.3)、NioMessageUnsafe的register方法

        通过本文第一部分中第1步中的1.3)可以知道,NioServerSocketChannel中的unsafe是NioMessageUnsafe对象,下面继续追踪其register方法:

     1 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     2             if (eventLoop == null) {// 判断非空
     3                 throw new NullPointerException("eventLoop");
     4             }
     5             if (isRegistered()) {// 判断是否注册
     6                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
     7                 return;
     8             }
     9             if (!isCompatible(eventLoop)) {// 判断eventLoop类型是否匹配
    10                 promise.setFailure(
    11                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    12                 return;
    13             }
    14        // 完成eventLoop属性的赋值
    15             AbstractChannel.this.eventLoop = eventLoop;
    16             // 判断eventLoop中的Reactor线程是不是当前线程 ***重要1
    17             if (eventLoop.inEventLoop()) {
    18                 register0(promise); // 进行注册
    19             } else {
    20                 try {// 不是当前线程则将register0任务放入eventLoop队列中让Reactor线程执行(如果Reactor线程未初始化还要将其初始化) ***重要2
    21                     eventLoop.execute(new Runnable() {
    22                         @Override
    23                         public void run() {
    24                             register0(promise);// 注册逻辑 ***重要3
    25                         }
    26                     });
    27                 } catch (Throwable t) {
    28                     // 省略异常处理
    29                 }
    30             }
    31         }

        该方法位于io.netty.channel.AbstractChannel.AbstractUnsafe中(它是NioMessageUnsafe的父类),根据注释能了解每一步做了什么,但如果要理解代码逻辑意图则需要结合netty的串行无锁化(串行无锁化参见博主的netty系列第一篇文章https://www.cnblogs.com/zzq6032010/p/12872993.html)。它实际就是让每一个NioEventLoop对象的thread属性记录一条线程,用来循环执行NioEventLoop的run方法,后续这个channel上的所有事件都由这一条线程来执行,如果当前线程不是Reactor线程,则会将任务放入队列中,Reactor线程会不断从队列中获取任务执行。这样以来,所有事件都由一条线程顺序处理,线程安全,也就不需要加锁了。

        说完整体思路,再来结合代码看看。上述代码中标识【***重要1】的地方就是通过inEventLoop方法判断eventLoop中的thread属性记录的线程是不是当前线程:

        先调到父类AbstractEventExecutor中,获取了当前线程:

    1 public boolean inEventLoop() {
    2         return inEventLoop(Thread.currentThread());
    3     }

        然后调到SingleThreadEventExecutor类中的方法,如下,比对thread与当前线程是否是同一个:

    1 public boolean inEventLoop(Thread thread) {
    2         return thread == this.thread;
    3     }

        此时thread未初始化,所以肯定返回false,则进入【***重点2】的逻辑,将register放入run方法中封装成一个Runnable任务,然后执行execute方法,如下,该方法位于SingleThreadEventExecutor中:

     1 public void execute(Runnable task) {
     2         if (task == null) {
     3             throw new NullPointerException("task");
     4         }
     5 
     6         boolean inEventLoop = inEventLoop();
     7         addTask(task); //将任务放入队列中 ***重要a
     8         if (!inEventLoop) {
     9             startThread(); //判断当前线程不是thread线程,则调用该方法 ***重要b
    10             if (isShutdown()) {
    11                 boolean reject = false;
    12                 try {
    13                     if (removeTask(task)) {
    14                         reject = true;
    15                     }
    16                 } catch (UnsupportedOperationException e) {
    17                     // 省略注释
    18                 }
    19                 if (reject) {
    20                     reject();
    21                 }
    22             }
    23         }
    24 
    25         if (!addTaskWakesUp && wakesUpForTask(task)) {
    26             wakeup(inEventLoop);
    27         }
    28     }

        有两个重要的逻辑,已经在上面代码中标出,先看看【***重要a】,如下,可见最终就是往SingleThreadEventExecutor的taskQueue队列中添加了一个任务,如果添加失败则调reject方法执行拒绝策略,通过前文分析可以知道,此处的拒绝策略就是直接抛错。

    1 protected void addTask(Runnable task) {
    2         if (task == null) {
    3             throw new NullPointerException("task");
    4         }
    5         if (!offerTask(task)) {
    6             reject(task);
    7         }
    8     }
    1 final boolean offerTask(Runnable task) {
    2         if (isShutdown()) {
    3             reject();
    4         }
    5         return taskQueue.offer(task);
    6     }

        然后在看【***重要b】,如下,该方法虽然叫startThread,但内部有控制,不能无脑开启线程,因为调这个方法的时候会有两种情况:1).thread变量为空;2).thread不为空且不是当前线程。第一种情况需要开启新的线程,但第二种情况就不能直接创建线程了。所以看下面代码可以发现,它内部通过CAS+volatile(state属性加了volatile修饰)实现的开启线程的原子控制,保证多线程情况下也只会有一个线程进入doStartThread()方法。

     1 private void startThread() {
     2         if (state == ST_NOT_STARTED) {
     3             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
     4                 boolean success = false;
     5                 try {
     6                     doStartThread();
     7                     success = true;
     8                 } finally {
     9                     if (!success) {
    10                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
    11                     }
    12                 }
    13             }
    14         }
    15     }

        继续往下看一下doStartThread()的方法逻辑:

     1 private void doStartThread() {
     2         assert thread == null;
     3         executor.execute(new Runnable() { //此处的executor内部执行的就是ThreadPerTaskExecutor的execute逻辑,创建一个新线程运行下面的run方法
     4             @Override
     5             public void run() {
     6                 thread = Thread.currentThread(); //将Reactor线程记录到thread变量中,保证一个NioEventLoop只有一个主线程在运行
     7                 if (interrupted) {
     8                     thread.interrupt();
     9                 }
    10 
    11                 boolean success = false;
    12                 updateLastExecutionTime();
    13                 try {
    14                     SingleThreadEventExecutor.this.run(); //调用当前对象的run方法,该run方法就是Reactor线程的核心逻辑方法,后面会重点研究
    15                     success = true;
    16                 } catch (Throwable t) {
    17                     logger.warn("Unexpected exception from an event executor: ", t);
    18                 } finally {
    19                    // 省略无关逻辑
    20                 }
    21             }
    22         });
    23     }

        可以看到,在上面的方法中完成了Reactor线程thread的赋值和核心逻辑NioEventLoop中run方法的启动。这个run方法启动后,第一步做的事情是什么?让我们往前回溯,回到3.2.3),当然是执行当初封装了 register0方法的那个run方法的任务,即执行register0方法,下面填之前埋得坑,对【***重要3】进行追踪:

     1 private void register0(ChannelPromise promise) {
     2             try {
     3                 // 省略判断逻辑
     4                 boolean firstRegistration = neverRegistered;
     5                 doRegister();// 执行注册逻辑
     6                 neverRegistered = false;
     7                 registered = true;
     8                 pipeline.invokeHandlerAddedIfNeeded();// 调用pipeline的逻辑
     9 
    10                 safeSetSuccess(promise);
    11                 pipeline.fireChannelRegistered();
    12                 // 省略无关逻辑
    13             } catch (Throwable t) {
    14                 // 省略异常处理
    15             }
    16         }

        doRegister()方法的实现在AbstractNioChannel中,如下,就是完成了nio中的注册,将nio的ServerSocketChannel注册到selector上:

     1 protected void doRegister() throws Exception {
     2         boolean selected = false;
     3         for (;;) {
     4             try {
     5                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
     6                 return;
     7             } catch (CancelledKeyException e) {
     8                // 省略异常处理
     9             }
    10         }
    11     }

        再看pipeline.invokeHandlerAddedIfNeeded()方法,该方法调用链路比较长,此处就不详细粘贴了,只是说一下流程。回顾下上面第二部分的第2步,在里面最后addLast了一个匿名的内部对象,重写了initChannel方法,此处通过pipeline.invokeHandlerAddedIfNeeded()方法就会调用到这个匿名对象的initChannel方法(只有第一次注册时才会调),该方法往pipeline中又添加了一个ServerBootstrapAcceptor对象。执行完方法后,netty会在finally中将之前那个匿名内部对象给remove掉,这时pipeline中的handler如下所示:

         至此,算是基本完成了initAndRegister方法的逻辑,当然限于篇幅(本篇已经够长了),其中还有很多细节性的处理未提及。

    三、AbstractBootstrap的doBind0方法

         doBind0方法逻辑如下所示,new了一个Runnable任务交给Reactor线程执行,execute执行过程已经分析过了,此处不再赘述,集中下所剩无几的精力看下run方法中的bind逻辑。

     1 private static void doBind0(
     2             final ChannelFuture regFuture, final Channel channel,
     3             final SocketAddress localAddress, final ChannelPromise promise) {
     4 
     5         channel.eventLoop().execute(new Runnable() {
     6             @Override
     7             public void run() {
     8                 if (regFuture.isSuccess()) {
     9                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    10                 } else {
    11                     promise.setFailure(regFuture.cause());
    12                 }
    13             }
    14         });
    15     }

        channel.bind方法,如下:

    1 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    2         return pipeline.bind(localAddress, promise);
    3     }

        调用了pipeline的bind方法:

    1 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    2         return tail.bind(localAddress, promise);
    3     }

        tail.bind方法:

     1 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
     2         if (localAddress == null) {
     3             throw new NullPointerException("localAddress");
     4         }
     5         if (isNotValidPromise(promise, false)) {
     6             // cancelled
     7             return promise;
     8         }
     9         // 从tail开始往前,找到第一个outbond的handler,这时只有head满足要求,故此处next是head
    10         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    11         EventExecutor executor = next.executor();
    12         if (executor.inEventLoop()) {// 因为当前线程就是executor中的Reactor线程,所以直接进入invokeBind方法
    13             next.invokeBind(localAddress, promise);
    14         } else {
    15             safeExecute(executor, new Runnable() {
    16                 @Override
    17                 public void run() {
    18                     next.invokeBind(localAddress, promise);
    19                 }
    20             }, promise, null);
    21         }
    22         return promise;
    23     }

        下面进入head.invokeBind方法:

     1 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
     2         if (invokeHandler()) {
     3             try {
     4                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
     5             } catch (Throwable t) {
     6                 notifyOutboundHandlerException(t, promise);
     7             }
     8         } else {
     9             bind(localAddress, promise);
    10         }
    11     }

        核心逻辑就是handler.bind方法,继续追踪:

    1 public void bind(
    2                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    3             unsafe.bind(localAddress, promise);
    4         }

        此处的unsafe是NioMessageUnsafe,继续追踪会看到在bind方法中又调用了NioServerSocketChannel中的doBind方法,最终在这里完成了nio原生ServerSocketChannel和address的绑定:

    1 protected void doBind(SocketAddress localAddress) throws Exception {
    2         if (PlatformDependent.javaVersion() >= 7) {
    3             javaChannel().bind(localAddress, config.getBacklog());
    4         } else {
    5             javaChannel().socket().bind(localAddress, config.getBacklog());
    6         }
    7     }

        至此,ServerBootstrap的bind方法完成。

    小结

        本文从头到尾追溯了ServerBootstrap中bind方法的逻辑,将前面netty系列中的二、三两篇初始化给串联了起来,是承上启下的一个位置。后面的netty系列将围绕本文中启动的NioEventLoop.run方法展开,可以这么说,本文跟前面的三篇只是为run方法的出现做的一个铺垫,run方法才是核心功能的逻辑所在。

        本文断断续续更新了一周,今天才完成,也没想到会这么长,就这样吧,后面继续netty run方法的学习。

  • 相关阅读:
    Django和flask中使用原生SQL方法
    UnicodeDecodeError: 'ascii' codec can't decode byte 0xe6 in
    Docker常用命令
    MySQL的压力测试
    使用docker-compose快速搭建PHP开发环境
    Docke如何配置Nginx和PHP
    Docker容器的重命名和自动重启
    docker部署MySQL、Redis和Nginx
    docker-compose的安装卸载以及如何使用
    docker如何制作自己的镜像
  • 原文地址:https://www.cnblogs.com/zzq6032010/p/13034608.html
Copyright © 2011-2022 走看看