  • Netty-ServerBootstrap

    ServerBootstrap is Netty's helper class for setting up a server. Taking NIO as an example, the setup code looks like this:

    public static void main(String[] args) throws Exception {
        ServerBootstrap bs = new ServerBootstrap();

        // One boss thread accepts connections; the worker group handles the accepted channels.
        bs.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new HttpServerCodec())
                                .addLast(new HttpObjectAggregator(65535))
                                .addLast(new Controller());
                    }
                })
                // Bind port 8080, then block until the server channel is closed.
                .bind(8080).sync().channel().closeFuture().sync();
    }

    The core fields are:

     
    // Exposes the configured properties, such as SO_KEEPALIVE
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    // Event loop group that accepted child channels are bound to
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;

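    Initialization roughly proceeds as follows: create and initialize the server channel -> register it with an EventLoop -> bind the local port and register interest in accept events (the server channel's "read" interest) -> the EventLoop's main loop then repeatedly selects on its registered channels and processes their events.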
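    The same flow can be illustrated with plain JDK NIO, without Netty. The following is only a minimal sketch: the class name AcceptLoopSketch, the loopback address, and the bounded loop standing in for the EventLoop are assumptions made for illustration, not Netty code. It registers the server channel with a selector before binding, which is the order doBind follows, and then runs a few select iterations until one connection has been accepted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical illustration class; not part of Netty.
final class AcceptLoopSketch {

    /** Registers, binds, connects one loopback client, and returns the number of accepted connections. */
    static int acceptOne() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Register with the selector first, then bind (port 0 = any free port).
            server.register(selector, SelectionKey.OP_ACCEPT);
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            int accepted = 0;
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                // A bounded stand-in for the event loop: select, then handle each ready key.
                for (int i = 0; i < 10 && accepted == 0; i++) {
                    selector.select(1000);
                    Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        SelectionKey key = ready.next();
                        ready.remove();
                        if (key.isAcceptable()) {
                            try (SocketChannel child = server.accept()) {
                                if (child != null) {
                                    accepted++;
                                }
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted = " + acceptOne());
    }
}
```

    A real event loop never terminates on its own and also runs queued tasks between selects; the loop here is bounded only so that the sketch finishes.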

     

    Binding is invoked first; the core logic lives in io.netty.bootstrap.AbstractBootstrap.doBind(SocketAddress) and io.netty.bootstrap.AbstractBootstrap.initAndRegister().

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        ..........
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // Bind logic
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
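    The two branches of doBind follow a common pattern: run the next step immediately if the previous asynchronous step has already finished, otherwise attach a callback. The sketch below shows that pattern with the JDK's CompletableFuture instead of Netty's ChannelFuture. The class BindAfterRegisterSketch, the method bindAfter, and the log strings are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class; CompletableFuture stands in for Netty's ChannelFuture.
final class BindAfterRegisterSketch {

    /** Runs the "bind" step only once registration has completed, immediately or via a callback. */
    static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration, List<String> log) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, so bind right away.
            log.add("bind (immediate)");
            promise.complete("bound");
        } else {
            // Slow path: defer the bind until the registration future completes.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    // Registration failed, so fail the bind promise as well.
                    promise.completeExceptionally(cause);
                } else {
                    log.add("bind (deferred)");
                    promise.complete("bound");
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        bindAfter(CompletableFuture.completedFuture(null), log);

        CompletableFuture<Void> pending = new CompletableFuture<>();
        bindAfter(pending, log);
        pending.complete(null); // triggers the deferred bind
        System.out.println(log);
    }
}
```

    In both branches the caller receives a promise straight away, so code such as bind(8080).sync() can wait on it regardless of which path was taken.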

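    First look at initAndRegister. Its core logic is to use the channelFactory to create a NioServerSocketChannel instance, apply the parameters from config to it, and then register it with an EventLoop. Registration is actually delegated to the channel's Unsafe; the core logic that completes it is in AbstractUnsafe.register0.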

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // In this example this actually calls the NioServerSocketChannel constructor,
            // which sets OP_ACCEPT as the event type the channel is interested in
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

    void init(Channel channel) throws Exception {
        // Set properties
        ..........

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // Install a default ChannelHandler on the NioServerSocketChannel: ServerBootstrapAcceptor.
                        // When an accept event occurs, it registers the accepted channel with the child EventLoop.
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // Register the channel with the EventLoop's selector
            doRegister();
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            // Fire the ChannelInboundHandler.channelRegistered event
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                    // Fire channelActive, which also registers read interest for the channel
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
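    register0 separates "registered with the selector" from "interested in events": the interest is only added once the channel becomes active. As far as I understand Netty 4.1's NIO transport, doRegister registers the JDK channel with an interest set of 0, and the read interest (OP_ACCEPT for a server channel) is added later when reading begins. Treat that as an assumption rather than a quotation of the source. The plain JDK NIO sketch below shows the same two-step idea; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical illustration class; not part of Netty.
final class DeferredInterestSketch {

    /** Returns the key's interest ops right after registration and after "activation". */
    static int[] interestBeforeAndAfter() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Step 1: register with an empty interest set, so no events are delivered yet.
            SelectionKey key = server.register(selector, 0);
            int before = key.interestOps();
            // Step 2: once the channel counts as active, start listening for accept events.
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = interestBeforeAndAfter();
        System.out.println("before=" + ops[0] + ", after=" + ops[1]);
    }
}
```

    Splitting the steps lets the pipeline be fully set up (handlerAdded, channelRegistered) before any I/O event can reach it.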

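    Once initAndRegister has registered the channel successfully, the actual port-binding logic runs. The snippet below is NioSocketChannel.doBind0(SocketAddress); for the server channel in this example the corresponding step is NioServerSocketChannel.doBind(SocketAddress), which likewise binds the underlying JDK channel.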

    private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            SocketUtils.bind(javaChannel(), localAddress);
        } else {
            SocketUtils.bind(javaChannel().socket(), localAddress);
        }
    }

    At this point the bind has succeeded. When an ACCEPT event fires, NioServerSocketChannel.doReadMessages is invoked, followed by ServerBootstrapAcceptor.channelRead, which registers the child channel with the child EventLoopGroup.

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // Register the child channel
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
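    The hand-off performed by ServerBootstrapAcceptor can be approximated with two JDK selectors: a "boss" selector that only accepts, and a "worker" selector that the accepted child is registered with for reads. This is a single-threaded sketch under stated assumptions, not Netty's implementation: the class HandOffSketch, the "ping" payload, the loopback address, and the bounded loops are all made up for illustration, and in Netty the two groups run on separate threads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;

// Hypothetical illustration class; not part of Netty.
final class HandOffSketch {

    /** Accepts one connection on the boss selector, hands it to the worker selector, returns what the worker read. */
    static String roundTrip() {
        try (Selector boss = Selector.open();
             Selector worker = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
            server.register(boss, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII)));

                // Boss side: accept the child and register it with the worker selector.
                for (int i = 0; i < 10 && worker.keys().isEmpty(); i++) {
                    boss.select(1000);
                    Iterator<SelectionKey> ready = boss.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        SelectionKey key = ready.next();
                        ready.remove();
                        SocketChannel child = key.isAcceptable() ? server.accept() : null;
                        if (child != null) {
                            child.configureBlocking(false);
                            child.register(worker, SelectionKey.OP_READ);
                        }
                    }
                }

                // Worker side: read the client's bytes from the handed-off child.
                ByteBuffer buf = ByteBuffer.allocate(16);
                for (int i = 0; i < 10 && buf.position() < 4; i++) {
                    worker.select(1000);
                    Iterator<SelectionKey> ready = worker.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        SelectionKey key = ready.next();
                        ready.remove();
                        if (key.isReadable()) {
                            ((SocketChannel) key.channel()).read(buf);
                        }
                    }
                }

                // Closing a selector does not close its channels, so close the child explicitly.
                for (SelectionKey key : new ArrayList<>(worker.keys())) {
                    key.channel().close();
                }
                buf.flip();
                return StandardCharsets.US_ASCII.decode(buf).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("worker read: " + roundTrip());
    }
}
```

    Keeping accept and read on different selectors is what allows a single boss thread to keep accepting while the worker group handles traffic on the accepted connections.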
  • Original article: https://www.cnblogs.com/ironroot/p/8581065.html