zoukankan      html  css  js  c++  java
  • java netty之ServerBootstrap的启动

    通过前面的几篇文章,对整个netty部分的架构已经运行原理都有了一定的了解,那么这篇文章来分析一个经常用到的类:ServerBootstrap,一般对于服务器端的编程它用到的都还算是比较的多。。看一看它的初始化,以及它的运行原理。。。

    首先我们还是引入一段代码,通过分析这段代码来分析ServerBootstrap的运行。。。

    [java] view plain copy
     
    1. EventLoopGroup bossGroup = new NioEventLoopGroup();   //这个是用于serversocketchannel的eventloop  
    2.         EventLoopGroup workerGroup = new NioEventLoopGroup();    //这个是用于处理accept到的channel  
    3.         try {  
    4.             ServerBootstrap b = new ServerBootstrap();    //构建serverbootstrap对象  
    5.             b.group(bossGroup, workerGroup);   //设置时间循环对象,前者用来处理accept事件,后者用于处理已经建立的连接的io  
    6.             b.channel(NioServerSocketChannel.class);   //用它来建立新accept的连接,用于构造serversocketchannel的工厂类  
    7.             b.childHandler(new ChannelInitializer<SocketChannel>(){      //为accept channel的pipeline预添加的inboundhandler  
    8.                 @Override     //当新连接accept的时候,这个方法会调用  
    9.                 protected void initChannel(SocketChannel ch) throws Exception {  
    10.                     // TODO Auto-generated method stub  
    11.                     ch.pipeline().addLast(new MyChannelHandler());   //为当前的channel的pipeline添加自定义的处理函数  
    12.                 }  
    13.                   
    14.             });  
    15.             //bind方法会创建一个serverchannel,并且会将当前的channel注册到eventloop上面,  
    16.             //会为其绑定本地端口,并对其进行初始化,为其的pipeline加一些默认的handler  
    17.             ChannelFuture f = b.bind(80).sync();      
    18.             f.channel().closeFuture().sync();  //相当于在这里阻塞,直到serverchannel关闭  
    19.         } finally {  
    20.             bossGroup.shutdownGracefully();  
    21.             workerGroup.shutdownGracefully();  
    22.         }  

    这段代码在前面的文章也有用到,基本上其意思也都在上面的注释中说的比较清楚了,那么我们接下来具体的分析其中的方法调用,首先是ServerBootstrap的group方法:

    [java] view plain copy
     
    1. //这里parent用于执行server的accept时间事件,child才是用于执行获取的channel连接的事件  
    2. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {  
    3.     super.group(parentGroup);  
    4.     if (childGroup == null) {  
    5.         throw new NullPointerException("childGroup");  
    6.     }  
    7.     if (this.childGroup != null) {  
    8.         throw new IllegalStateException("childGroup set already");  
    9.     }  
    10.     this.childGroup = childGroup;  
    11.     return this;  
    12. }  

    这个方法是用来设置eventloopgroup,首先调用了父类的group方法(abstractbootstrap),就不将父类的方法列出来了,其实意思都差不多,eventloopgroup属性的值。。。

    好了,接下来我们再来看一下channel方法:

    [java] view plain copy
     
    1. //构造serversocketchannel factory  
    2. public B channel(Class<? extends C> channelClass) {  
    3.     if (channelClass == null) {  
    4.         throw new NullPointerException("channelClass");  
    5.     }  
    6.     return channelFactory(new BootstrapChannelFactory<C>(channelClass));  //构造工厂类  
    7. }  
    8.   
    9. /** 
    10.  * {@link ChannelFactory} which is used to create {@link Channel} instances from 
    11.  * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} 
    12.  * is not working for you because of some more complex needs. If your {@link Channel} implementation 
    13.  * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for 
    14.  * simplify your code. 
    15.  */  
    16. @SuppressWarnings("unchecked")  
    17. public B channelFactory(ChannelFactory<? extends C> channelFactory) {  
    18.     if (channelFactory == null) {  
    19.         throw new NullPointerException("channelFactory");  
    20.     }  
    21.     if (this.channelFactory != null) {  
    22.         throw new IllegalStateException("channelFactory set already");  
    23.     }  
    24.   
    25.     this.channelFactory = channelFactory;   //设置  
    26.     return (B) this;  
    27. }  

    该方法主要是用于构造用于产生channel的工厂类,在我们这段代码说白了就是用于实例化serversocketchannel的工厂类。。。

    接下来我们再来看一下childHandler方法:

    [java] view plain copy
     
    1. //设置childHandler,这个是当有channel accept之后为其添加的handler  
    2. public ServerBootstrap childHandler(ChannelHandler childHandler) {  
    3.     if (childHandler == null) {  
    4.         throw new NullPointerException("childHandler");  
    5.     }  
    6.     this.childHandler = childHandler;  
    7.     return this;  
    8. }  

    这个很简单吧,就是一个赋值,具体说他有什么用,前面的注释有说明,不过以后的分析会说明它有什么用的。。。

    接下来我们来看一下bind方法,这个比较重要吧:

    [java] view plain copy
     
    1. //最终将会创建serverchannel,然后会将其绑定到这个地址,然后对其进行初始化  
    2. public ChannelFuture bind(int inetPort) {  
    3.     return bind(new InetSocketAddress(inetPort));  
    4. }  

    好吧,接下来再来看bind方法:

    [java] view plain copy
     
    1. public ChannelFuture bind(SocketAddress localAddress) {  
    2.     validate();  
    3.     if (localAddress == null) {  
    4.         throw new NullPointerException("localAddress");  
    5.     }  
    6.     return doBind(localAddress);  
    7. }  

    好吧,再来看看doBind方法:

    [java] view plain copy
     
    1. private ChannelFuture doBind(final SocketAddress localAddress) {  
    2.     final ChannelFuture regPromise = initAndRegister();   //在这里创建serverchanel,并对其进行初始化,并将其注册到eventloop当中去  
    3.     final Channel channel = regPromise.channel();  
    4.     final ChannelPromise promise = channel.newPromise();  
    5.     if (regPromise.isDone()) {  
    6.         doBind0(regPromise, channel, localAddress, promise);   //将当前的serverchannel绑定地址  
    7.     } else {  
    8.         regPromise.addListener(new ChannelFutureListener() {  
    9.             @Override  
    10.             public void operationComplete(ChannelFuture future) throws Exception {  
    11.                 doBind0(future, channel, localAddress, promise);  
    12.             }  
    13.         });  
    14.     }  
    15.   
    16.     return promise;  
    17. }  

    这里调用了一个比较重要的方法:initAndRegister,我们来看看它的定义:

    [java] view plain copy
     
    1. //创建初始化以及注册serverchanel  
    2.     final ChannelFuture initAndRegister() {  
    3.         //利用工厂类创建channel  
    4.         final Channel channel = channelFactory().newChannel();  
    5.         try {  
    6.             init(channel);  //init函数留给了后面来实现,用于初始化channel,例如为其的pipeline加上handler  
    7.         } catch (Throwable t) {  
    8.             channel.unsafe().closeForcibly();  
    9.             return channel.newFailedFuture(t);  
    10.         }  
    11.   
    12.         ChannelPromise regPromise = channel.newPromise();  
    13.         group().register(channel, regPromise);  //将当前创建的serverchannel注册到eventloop上面去  
    14.         if (regPromise.cause() != null) {  
    15.             if (channel.isRegistered()) {  
    16.                 channel.close();  
    17.             } else {  
    18.                 channel.unsafe().closeForcibly();  
    19.             }  
    20.         }  
    21.   
    22.         // If we are here and the promise is not failed, it's one of the following cases:  
    23.         // 1) If we attempted registration from the event loop, the registration has been completed at this point.  
    24.         //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.  
    25.         // 2) If we attempted registration from the other thread, the registration request has been successfully  
    26.         //    added to the event loop's task queue for later execution.  
    27.         //    i.e. It's safe to attempt bind() or connect() now:  
    28.         //         because bind() or connect() will be executed *after* the scheduled registration task is executed  
    29.         //         because register(), bind(), and connect() are all bound to the same thread.  
    30.   
    31.         return regPromise;  
    32.     }  

    代码还是很简单,而且也相对比较好理解,无非就是利用前面说到过的channel工厂类来创建一个serversocketchannel,然后调用init方法对这个刚刚生成的channel进行一些初始化的操作,然后在调用eventloopgroup的register方法,将当前这个channel的注册到group上,那么以后这个channel的事件都在这个group上面执行,说白了也就是一些accept。、。。

    好,我们先来看看这个init方法吧:

    [java] view plain copy
     
    1. @Override  
    2. //初始化chanel,当用channel factory构造channel以后,会调用这个函数来初始化,说白了就是为当前的channel的pipeline加入一些handler  
    3. void init(Channel channel) throws Exception {  
    4.     //先初始化一些配置  
    5.     final Map<ChannelOption<?>, Object> options = options();  
    6.     synchronized (options) {  
    7.         channel.config().setOptions(options);  
    8.     }  
    9.    //初始化一些属性  
    10.     final Map<AttributeKey<?>, Object> attrs = attrs();  
    11.     synchronized (attrs) {  
    12.         for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {  
    13.             @SuppressWarnings("unchecked")  
    14.             AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();  
    15.             channel.attr(key).set(e.getValue());  
    16.         }  
    17.     }  
    18.   
    19.     //获取当前channel的pipeline  
    20.     ChannelPipeline p = channel.pipeline();  
    21.     if (handler() != null) {  
    22.         p.addLast(handler());  
    23.     }  
    24.   
    25.     final EventLoopGroup currentChildGroup = childGroup;  
    26.     final ChannelHandler currentChildHandler = childHandler;  
    27.     final Entry<ChannelOption<?>, Object>[] currentChildOptions;  
    28.     final Entry<AttributeKey<?>, Object>[] currentChildAttrs;  
    29.     synchronized (childOptions) {  
    30.         currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));  
    31.     }  
    32.     synchronized (childAttrs) {  
    33.         currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));  
    34.     }  
    35.   
    36.     p.addLast(new ChannelInitializer<Channel>() {  
    37.         @Override  
    38.         public void initChannel(Channel ch) throws Exception {  
    39.             //这是一个inboundher,将其加入到serverchannel的pipeline上面去  
    40.             ch.pipeline().addLast(new ServerBootstrapAcceptor(  
    41.                     currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  
    42.         }  
    43.     });  
    44. }  

    代码还是相对很简单,首先初始化一些配置参数,然后初始化属性,最后还要为当前的channel的pipeline添加一个handler,这个handler用来当channel注册到eventloop上面之后对其进行一些初始化,我们还是来看看channelInitalizer的定义吧:

    [java] view plain copy
     
    1. public abstract class ChannelInitializer<C extends Channel> extends ChannelStateHandlerAdapter {  
    2.   
    3.     private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);  
    4.   
    5.     /** 
    6.      * This method will be called once the {@link Channel} was registered. After the method returns this instance 
    7.      * will be removed from the {@link ChannelPipeline} of the {@link Channel}. 
    8.      * 
    9.      * @param ch            the {@link Channel} which was registered. 
    10.      * @throws Exception    is thrown if an error occours. In that case the {@link Channel} will be closed. 
    11.      */  
    12.     protected abstract void initChannel(C ch) throws Exception;  
    13.   
    14.     @SuppressWarnings("unchecked")  
    15.     @Override  
    16.     public final void channelRegistered(ChannelHandlerContext ctx)  
    17.             throws Exception {  
    18.         boolean removed = false;  
    19.         boolean success = false;  
    20.         try {  
    21.             //调用用户定义的init函数对当前的channel进行初始化  
    22.             initChannel((C) ctx.channel());  
    23.             ctx.pipeline().remove(this);  
    24.             removed = true;  
    25.             ctx.fireChannelRegistered();  
    26.             success = true;  
    27.         } catch (Throwable t) {  
    28.             logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);  
    29.         } finally {  
    30.             if (!removed) {  
    31.                 ctx.pipeline().remove(this);  
    32.             }  
    33.             if (!success) {  
    34.                 ctx.close();  
    35.             }  
    36.         }  
    37.     }  
    38.   
    39.     @Override  
    40.     public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {  
    41.         ctx.fireInboundBufferUpdated();  
    42.     }  
    43. }  

    它有一个channelRegistered方法,这个方法是在当前pipeline所属的channel注册到eventloop上面之后会激活的方法,它则是调用了用户自定义的函数来初始化channel,然后在将当前handler移除。。。也就是执行

    [java] view plain copy
     
    1. ch.pipeline().addLast(new ServerBootstrapAcceptor(  
    2.                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  

    这里又为当前的serversocketchannel添加了另外一个handler,来看看该类型的定义吧:

    [java] view plain copy
     
    1. private static class ServerBootstrapAcceptor  
    2.         extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<Channel> {  
    3.   
    4.     private final EventLoopGroup childGroup;  
    5.     private final ChannelHandler childHandler;  
    6.     private final Entry<ChannelOption<?>, Object>[] childOptions;  
    7.     private final Entry<AttributeKey<?>, Object>[] childAttrs;  
    8.   
    9.     @SuppressWarnings("unchecked")  
    10.     ServerBootstrapAcceptor(  
    11.             EventLoopGroup childGroup, ChannelHandler childHandler,  
    12.             Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {  
    13.         this.childGroup = childGroup;  //这个是用于管理accept的channel的eventloop  
    14.         this.childHandler = childHandler;  
    15.         this.childOptions = childOptions;  
    16.         this.childAttrs = childAttrs;  
    17.     }  
    18.   
    19.     @Override  
    20.     public MessageBuf<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {  
    21.         return Unpooled.messageBuffer();  
    22.     }  
    23.   
    24.     @Override  
    25.     @SuppressWarnings("unchecked")  
    26.     //当有数据进来的时候,会调用这个方法来处理数据,这里进来的数据就是accept的channel  
    27.     public void inboundBufferUpdated(ChannelHandlerContext ctx) {  
    28.         MessageBuf<Channel> in = ctx.inboundMessageBuffer(); //获取buf  
    29.         for (;;) {  
    30.             Channel child = in.poll();  
    31.             if (child == null) {  
    32.                 break;  
    33.             }  
    34.   
    35.             child.pipeline().addLast(childHandler);   //为accept的channel的pipeline加入用户定义的初始化handler  
    36.   
    37.             for (Entry<ChannelOption<?>, Object> e: childOptions) {  
    38.                 try {  
    39.                     if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {  
    40.                         logger.warn("Unknown channel option: " + e);  
    41.                     }  
    42.                 } catch (Throwable t) {  
    43.                     logger.warn("Failed to set a channel option: " + child, t);  
    44.                 }  
    45.             }  
    46.   
    47.             for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
    48.                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
    49.             }  
    50.   
    51.             try {  
    52.                 childGroup.register(child);   //将当前accept的channel注册到eventloop  
    53.             } catch (Throwable t) {  
    54.                 child.unsafe().closeForcibly();  
    55.                 logger.warn("Failed to register an accepted channel: " + child, t);  
    56.             }  
    57.         }  
    58.     }  
    59.   
    60.     @Override  
    61.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
    62.         final ChannelConfig config = ctx.channel().config();  
    63.         if (config.isAutoRead()) {  
    64.             // stop accept new connections for 1 second to allow the channel to recover  
    65.             // See https://github.com/netty/netty/issues/1328  
    66.             config.setAutoRead(false);  
    67.             ctx.channel().eventLoop().schedule(new Runnable() {  
    68.                 @Override  
    69.                 public void run() {  
    70.                    config.setAutoRead(true);  
    71.                 }  
    72.             }, 1, TimeUnit.SECONDS);  
    73.         }  
    74.         // still let the exceptionCaught event flow through the pipeline to give the user  
    75.         // a chance to do something with it  
    76.         ctx.fireExceptionCaught(cause);  
    77.     }  
    78. }  

    主要是有一个比较重要的方法,inboundBufferUpdated,这个方法是在有数据进来的时候会调用的,用于处理进来的数据,也就是accept到的channel,这里就知道我们定义的chidHandler的用处了吧,netty会将这个handler直接加入到刚刚accept到的channel的pipeline上面去。。。最后还要讲当前accept到的channel注册到child eventloop上面去,这里也就完完全全的明白了最开始定义的两个eventloopgroup的作用了。。。

    好了,serversocketchannel的init以及register差不多了,然后会调用doBind0方法,将当前的serversocketchannel绑定到一个本地端口,

    [java] view plain copy
     
    1. //将chanel绑定到一个本地地址  
    2.     private static void doBind0(  
    3.             final ChannelFuture regFuture, final Channel channel,  
    4.             final SocketAddress localAddress, final ChannelPromise promise) {  
    5.   
    6.         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up  
    7.         // the pipeline in its channelRegistered() implementation.  
    8.   
    9.         channel.eventLoop().execute(new Runnable() {  
    10.             @Override  
    11.             //匿名内部类想要访问外面的参数,那么外面的参数必须是要final的才行  
    12.             public void run() {  
    13.                 if (regFuture.isSuccess()) {  
    14.                     //调用channel的bind方法,将当前的channl绑定到一个本地地址,其实是调用的是pipeline的bind方法,但是最终又是调用的当前  
    15.                     //channel的unsafe对象的bind方法  
    16.                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  
    17.                 } else {  
    18.                     promise.setFailure(regFuture.cause());  
    19.                 }  
    20.             }  
    21.         });  
    22.     }  

    其实这里调用bind方法最终还是调用serversocketchannel的unsafe对象的bind方法。。。。

    到这里,整个serverbootstrap 就算初始化完成了,而且也可以开始运行了。。。

    [java] view plain copy
     
    1. b.childHandler(new ChannelInitializer<SocketChannel>(){      //为accept channel的pipeline预添加的inboundhandler  
    2.                 @Override     //当新连接accept的时候,这个方法会调用  
    3.                 protected void initChannel(SocketChannel ch) throws Exception {  
    4.                     // TODO Auto-generated method stub  
    5.                     ch.pipeline().addLast(new MyChannelHandler());   //为当前的channel的pipeline添加自定义的处理函数  
    6.                 }  
    7.                   
    8.             });  

    这段代码的意思是对于刚刚accept到的channel,将会在它的pipeline上面添加handler,这个handler的用处主要是就是用户自定义的initChannel方法,就是初始化这个channel,说白了就是为它的pipeline上面添加自己定义的handler。。。

    这样整个serverbootstrap是怎么运行的也就差不多了。。。

    刚开始接触到netty的时候觉得这里一头雾水,通过这段时间对其代码的阅读,总算搞懂了其整个运行的原理,而且觉得其设计还是很漂亮的,虽然有的时候会觉得有那么一点点的繁琐。。。。

    整个运行过程总结为一下几个步骤:

    (1)创建用于两个eventloopgroup对象,一个用于管理serversocketchannel,一个用于管理accept到的channel

    (2)创建serverbootstrap对象,

    (3)设置eventloopgroup

    (4)创建用于构建用到的channel的工厂类

    (5)设置childhandler,它的主要功能主要是用户定义代码来初始化accept到的channel

    (6)创建serversocketchannel,并对它进行初始化,绑定端口,以及register,并为serversocketchannel的pipeline设置默认的handler

    通过这几个步骤,整个serverbootstrap也就算是运行起来了。。。

  • 相关阅读:
    股指期货高频数据机器学习预测
    如何使用TradingView(TV)回测数字货币交易策略
    使用EXCEL计算并绘制MACD指标
    使用EXCEL计算并绘制EMA指标
    使用EXCEL计算并绘制MFI指标
    使用EXCEL计算并绘制KDJ指标
    使用EXCEL计算并绘制OBV指标
    如何使用Excel绘制砖型图Renko Charts
    简约主义的市场分析图表--砖型图Renko Charts
    教你一步一步使用Excel获取API接口的金融数据
  • 原文地址:https://www.cnblogs.com/405845829qq/p/5195966.html
Copyright © 2011-2022 走看看