zoukankan      html  css  js  c++  java
  • Netty ServerBootstrap如何绑定端口

    ServerBootstrap监听端口

    接下来带他们通过源码去分析下ServerBootstrap是如何监听端口

    流程

    源码分析

    1. 先看一下启动demo

                EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(serverHandler);
                     }
                 });
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    

    2. ServerBootstrap.bind(PORT)

    首先从ServerBootstrap.bind(PORT)入手,开始看下他是如何去监听端口,完成Nio底层的一些封装。直接看其抽象类AbstractBootstrap的方法实现

        private ChannelFuture doBind(final SocketAddress localAddress) {
            
            final ChannelFuture regFuture = initAndRegister(); // 初始化并且去注册channel
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    

    3. 我们先分析下initAndRegister()到底干了什么?

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel(); // 这边是ReflectiveChannelFactory类通过反射去创建我们初始化bootstrap设置的Channel,这里由于我们是服务端,那就是NioServerSocketChannel
                init(channel);  
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
    

    4. ServerBootstrap类 init方法

    分析一下这个init(channel)干了什么

      void init(Channel channel) {
            setChannelOptions(channel, newOptionsArray(), logger); // 设置channelOptions
            setAttributes(channel, newAttributesArray());// 设置Attributes
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    
            // 往ChannelPipeline添加了一个ChannelInitializer,此时channelPipeline里结构为。Head-> ChannelInitializer -> Tail
            p.addLast(new ChannelInitializer <Channel>() {
                @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    总结一下 init大致就是设置一些配置属性以及添加了一个ChannelInitializer,这个ChannelInitializer看他的方法好像是设置一个ServerBootstrapAcceptor,具体哪里执行这个ChannelInitializer不清楚,带着疑惑我们继续往下看。

    5. MultithreadEventLoopGroup类 register方法

    回到步骤3我们看下这行代码

    ChannelFuture regFuture = config().group().register(channel);
    

    config().group() 这个代码就是通过ServerBootstrapConfig的group()方法去获取我们设置的NioEventLoopGroup(boss线程)

    NioEventLoopGroup类的register方法在父类MultithreadEventLoopGroup中实现:

        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
    

    MultithreadEventLoopGroup 种next()返回的实例是 SingleThreadEventLoop,因此我们直接看SingleThreadEventLoop的registry方法,通过方法的调用链路最终找到下面这个方法:

        @Deprecated
        @Override
        public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            ObjectUtil.checkNotNull(channel, "channel");
            channel.unsafe().register(this, promise);
            return promise;
        }
    

    这下出来了一个新的东西 channel.unsafe(),我们先分析下这个东西返回的是什么,因为我们知道我们的channel是NioServerSocketChannel,所以我们直接去看NioServerSocketChannel的unsafe()方法:

    AbstractNioChannel.unsafe() -> AbstractChannel的unsafe变量 -> AbstractNioMessageChannel.newUnsafe()
    最终我们可以确定返回的是NioMessageUnsafe;

    那我直接看NioMessageUnsafe的register方法,这个方法是在父类AbstractUnsafe中定义

            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ObjectUtil.checkNotNull(eventLoop, "eventLoop");
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
                // 将NioServerSocketChannel的eventLoop 绑定到 MultithreadEventLoopGroup的next()返回的eventLoop
                AbstractChannel.this.eventLoop = eventLoop;
                // 如果当前线程是eventLoop则直接执行
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        // 提交一个eventLoop任务,任务会在EventLoop线程启动后去之行,下面会讲EventLoop线程是如何启动的
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    

    继续看下AbstractUnsafe的register0方法,(此方法不是立马执行,而是等EventLoop线程启动之后,这边可以顺便分许下这个方法)

    
            private void register0(ChannelPromise promise) {
                try {
                    //代码省略
                    doRegister(); // 开始注册,由外部类AbstractChannel实现
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
                 //代码省略
                if (isActive()) {
                        if (firstRegistration) {
                            // 第一次注册通过处罚ChannelActive事件
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // 设置感兴趣的事件
                            beginRead();
                        }
                 }
                       
            }
    

    NioServerSocketChannel继承图

    由上图我们找到doRegister方法在AbstractNioChannel中实现,AbstractChannel里仅仅是个空实现,

        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    // 注册java的ServerSocketChannel到EventLoop的Selector上去,并且把当前的netty的channel绑定到java的attachment上去,第二次参数0代表不订阅事件
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    
    1. 这边注册完成后但是没有完成Accept事件的注册,我们继续研究下是怎么完成Accept事件的注册,通过代码我们得知如果不是第一次注册直接调用AbstractChannel的beginRead()->AbstractNioChannel的doBeginRead(),然后完成注册,
    2. 第一次调用的话是通过PipeLine触发ChannelActive事件 ,然后调用HeadContext的channelActive方法,然后调用readIfIsAutoRead方法
          // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
            readPending = true;
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
    

    NioSeverSocketChannel在新建时候初始化到父类AbstractNioChannel是一个SelectionKey.OP_ACCEPT事件,因此这边完成的是连接事件的监听

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }
    
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
    

    到这里完成了nio ServerSocketChannel selector的注册,

    6. EventLoop类 run方法

    看到这里有同学有疑问,这个提交任务,但是没有看到哪里启动了EventLoop的线程?带着这个疑惑我们看下eventLoop的execute方法。

       @Override
        public void execute(Runnable task) {
            ObjectUtil.checkNotNull(task, "task");
            execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
        }
        
        private void execute(Runnable task, boolean immediate) {
            boolean inEventLoop = inEventLoop(); // 判断是不是当前EventLoop线程
            addTask(task); // 提交job
            if (!inEventLoop) { //如果不是在EventLoop线程中
                startThread(); // 这个是开启线程吗?下面我会给分析下这个代码
                // 移除job的一些操作
                if (isShutdown()) {
                    boolean reject = false;
                    try {
                        if (removeTask(task)) {
                            reject = true;
                        }
                    } catch (UnsupportedOperationException e) {
                        // The task queue does not support removal so the best thing we can do is to just move on and
                        // hope we will be able to pick-up the task before its completely terminated.
                        // In worst case we will log on termination.
                    }
                    if (reject) {
                        reject();
                    }
                }
            }
    
            if (!addTaskWakesUp && immediate) {
                wakeup(inEventLoop);
            }
        }
        
    

    SingleThreadEventExecutor的startThread()这个方法是开启EventLoop的线程(如果线程没有启动的话)

        private void startThread() {
            if (state == ST_NOT_STARTED) {
                // cas判断下避免多线程开启线程,
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    boolean success = false;
                    try {
                        // 开启当前的EventLoop
                        doStartThread();
                        success = true;
                    } finally {
                        if (!success) {
                            STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                        }
                    }
                }
            }
        }
    

    SingleThreadEventExecutor的doStartThread()方法

        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                  // 其他代码省略。。。
                  SingleThreadEventExecutor.this.run();
                  // 其他代码省略。。。
                } 
            });
        }
    

    接下来我们直接看 SingleThreadEventExecutor.this.run()这个方法,其运行的是子类NioEventLoop类中的run方法:

     @Override
        protected void run() {
            int selectCnt = 0;
            for (;;) {
              //代码省略 这里面大致就是处理IO事件 以及 自定义Job事件  
            }
        }
    

    7. ServerBootstrap类 doBind0方法

    通过下面我们可以看到,此时像EventLoop线程池中提交了一个Runnable,里面会调用channel.bind(localAddress, promise)去绑定端口

        private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        // 绑定ip端口逻辑
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    

    那我们直接来看下channel.bind(localAddress, promise)具体看了什么,因为是服务端我们知道channel是NioServerSocketChannel,那我们去这里面寻找答案,果然在里面找到了最关键的一个方法,调用了pipeline的bind方法。pipeline默认是DefaultChannelPipeline

        @Override
        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
            return pipeline.bind(localAddress, promise);
        }
    

    我们继续往下看DefaultChannelPipeline的bind方法,调用了Tail节点的bind方法,然后往Head节点传播

        @Override
        public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
            // 调用tail节点的bind
            return tail.bind(localAddress, promise);
        }
      
    

    tail的bind方法定义在其父类AbstractChannelHandlerContext中

        @Override
        public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(localAddress, "localAddress");
            if (isNotValidPromise(promise, false)) {
                // cancelled
                return promise;
            }
    
            final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeBind(localAddress, promise);
            } else {
                // 提交job,最终会被EventLoop执行
                safeExecute(executor, new Runnable() {
                    @Override
                    public void run() {
                        next.invokeBind(localAddress, promise);
                    }
                }, promise, null, false);
            }
            return promise;
        }
    

    这时候我们发现又是提交了一个Runnable去调用下一个的invokeBind方法

       private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
            if (invokeHandler()) {
                try {
                    // 这行实际调用的是Header节点中的bind(this, localAddress, promise)
                    ((ChannelOutboundHandler) handler()).bind()(this, localAddress, promise);
                } catch (Throwable t) {
                    notifyOutboundHandlerException(t, promise);
                }
            } else {
                bind(localAddress, promise);
            }
        }
    

    直接看HeadContext中的实现方法

            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
                // 这个unsafe是NioServerScoketChannel中产生的
                unsafe.bind(localAddress, promise);
            }
    

    分析代码在其方法里找到了AbstractUnsafe类最终调用的外部类(NioServerScoketChannel)doBind方法,我们得知道NioServerScoketChannel中肯定存在doBind方法的实现l类

            @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                assertEventLoop();
                // 代码省略
                 try {
                    //核心代码出现,  
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
              // 代码省略
            }
    

    NioServerScoketChannel的doBind方法

        @SuppressJava6Requirement(reason = "Usage guarded by java version check")
        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            //获取java的channel然后开始绑定端口
            if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }
    

    综上自此完成端口的绑定

    总结一下

    根据以上源码分析,我们大致能够清晰看到Netty是如何去封装服务端的端口绑定,下面我们总结下主要流程

    1. 初始化netty channel,设置一些属性,初始化pipeline等操作
    2. 注册channel
    • 绑定channel设置EventLoop
    • 将初始化的java channel绑定到EventLoop的selector上
    • 启动EventLoop的run方法,用于处理Io事件
    • pipeline触发fireChannelActive注册Accept事件
    1. 执行bind方法,

    结束

    识别下方二维码!回复: 入群 ,扫码加入我们交流群!

    点赞是认可,在看是支持

    欢迎关注我的公众号!里面可以加入微信技术交流群!
  • 相关阅读:
    多表头GridView
    Updater Application Block自动更新实施方案[源代码]
    IE和Firefox的Javascript兼容性总结
    JavaScript IE加速
    GridView汇总
    Oracle 中取当前日期的上个月最后天和第一天
    Atitit,通过pid获取进程文件路径 java php  c#.net版本大总结
    Atitit. 数据库catalog与schema的设计区别以及在实际中使用 获取数据库所有库表 java jdbc php  c#.Net
    Atitit. 数据约束 校验 原理理论与 架构设计 理念模式java php c#.net js javascript mysql oracle
    Atitit.一些公司的开源项目 重大知名开源项目attilax总结
  • 原文地址:https://www.cnblogs.com/1ssqq1lxr/p/15239188.html
Copyright © 2011-2022 走看看