  • Netty Source Code Analysis: The Server Startup Process

    I. First, a sample server:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;

    public class NettyTestServer {
        public void bind(int port) throws Exception {
            EventLoopGroup bossgroup = new NioEventLoopGroup(); // create the BOSS thread group
            EventLoopGroup workgroup = new NioEventLoopGroup(); // create the WORK thread group
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossgroup, workgroup) // bind the BOSS and WORK thread groups
                        .channel(NioServerSocketChannel.class) // channel type; a server uses NioServerSocketChannel
                        .option(ChannelOption.SO_BACKLOG, 100) // channel options
                        .handler(new LoggingHandler(LogLevel.INFO)) // Handler of the NioServerSocketChannel
                        // childHandler: the initializer Handler for each newly created NioSocketChannel
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            // Called once the new NioSocketChannel that talks to the client has been
                            // registered with an EventLoop; used to add the business Handlers.
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new EchoServerHandler());
                            }
                        });
                ChannelFuture f = b.bind(port).sync(); // wait synchronously until the bind completes
                f.channel().closeFuture().sync(); // wait synchronously until the channel is closed
            } finally {
                bossgroup.shutdownGracefully();
                workgroup.shutdownGracefully();
            }
        }

        public static void main(String[] args) throws Exception {
            int port = 8082;
            new NettyTestServer().bind(port);
        }
    }

    @ChannelHandler.Sharable
    class EchoServerHandler extends ChannelInboundHandlerAdapter {
        int count = 0; // not thread-safe; acceptable here because each channel gets its own instance

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("This is " + ++count + " times receive client: [" + body + "]");
            body += "$_";
            ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
            ctx.writeAndFlush(echo);
            ctx.fireChannelRead("my name is chenyang");
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    II. Next, the ServerBootstrap class. As the name suggests, it is the server-side bootstrap class, which helps users configure and start a server quickly. Its main fields are defined as follows:

    /**
     * {@link Bootstrap} sub-class which allows easy bootstrap of {@link ServerChannel}
     *
     */
    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

        private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
        // The fields below all apply to the child NioSocketChannel
        private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
        private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
        private volatile EventLoopGroup childGroup;
        private volatile ChannelHandler childHandler;
        // ... (remaining members omitted)
    }

    As shown, ServerBootstrap is a subclass of AbstractBootstrap. The main fields of AbstractBootstrap are:

    /**
     * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
     * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
     *
     * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
     * transports such as datagram (UDP).</p>
     */
    public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
        // The fields below all apply to the server-side NioServerSocketChannel
        volatile EventLoopGroup group;
        private volatile ChannelFactory<? extends C> channelFactory;
        private volatile SocketAddress localAddress;
        private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
        private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
        private volatile ChannelHandler handler;
        // ... (remaining members omitted)
    }

    The relationship between the two classes is illustrated by a diagram (original figure from: http://blog.csdn.net/zxhoo/article/details/17532857).

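    In summary: ServerBootstrap has 4 more parts than AbstractBootstrap. The AbstractBootstrap members configure the server-side NioServerSocketChannel (the thread group, the channel factory, the Handler, plus the address and options), while the 4 ServerBootstrap members configure the NioSocketChannel created for each new connection.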

    III. The ServerBootstrap configuration source, explained

     1)b.group(bossgroup,workgroup)

    /**
     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
     * {@link Channel}'s.
     */
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup); // set the BOSS thread group (stored in AbstractBootstrap)
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup; // set the WORK thread group
        return this;
    }

    2) .channel(NioServerSocketChannel.class)

    /**
     * The {@link Class} which is used to create {@link Channel} instances from.
     * You either use this or {@link #channelFactory(ChannelFactory)} if your
     * {@link Channel} implementation has no no-args constructor.
     */
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new BootstrapChannelFactory<C>(channelClass)); // set the channel factory
    }

    The channelFactory method sets the channel factory; here the factory is BootstrapChannelFactory (a generic class).

    /**
     * {@link ChannelFactory} which is used to create {@link Channel} instances from
     * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
     * is not working for you because of some more complex needs. If your {@link Channel} implementation
     * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
     * simplify your code.
     */
    @SuppressWarnings("unchecked")
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory; // set the channel factory
        return (B) this;
    }

    Below is the implementation of the channel factory class. Its constructor takes a channel type (for the server, NioServerSocketChannel.class), and the newChannel method of BootstrapChannelFactory creates the corresponding channel via reflection. Because a channel is normally created only at startup, using reflection here does not cause a performance problem.

    private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
        private final Class<? extends T> clazz;

        BootstrapChannelFactory(Class<? extends T> clazz) {
            this.clazz = clazz;
        }

        @Override
        public T newChannel() { // called whenever a channel needs to be created
            try {
                return clazz.newInstance(); // create the channel via reflection
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

        @Override
        public String toString() {
            return StringUtil.simpleClassName(clazz) + ".class";
        }
    }
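
    The reflective-factory idea can be tried outside Netty with a minimal sketch. InstanceFactory and ReflectiveInstanceFactory are hypothetical names invented for this illustration, not Netty types. The sketch uses getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9; it also resolves the constructor once in the factory's constructor, which is a design choice of this sketch and not something the Netty source above does.

```java
import java.lang.reflect.Constructor;

/** Hypothetical stand-in for a channel factory interface; not a Netty type. */
interface InstanceFactory<T> {
    T newInstance();
}

/** Creates objects through the no-args constructor of a class, looked up once. */
final class ReflectiveInstanceFactory<T> implements InstanceFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveInstanceFactory(Class<? extends T> clazz) {
        try {
            // Resolve the constructor at configuration time so that a missing
            // no-args constructor is reported early.
            this.constructor = clazz.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(clazz + " has no no-args constructor", e);
        }
    }

    @Override
    public T newInstance() {
        try {
            return constructor.newInstance(); // reflective instantiation
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }
}

final class ReflectiveFactoryDemo {
    public static void main(String[] args) {
        InstanceFactory<StringBuilder> factory =
                new ReflectiveInstanceFactory<>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        // Every call builds a new object, just as every newChannel() call builds a new channel.
        System.out.println(first != second);
    }
}
```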

    3) .option(ChannelOption.SO_BACKLOG,100)

    Sets channel options, such as the backlog size.

    /**
     * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
     * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
     */
    @SuppressWarnings("unchecked")
    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

    4) .handler(new LoggingHandler(LogLevel.INFO))

    Sets the Handler of the server-side NioServerSocketChannel.

    /**
     * the {@link ChannelHandler} to use for serving the requests.
     */
    @SuppressWarnings("unchecked")
    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        // Sets the field in the parent class AbstractBootstrap, so this handler is used by the NioServerSocketChannel
        this.handler = handler;
        return (B) this;
    }

     5) .childHandler(new ChannelInitializer<SocketChannel>() {

    Be sure to distinguish .handler from .childHandler. Both set a Handler, but the Handler set by the former belongs to the server-side NioServerSocketChannel, whereas the Handler set by the latter belongs to each newly created NioSocketChannel (a new NioSocketChannel is created for every incoming client connection).

    /**
     * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
     */
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

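    At this point the ServerBootstrap configuration is complete. Some readers may wonder why all of this is not done in one step in the ServerBootstrap constructor. That would work, but the constructor would end up with too many parameters. The Builder pattern, which is the style ServerBootstrap uses (see Effective Java), solves the problem of constructors with too many parameters.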
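
    The chained style, including the self-typed generic parameter that lets parent-class setters return the concrete subclass (the role B plays in AbstractBootstrap<B, C>), can be sketched in plain Java. BaseConfig and ServerConfig are hypothetical names for this illustration only, and plain strings stand in for handlers and options.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical base class; B is the concrete subtype, so chained calls keep their type. */
abstract class BaseConfig<B extends BaseConfig<B>> {
    final Map<String, Object> options = new LinkedHashMap<>();
    String handler;

    @SuppressWarnings("unchecked")
    B self() {
        return (B) this;
    }

    /** A null value removes a previously set option, mirroring option(...) above. */
    B option(String name, Object value) {
        if (value == null) {
            options.remove(name);
        } else {
            options.put(name, value);
        }
        return self();
    }

    B handler(String handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return self();
    }
}

/** Hypothetical subclass that adds the "child" part of the configuration. */
final class ServerConfig extends BaseConfig<ServerConfig> {
    String childHandler;

    ServerConfig childHandler(String childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

    String describe() {
        return "options=" + options + ", handler=" + handler + ", childHandler=" + childHandler;
    }
}

final class FluentConfigDemo {
    public static void main(String[] args) {
        // option(...) and handler(...) are declared in the parent class but still
        // return ServerConfig, so childHandler(...) can follow them in the chain.
        ServerConfig config = new ServerConfig()
                .option("SO_BACKLOG", 100)
                .handler("logging")
                .childHandler("echo");
        System.out.println(config.describe());
    }
}
```

    Without the self type, option(...) would return the base class and the chain could not continue with a subclass-only method such as childHandler(...).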

    IV. The bind flow

    1) Everything starts with bind:  ChannelFuture f = b.bind(port).sync();

    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    Going one level deeper into bind:

    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

    Going deeper, into doBind:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); // initialize and register a channel
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // Check whether the registration has already completed
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise); // performs channel.bind()
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.executor = channel.eventLoop();
                    }
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
            return promise;
        }
    }
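
    The "bind now if registration is already done, otherwise bind from a listener" shape of doBind can be sketched with the JDK's CompletableFuture. This is an analogy under stated assumptions, not Netty code: BindAfterRegisterSketch, bindAfter and the string result are hypothetical, and the failed-registration case is folded into the listener branch for brevity.

```java
import java.util.concurrent.CompletableFuture;

final class BindAfterRegisterSketch {
    /** Stand-in for the real bind call; it only reports what it would do. */
    static String doBind0(int port) {
        return "bound:" + port;
    }

    /** Returns a future for the bind step, which runs only after registration has succeeded. */
    static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration, int port) {
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Registration already finished successfully: bind right away.
            return CompletableFuture.completedFuture(doBind0(port));
        }
        // Registration is still pending (or has failed): decide once it completes.
        CompletableFuture<String> promise = new CompletableFuture<>();
        registration.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause); // propagate the registration failure
            } else {
                promise.complete(doBind0(port));
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> bind = bindAfter(pending, 8082);
        System.out.println(bind.isDone());      // bind has not run yet
        pending.complete(null);                 // registration finishes later
        System.out.println(bind.getNow(null));  // bind ran from the listener
    }
}
```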

    The most important step in doBind is the call to initAndRegister, which initializes and registers a channel. Here is the source:

    final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel(); // the channel factory configured earlier is finally put to use
        try {
            init(channel); // initialize the channel (the NioServerSocketChannel)
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = group().register(channel); // register the channel with the EventLoopGroup
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

    First, the init method:

    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            channel.config().setOptions(options); // apply the channel options configured earlier
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue()); // apply the attributes configured earlier
            }
        }

        // The pipeline bound to the channel (created and bound when the channel was created)
        ChannelPipeline p = channel.pipeline();
        if (handler() != null) { // if the user configured a Handler
            p.addLast(handler()); // add it to the NioServerSocketChannel's pipeline
        }
        // Prepare the 4 child parts, which are used next.
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        // Add an initializer Handler to the NioServerSocketChannel's pipeline; its initChannel method
        // is called once the NioServerSocketChannel has been registered with the EventLoop
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                // Add a ServerBootstrapAcceptor to the NioServerSocketChannel's pipeline; this Handler
                // mainly registers each newly created NioSocketChannel with the EventLoopGroup
                ch.pipeline().addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

    After init has run, the next step is registration (ChannelFuture regFuture = group().register(channel); note that group here is the BOSS EventLoopGroup set earlier):

    // MultithreadEventLoopGroup
    @Override
    public ChannelFuture register(Channel channel) {
        // next() picks the next EventLoop in the BOSS EventLoopGroup, which then performs the registration
        return next().register(channel);
    }
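
    How next() chooses an EventLoop is not shown in the listing above. A minimal sketch, assuming a plain round-robin policy (an assumption of this illustration, not something the source above confirms), could look like the following; RoundRobinChooser is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical chooser that hands out its children in round-robin order. */
final class RoundRobinChooser<T> {
    private final List<T> children;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinChooser(List<T> children) {
        if (children.isEmpty()) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = new ArrayList<>(children); // defensive copy
    }

    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), children.size());
        return children.get(index);
    }
}

final class RoundRobinChooserDemo {
    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(Arrays.asList("loop-0", "loop-1", "loop-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next()); // the fourth call wraps around to loop-0
        }
    }
}
```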

    // SingleThreadEventLoop
    @Override
    public ChannelFuture register(Channel channel) {
        return register(channel, new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }

        channel.unsafe().register(this, promise); // unsafe carries out the actual operation
        return promise;
    }

    // AbstractChannel.AbstractUnsafe
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        AbstractChannel.this.eventLoop = eventLoop; // bind the EventLoop chosen for this channel
        // The registration must run on this EventLoop's thread; if the caller is another thread,
        // it is wrapped in a task and executed by this EventLoop
        if (eventLoop.inEventLoop()) {
            register0(promise); // register
        } else {
            try {
                eventLoop.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
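
    The "run inline if already on the loop thread, otherwise submit as a task" idiom used above can be sketched with a single-threaded JDK executor. MiniEventLoop and its methods are hypothetical names for this illustration; the real EventLoop does considerably more.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-threaded loop that knows which thread is its own. */
final class MiniEventLoop {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
        Thread thread = new Thread(task, "mini-event-loop");
        thread.setDaemon(true);
        loopThread.set(thread); // remember the loop thread for inEventLoop()
        return thread;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    /** Runs the task inline on the loop thread, or queues it when called from elsewhere. */
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}

final class MiniEventLoopDemo {
    /** Returns the order in which the steps ran. */
    static List<String> demo() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        // Called from the caller's thread: the task is queued for the loop thread.
        loop.runInLoop(() -> {
            log.add("outer start on " + Thread.currentThread().getName());
            // Called from the loop thread itself: runs inline, before "outer end".
            loop.runInLoop(() -> log.add("nested on " + Thread.currentThread().getName()));
            log.add("outer end");
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

    Because every state change of the channel happens on one thread, the registration code needs no locks; the cost is that callers on other threads only get a future back.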

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister(); // the lowest-level registration call
            neverRegistered = false;
            registered = true;
            safeSetSuccess(promise); // mark the registration as successful
            pipeline.fireChannelRegistered(); // fire channelRegistered through the pipeline (head.fireChannelRegistered)
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            // First registration and the channel is already active
            // (server: listen succeeded; client: connect succeeded)
            if (firstRegistration && isActive()) {
                pipeline.fireChannelActive(); // fire channelActive through the pipeline
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

    doRegister performs the actual registration on the EventLoop's Selector.

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // Note: the interest ops are 0 here, so the channel cannot listen for read/write events yet
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

    As shown above, after registration succeeds the NioServerSocketChannel still cannot listen for any I/O events, because its interest ops are 0. So when does it start listening? After a successful registration, pipeline.fireChannelRegistered() is called, and the event propagates through the NioServerSocketChannel's pipeline (starting at head and advancing via findContextInbound), which causes the channelRegistered method of every Inbound Handler to be called. Remember the ChannelInitializer that init added to the NioServerSocketChannel? It is an InboundHandler too. Here is its implementation:

    @Sharable
    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

        private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);

        /**
         * This method will be called once the {@link Channel} was registered. After the method returns this instance
         * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
         *
         * @param ch            the {@link Channel} which was registered.
         * @throws Exception    is thrown if an error occurs. In that case the {@link Channel} will be closed.
         */
        protected abstract void initChannel(C ch) throws Exception; // abstract; implemented by subclasses

        // Called when the NioServerSocketChannel has been registered
        @Override
        @SuppressWarnings("unchecked")
        public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            ChannelPipeline pipeline = ctx.pipeline();
            boolean success = false;
            try {
                initChannel((C) ctx.channel()); // call initChannel
                pipeline.remove(this); // the initializer only performs initialization; once done, it removes itself
                ctx.fireChannelRegistered(); // keep propagating the channelRegistered event
                success = true;
            } catch (Throwable t) {
                logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
            } finally {
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
                if (!success) {
                    ctx.close();
                }
            }
        }
    }

    Here is that code once more, to see what initChannel does:

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception { // called by channelRegistered
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });

    As shown, initChannel simply adds a Handler of type ServerBootstrapAcceptor to the pipeline.
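
    The "initialize, then remove yourself" behavior can be reproduced with a toy pipeline in plain Java. MiniPipeline, MiniHandler, MiniInitializer and RecordingHandler are all hypothetical types invented for this sketch; a list stands in for the pipeline, and only a single event type is modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, minimal handler contract; not a Netty interface. */
interface MiniHandler {
    void channelRegistered(MiniPipeline pipeline);
}

/** Toy pipeline: an ordered list of handlers plus one event type. */
final class MiniPipeline {
    private final List<MiniHandler> handlers = new ArrayList<>();

    void addLast(MiniHandler handler) { handlers.add(handler); }
    void remove(MiniHandler handler) { handlers.remove(handler); }
    boolean contains(MiniHandler handler) { return handlers.contains(handler); }
    int size() { return handlers.size(); }

    /** Delivers the event front to back, tolerating handlers that add or remove entries. */
    void fireChannelRegistered() {
        int i = 0;
        while (i < handlers.size()) {
            MiniHandler current = handlers.get(i);
            current.channelRegistered(this);
            // If the handler removed itself, its successor has shifted into slot i.
            if (i < handlers.size() && handlers.get(i) == current) {
                i++;
            }
        }
    }
}

/** One-shot handler: runs initChannel on registration, then removes itself. */
abstract class MiniInitializer implements MiniHandler {
    abstract void initChannel(MiniPipeline pipeline);

    @Override
    public final void channelRegistered(MiniPipeline pipeline) {
        try {
            initChannel(pipeline);
        } finally {
            pipeline.remove(this); // the initializer is no longer needed
        }
    }
}

/** Stand-in for the handler that the initializer installs. */
final class RecordingHandler implements MiniHandler {
    boolean sawRegistered;

    @Override
    public void channelRegistered(MiniPipeline pipeline) {
        sawRegistered = true;
    }
}

final class MiniPipelineDemo {
    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline();
        RecordingHandler acceptor = new RecordingHandler();
        MiniInitializer initializer = new MiniInitializer() {
            @Override
            void initChannel(MiniPipeline p) {
                p.addLast(acceptor);
            }
        };
        pipeline.addLast(initializer);
        pipeline.fireChannelRegistered();
        System.out.println("initializer still present: " + pipeline.contains(initializer));
        System.out.println("acceptor saw the event: " + acceptor.sawRegistered);
    }
}
```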

    This still does not show where the NioServerSocketChannel registers its interest in I/O events (for a NioServerSocketChannel the "read" interest is OP_ACCEPT). Going back to register0, it can also call the pipeline's fireChannelActive method. A server channel is normally not yet bound when registration completes, so in that case the same event is fired once the bind succeeds. Here is the code of that method:

    // DefaultChannelPipeline
    @Override
    public ChannelPipeline fireChannelActive() {
        head.fireChannelActive(); // propagate the channelActive event through the pipeline
        // If the channel is configured for auto-read, issue a read
        if (channel.config().isAutoRead()) {
            channel.read(); // pipeline.read()-->tail.read()-->*****-->head.read()-->unsafe.beginRead()
        }

        return this;
    }

    // AbstractChannelHandlerContext
    @Override
    public ChannelHandlerContext fireChannelActive() { // head's fireChannelActive()
        final AbstractChannelHandlerContext next = findContextInbound(); // find the next Inbound Context
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive(); // call channelActive on the Handler held by that Context
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
        return this;
    }

    Here is the implementation of beginRead:

    @Override
    public final void beginRead() {
        if (!isActive()) {
            return;
        }

        try {
            doBeginRead(); // performs the real registration of the read interest
        } catch (final Exception e) {
            invokeLater(new OneTimeTask() {
                @Override
                public void run() {
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        if (inputShutdown) {
            return;
        }

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp); // this is where the read interest is actually registered
        }
    }
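
    The two-step sequence (register with interest ops 0, switch on the real interest later) can be reproduced with the JDK NIO API alone. This is a minimal sketch: InterestOpsSketch is a hypothetical name, the channel is never bound, and OP_ACCEPT plays the role that readInterestOp plays for a server channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class InterestOpsSketch {
    /** Returns {interest ops right after registration, interest ops after enabling accept}. */
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // a channel must be non-blocking to be registered

            // Step 1: register with no interest at all; the key exists, but no event is reported.
            SelectionKey key = server.register(selector, 0, "attachment");
            int afterRegister = key.interestOps();

            // Step 2: later, add the interest bit without disturbing any bits already set.
            int ops = key.interestOps();
            if ((ops & SelectionKey.OP_ACCEPT) == 0) {
                key.interestOps(ops | SelectionKey.OP_ACCEPT);
            }
            return new int[] {afterRegister, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept();
        System.out.println("after register: " + ops[0]);
        System.out.println("accept enabled: " + ((ops[1] & SelectionKey.OP_ACCEPT) != 0));
    }
}
```

    Splitting the steps lets the pipeline be fully set up before the selector can report the first event for the channel.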

    V. How a client connection is accepted

    Next, let's see what happens when a client connects.

    1) Start at the source of the event. Below is the EventLoop's event loop:

     1    @Override
     2     protected void run() {
     3         for (;;) {
     4             boolean oldWakenUp = wakenUp.getAndSet(false);
     5             try {
     6                 if (hasTasks()) {
     7                     selectNow();
     8                 } else {
     9                     select(oldWakenUp);//调用selector.select()
    10 
    11                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
    12                     // before calling 'selector.wakeup()' to reduce the wake-up
    13                     // overhead. (Selector.wakeup() is an expensive operation.)
    14                     //
    15                     // However, there is a race condition in this approach.
    16                     // The race condition is triggered when 'wakenUp' is set to
    17                     // true too early.
    18                     //
    19                     // 'wakenUp' is set to true too early if:
    20                     // 1) Selector is waken up between 'wakenUp.set(false)' and
    21                     //    'selector.select(...)'. (BAD)
    22                     // 2) Selector is waken up between 'selector.select(...)' and
    23                     //    'if (wakenUp.get()) { ... }'. (OK)
    24                     //
    25                     // In the first case, 'wakenUp' is set to true and the
    26                     // following 'selector.select(...)' will wake up immediately.
    27                     // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys(); // ready I/O events are handled here

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }
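
    In the ioRatio branch above, the loop measures the time spent in processSelectedKeys() and then gives runAllTasks a budget of ioTime * (100 - ioRatio) / ioRatio. The arithmetic can be checked in isolation with a minimal sketch; the names IoRatioBudget and taskBudgetNanos are made up for illustration and are not part of Netty.

```java
/** Hypothetical helper, not part of Netty: reproduces the budget formula used in run(). */
final class IoRatioBudget {

    /**
     * Returns how many nanoseconds may be spent on queued tasks after
     * ioTimeNanos were spent on I/O, for an ioRatio strictly between 0 and 100.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // run() handles ioRatio == 100 in a separate branch without a time limit.
            throw new IllegalArgumentException("ioRatio must be in (0, 100): " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // 1 ms spent handling selected keys
        System.out.println(taskBudgetNanos(ioTime, 50)); // equal share: 1000000
        System.out.println(taskBudgetNanos(ioTime, 80)); // 20/80 of the I/O time: 250000
    }
}
```

    A higher ioRatio therefore leaves a smaller share of each iteration for queued tasks.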

    Now look at processSelectedKeys:

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip()); // this branch runs
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    processSelectedKeysOptimized then walks the array of ready keys:

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;

            final Object a = k.attachment();

            // The attachment is the NioServerSocketChannel, so this branch runs.
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                    i++;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }
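
    processSelectedKeysOptimized relies on the ready keys being packed at the front of an array and followed by a null slot, so the loop needs no explicit length. The stand-alone sketch below shows that iteration pattern on a plain array of strings. NullTerminatedScan and drain are hypothetical names; unlike the original loop, the sketch also bounds the index by the array length, and it leaves out the needsToSelectAgain handling.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the "walk until the first null" pattern. */
final class NullTerminatedScan {

    /** Collects entries up to the first null and clears each visited slot. */
    static List<String> drain(String[] slots) {
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < slots.length; i++) {
            String s = slots[i];
            if (s == null) {
                break; // the first empty slot marks the end of the ready entries
            }
            slots[i] = null; // drop the reference so it can be garbage collected
            seen.add(s);
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] slots = {"key-1", "key-2", null, "stale"};
        System.out.println(drain(slots)); // [key-1, key-2]
    }
}
```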

    processSelectedKey handles a single ready key:

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // An ACCEPT event arrived, so this runs. The read() implementation differs
                // between NioServerSocketChannel and NioSocketChannel.
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
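
    processSelectedKey decides what to do by testing bits of readyOps against the SelectionKey constants. The following sketch repeats only those bit tests and reports which unsafe methods would be called, in the same order as above. ReadyOpsDispatch and actionsFor are hypothetical names, and the early return for a channel closed during read() is left out.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the readyOps bit tests; it does no real I/O. */
final class ReadyOpsDispatch {

    /** Lists the actions processSelectedKey would take for the given readyOps value. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 is also treated as readable.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT)); // [read]
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // [read, forceFlush]
    }
}
```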

    NioServerSocketChannel extends AbstractNioMessageChannel, so the read() that runs here is the AbstractNioMessageChannel version:

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            if (!config.isAutoRead() && !isReadPending()) {
                // ChannelConfig.setAutoRead(false) was called in the meantime
                removeReadOp();
                return;
            }

            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            final ChannelPipeline pipeline = pipeline(); // pipeline of the server-side NioServerSocketChannel
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    for (;;) {
                        int localRead = doReadMessages(readBuf); // this call runs next
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        // stop reading and remove op
                        if (!config.isAutoRead()) {
                            break;
                        }

                        if (readBuf.size() >= maxMessagesPerRead) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    exception = t;
                }
                setReadPending(false);
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    pipeline.fireChannelRead(readBuf.get(i)); // fires channelRead
                }

                readBuf.clear();
                pipeline.fireChannelReadComplete(); // fires channelReadComplete

                if (exception != null) {
                    if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                        // ServerChannel should not be closed even on IOException because it can often continue
                        // accepting incoming connections. (e.g. too many open files)
                        closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                    }

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!config.isAutoRead() && !isReadPending()) {
                    removeReadOp();
                }
            }
        }
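
    The inner for (;;) loop above collects messages into readBuf until doReadMessages returns 0 or readBuf reaches maxMessagesPerRead, and only then fires channelRead for each one. A minimal sketch of that batching, assuming a queue of strings stands in for pending connections (AcceptBatchSketch and readBatch are hypothetical names, and the cap of 16 in main is an arbitrary value):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration of the capped batch loop; it does no real I/O. */
final class AcceptBatchSketch {

    /** Moves at most maxMessagesPerRead entries from pending into a new batch. */
    static List<String> readBatch(Deque<String> pending, int maxMessagesPerRead) {
        List<String> readBuf = new ArrayList<>();
        for (;;) {
            String next = pending.poll(); // stands in for doReadMessages(readBuf)
            if (next == null) {
                break; // nothing left to accept right now
            }
            readBuf.add(next);
            if (readBuf.size() >= maxMessagesPerRead) {
                break; // cap reached; the rest waits for a later read
            }
        }
        return readBuf;
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>();
        for (int i = 0; i < 20; i++) {
            pending.add("conn-" + i);
        }
        List<String> batch = readBatch(pending, 16);
        System.out.println(batch.size() + " read, " + pending.size() + " left"); // 16 read, 4 left
    }
}
```

    The cap keeps one busy channel from monopolizing the event loop thread.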

    NioSocketChannel, by contrast, extends AbstractNioByteChannel, so it uses the AbstractNioByteChannel version of read():

        @Override
        public final void read() {
            final ChannelConfig config = config();
            if (!config.isAutoRead() && !isReadPending()) {
                // ChannelConfig.setAutoRead(false) was called in the meantime
                removeReadOp();
                return;
            }

            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }

            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {
                int totalReadAmount = 0; // total number of bytes read so far
                boolean readPendingReset = false;
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    int writable = byteBuf.writableBytes(); // bytes that can still be written into byteBuf
                    int localReadAmount = doReadBytes(byteBuf); // the actual read; localReadAmount is the number of bytes read this time
                    if (localReadAmount <= 0) { // nothing was read
                        // not was read release the buffer
                        byteBuf.release();
                        byteBuf = null;
                        close = localReadAmount < 0;
                        break; // leave the loop
                    }
                    if (!readPendingReset) {
                        readPendingReset = true;
                        setReadPending(false);
                    }
                    pipeline.fireChannelRead(byteBuf); // fires channelRead, passing byteBuf along
                    byteBuf = null;
                    // Adding this read to the running total would exceed Integer.MAX_VALUE.
                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break; // leave the loop
                    }
                    // update the running total
                    totalReadAmount += localReadAmount;

                    // stop reading
                    if (!config.isAutoRead()) {
                        break; // without auto-read, stop after a single read
                    }
                    // This read did not fill byteBuf, so the socket has probably been drained.
                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break; // leave the loop
                    }
                } while (++ messages < maxMessagesPerRead);

                pipeline.fireChannelReadComplete(); // after the loop, fires channelReadComplete
                allocHandle.record(totalReadAmount);

                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!config.isAutoRead() && !isReadPending()) {
                    removeReadOp();
                }
            }
        }
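
    The do/while loop above stops on any of three conditions: nothing was read, the read did not fill the buffer, or maxMessagesPerRead reads were made. The sketch below imitates those exits with a ByteArrayInputStream standing in for the socket and a fixed-size byte array standing in for the allocated ByteBuf. ByteReadLoopSketch and readChunks are hypothetical names; the auto-read and overflow checks are omitted.

```java
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the read loop's exit conditions; not Netty code. */
final class ByteReadLoopSketch {

    /** Returns the size of every chunk read in one pass of the loop. */
    static List<Integer> readChunks(ByteArrayInputStream in, int bufferSize, int maxMessagesPerRead) {
        List<Integer> chunkSizes = new ArrayList<>();
        int messages = 0;
        do {
            byte[] buf = new byte[bufferSize]; // stands in for allocHandle.allocate(allocator)
            int localReadAmount = in.read(buf, 0, buf.length); // stands in for doReadBytes(byteBuf)
            if (localReadAmount <= 0) {
                break; // nothing was read
            }
            chunkSizes.add(localReadAmount); // stands in for pipeline.fireChannelRead(byteBuf)
            if (localReadAmount < bufferSize) {
                break; // buffer not filled, so the source is probably drained
            }
        } while (++messages < maxMessagesPerRead);
        return chunkSizes;
    }

    public static void main(String[] args) {
        ByteArrayInputStream in = new ByteArrayInputStream(new byte[2500]);
        System.out.println(readChunks(in, 1024, 16)); // [1024, 1024, 452]
    }
}
```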

    Next, look at doReadMessages:

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept(); // accept the client; this yields the JDK SocketChannel

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
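
    javaChannel().accept() above is the JDK call ServerSocketChannel.accept(). On a channel in non-blocking mode it returns null when no connection is pending, which is why doReadMessages checks ch != null and otherwise returns 0. The sketch below shows that behaviour with plain java.nio on the loopback interface; NonBlockingAcceptDemo and probe are hypothetical names, and the short retry loop is only a precaution in case the connection is not yet visible to accept().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical demo of non-blocking accept() using only the JDK. */
final class NonBlockingAcceptDemo {

    /** Returns {acceptedBeforeConnect, acceptedAfterConnect}. */
    static boolean[] probe() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the operating system pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            boolean before;
            try (SocketChannel none = server.accept()) { // no client yet, so null is expected
                before = none != null;
            }

            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            try (SocketChannel client = SocketChannel.open(address)) { // blocking connect
                SocketChannel accepted = null;
                long deadline = System.nanoTime() + 2_000_000_000L;
                while (accepted == null && System.nanoTime() < deadline) {
                    accepted = server.accept();
                    if (accepted == null) {
                        LockSupport.parkNanos(10_000_000L); // wait 10 ms and retry
                    }
                }
                boolean after = accepted != null;
                if (accepted != null) {
                    accepted.close();
                }
                return new boolean[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = probe();
        System.out.println("before connect: " + result[0] + ", after connect: " + result[1]);
    }
}
```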

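    Once doReadMessages returns, the SocketChannel for the client has been created. read() then fires the channelRead and channelReadComplete events, and these invoke the corresponding methods of the ServerBootstrapAcceptor in the pipeline. Here is the ServerBootstrapAcceptor source: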

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;

        ServerBootstrapAcceptor(
                EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            // Add the childHandler configured at startup to the SocketChannel's pipeline.
            // It is also an initializer handler and works the same way as on the server side.
            child.pipeline().addLast(childHandler);

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                // Register the SocketChannel with the worker EventLoopGroup. Registration works
                // much like it does for the server channel, so it is not covered again here.
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

        private static void forceClose(Channel child, Throwable t) {
            child.unsafe().closeForcibly();
            logger.warn("Failed to register an accepted channel: " + child, t);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // stop accept new connections for 1 second to allow the channel to recover
                // See https://github.com/netty/netty/issues/1328
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                       config.setAutoRead(true);
                    }
                }, 1, TimeUnit.SECONDS);
            }
            // still let the exceptionCaught event flow through the pipeline to give the user
            // a chance to do something with it
            ctx.fireExceptionCaught(cause);
        }
    }
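
    exceptionCaught above backs off after an accept failure: it switches auto-read off and schedules a task that switches it back on one second later. The same pause-and-resume pattern can be sketched with a plain ScheduledExecutorService, assuming an AtomicBoolean stands in for the channel's auto-read flag. AcceptBackoffSketch and pause are hypothetical names, not Netty API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of "pause accepting, resume after a delay". */
final class AcceptBackoffSketch {

    private final AtomicBoolean autoRead = new AtomicBoolean(true);

    boolean isAutoRead() {
        return autoRead.get();
    }

    /** Switches auto-read off and schedules it back on; returns null if already paused. */
    ScheduledFuture<?> pause(ScheduledExecutorService scheduler, long delay, TimeUnit unit) {
        if (!autoRead.compareAndSet(true, false)) {
            return null; // already paused, so a resume task is already scheduled
        }
        return scheduler.schedule(() -> autoRead.set(true), delay, unit);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            AcceptBackoffSketch sketch = new AcceptBackoffSketch();
            ScheduledFuture<?> resume = sketch.pause(scheduler, 1, TimeUnit.SECONDS);
            System.out.println("paused, autoRead = " + sketch.isAutoRead());
            resume.get(); // block until the resume task has run
            System.out.println("resumed, autoRead = " + sketch.isAutoRead());
        } finally {
            scheduler.shutdown();
        }
    }
}
```

    Netty schedules the resume task on the channel's own event loop, so no separate scheduler thread is involved there.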

    For an overall picture, see the diagram from http://blog.csdn.net/zxhoo/article/details/17532857.

  • Original article: https://www.cnblogs.com/chenyangyao/p/5795100.html