zoukankan      html  css  js  c++  java
  • Netty 服务端:新连接接入

      本文主要分析服务端新连接的接入过程,主要分为以下 2 个各步骤:

    1. select 操作;
    2. processSelectedKeys 操作。

    1. select 操作

      在分析 select 操作前,先要回顾一下 NioEventLoop 的 run()方法及其父类 SingleThreadEventExecutor 的 execute(Runnable task)方法。

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        //判断是否是 netty 线程添加的任务
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);//是的话直接添加任务到队列
        } else {
            //不是的话,可能要创建 netty 线程
            startThread();
            //然后添加任务到队列
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        // 构造方法中将 addTaskWakesUp 置 false
        // wakesUpForTask(task) 直接返回 true
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            // 所以 wakeup()方法肯定会被调用
            wakeup(inEventLoop);
        }
    }
    

      

      这里的 wakeup(boolean inEventLoop)方法分析 NioEventLoop 中的:

    @Override
    protected void wakeup(boolean inEventLoop) {
        // 如果是非 Netty 线程添加任务,那么 wakenUp.compareAndSet(false ,true)
        // 成功的话,就会调用 selector 的 wakeup()方法
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            // 回顾一下 selector 的 wakeup()方法
            // 1. 当 selector 在执行 select 操作时,调用它的 wakeup()方法,
            // 那么当前的 select 操作会立刻返回已就绪的事件集
            // 2. 如果提前调用 selector 的 wakeup()方法(一次或者多次都是一样的)
            // 那么下一次的 select 操作会直接返回(应该是没有就绪事件)
            // 关于 selector 的 wakeup()方法,可以看一下文末的参考资料
            selector.wakeup();
        }
    }
    

      接下来再 分析以下 NioEventLoop 中的 run()方法:

    protected void run() {
        for (;;) {
            try {
                // 计算 select 策略,当前有任务时,会进行一次 selectNow 操作返回就绪的 key 个数(大于等于 0)
                // SelectStrategy.CONTINUE 值是 -2,SelectStrategy.SELECT 的值是 -1
                // (这里的 SelectStrategy.CONTINUE 感觉不会匹配到)
                // 显然 switch 中没有匹配项,直接跳出 switch
                // 无任务时,则直接返回 SelectStrategy.SELECT
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        //当没有可处理的任务时,直接进行 select 操作
                        // wakenUp.getAndSet(false) 返回的是 oldValue,由于默认值是 false
                        // 所以第一次返回的是 false,需要注意的是,只有在这个地方才将 wakenUp 置为 false
                        select(wakenUp.getAndSet(false));
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                //根据比例来处理 IO 事件和任务
                if (ioRatio == 100) {
                    //...
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // 计算出处理 IO 事件的时间,然后根据比例算出执行任务的时间
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            //...
        }
    }
    

      wakenUp 是用来减少调用 selector 的 wakeup()方法,关于 wakeup()方法的实现细节,参考文末的资料。接下来分析一下 key 的处理:

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            // 置为 null,参考上面一行的注释
            selectedKeys[i] = null;
    
            // 在前面 AbstractNioChannel 中的 doRegister()方法中,注册的时候传入的参数是 this 
            // 接入连接时,这个 a 就是 AbstractNioChannel 对象
            final Object a = k.attachment();
    
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                //这里暂时不熟悉
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            //...
        }
    }

     2. processSelectedKeys 操作

      接下来分析一下 processSelectedKey()方法:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //取出 channel 对应的 unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            //... 当 key 无效时,关闭 channel 等操作
        }
    
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // 连接就绪事件只需要处理一次就行了,否则后续的 select()操作会一直立刻返回
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 最后调用 SocketChannel 的 finishConnect()方法
                unsafe.finishConnect();
            }
    
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
    
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // read 和 accept
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

      由于新连接的接入是一个就绪的 ACCEPT 事件,所以分析一下 unsafe.read(),这个 unsafe 对象是创建服务端 Channel 时创建的,是一个 NioMessageUnsafe 对象,它的 read()方法中有一行:

    localRead = doReadMessages(readBuf);

      该方法的实现选择 NioServerSocketChannel 中的:

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // 接入连接
        SocketChannel ch = javaChannel().accept();
        try {
            if (ch != null) {
                // 向 Object 列表中加入封装后的 NioSocketChannel 对象
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
    
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
    
        return 0;
    }

      回到 NioMessageUnsafe 的 read()方法,doReadMessages()方法返回了 NioSocketChannel 列表,接下来的一行:

    pipeline.fireChannelRead(readBuf.get(i));
    

      就会调用服务端 Channel 中的 handler 链,首先是用户添加的 handler,最后会找到前面说过的 ServerBootstrapAcceptor,它的 channelRead()方法中的 msg 参数实际上是 Channel 对象。

      接下来对这个客户端的 Channel 的处理与服务端 Channel 的处理过程基本类似。

  • 相关阅读:
    爬取笔趣阁小说(一念永恒)
    爬虫requests爬去网页乱码问题
    requests bs4 datetime re json
    添加背景音乐。c
    strip()
    爬虫学习中遇到的问题
    super的用法(带了解)
    user-agent
    输入n个字符串,用空格隔开。这些字符串中有重复出现的。现在统计每个字符串出现的次数,并找出出现次数最多的字符串。
    字节跳动小程序的一些坑
  • 原文地址:https://www.cnblogs.com/magexi/p/10318143.html
Copyright © 2011-2022 走看看