zoukankan      html  css  js  c++  java
  • 深入理解NIO(三)—— NIO原理及部分源码的解析

    深入理解NIO(三)—— NIO原理及部分源码的解析

    欢迎回到淦™的源码看爆系列

    在看完前面两个系列之后,相信大家对NIO也有了一定的理解,接下来我们就来深入源码去解读它,我这里的是OpenJDK-8u60版本,建议大家也下一份放ide里和我一起看会比较好理解。(这里主要介绍Selector,Buffer第一篇有提到一点,Channel也不过是些Buffer的操作方法而已,这里就不提及了,大家感兴趣可以自己去看)

    老哥行行好,转载和我说一声好吗,我不介意转载的,但是请把原文链接贴大点好吗

    open()

    // 1. 创建Selector
    Selector selector = Selector.open();

    首先我们来分析open方法:

    // Selector
    public static Selector open() throws IOException {
        // 这里的静态方法provider会使用DefaultSelectorProvider.create();方法根据系统选择一个SelectorProvider
        // windows平台的话是WindowsSelectorProvider,
        // Linux平台是一个EPollSelectorProvider,这里主要分析Linux平台下的
        // 之后openSelector方法(一会看下面)会返回一个EPollSelectorImpl作为Selector的实现,我们一般提及的Selector就是它了
        return SelectorProvider.provider().openSelector();
    }
    
    // EPollSelectorProvider
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    之后是EPollSelectorImpl的构造方法:

    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        // 其他的我也看不太懂,我们直接进去这个EPollArrayWrapper的构造方法
        pollWrapper = new EPollArrayWrapper();
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<>();
    }
    
    // EPollArrayWrapper
    EPollArrayWrapper() throws IOException {
        // 直接看这里,这里调用了一个封装出来的Linux的api:epoll_create,这个东西大概可以理解成一个selector,详细的我们下一章再讲解
        epfd = epollCreate();
    }

    所以其实Selector方法大抵上就是封装了一个epoll_create() 方法,当然还调用了一下epoll_ctl(),把serverchannel给注册进去。

    register()

    // 5. 将channel注册到selector上,监听连接事件
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    接下来我们分析把channel注册到Selector上的register方法

    // SelectableChannel
    public final SelectionKey register(Selector sel, int ops)
            throws ClosedChannelException
    {
        return register(sel, ops, null);
    }
    
    // AbstractSelectableChannel
    public final SelectionKey register(Selector sel, int ops,
                                       Object att)
            throws ClosedChannelException
    {
        // 212行,剩下的删掉了
        k = ((AbstractSelector)sel).register(this, ops, att);
                    
    }
    
    // SelectorImpl
    protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
    {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        //生成SelectorKey来存储到hashmap中,一共之后获取
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        //attach用户想要存储的对象
        k.attach(attachment);
        //调用子类的implRegister方法,接下来进去这里
        synchronized (publicKeys) {
            implRegister(k);
        }
        //设置关注的option
        k.interestOps(ops);
        return k;
    }
    protected void implRegister(SelectionKeyImpl ski) {
            if (closed)
                throw new ClosedSelectorException();
            SelChImpl ch = ski.channel;
            //获取Channel所对应的fd,因为在linux下socket会被当作一个文件,也会有fd
            int fd = Integer.valueOf(ch.getFDVal());
            fdToKey.put(fd, ski);
            //调用pollWrapper的add方法,将channel的fd添加到监控列表中
            pollWrapper.add(fd);
            //保存到HashSet中,keys是SelectorImpl的成员变量
            keys.add(ski);
    }

    调用register方法并没有涉及到EpollArrayWrapper中的native方法epollCtl的调用,这是因为他们将这个方法的调用推迟到Select方法中去了.

    select()

    // 获取可用channel数量
    int readyChannels = selector.select();

    接下来我们来分析select()方法

    // SelectorImpl
    public int select(long timeout)
            throws IOException
    {
        . . . .
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }
    
    // SelectorImpl
    private int lockAndDoSelect(long timeout) throws IOException {
        . . . .
        return doSelect(timeout);
    }
    // EPollSelectorImpl
    protected int doSelect(long timeout) throws IOException {
        .....
        try {
            ....
            //调用了poll方法,底层调用了native的epollCtl和epollWait方法
            pollWrapper.poll(timeout);
        } finally {
            ....
        }
        ....
        //更新selectedKeys,为之后的selectedKeys函数做准备
        int numKeysUpdated = updateSelectedKeys();
        ....
        return numKeysUpdated;
    }
    // EPollArrayWrapper
    int poll(long timeout) throws IOException {
        // 这里面的实现就是调用epoll_ctl()方法注册先前在register方法中保存的Channel的fd和感兴趣的事件类型
        updateRegistrations();
        // 这里是调用epollWait方法等待感兴趣事件的生成,导致线程阻塞
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        . . . .
    }

    上面提到的epollCtl和epollWait方法在下一章我们会详细讲,这里先不讲。

    总之我们可以知道Selector其实就是封装了Linux提供的api而已,也就是epollCreateepollCtlepollWait方法。

    selectedKeys()

    // 获取可用channel的集合
    Set<SelectionKey> selectionKeys = selector.selectedKeys();

    接下来我们来看看selectedKeys()方法:

    // SelectorImpl
    //是通过Util.ungrowableSet生成的,不能添加,只能减少
    private Set<SelectionKey> publicSelectedKeys;
    public Set<SelectionKey> selectedKeys() {
        ....
        return publicSelectedKeys;
    }

    很奇怪啊,怎麽直接就返回publicSelectedKeys了,难道在select函数的执行过程中有修改过这个变量吗?publicSelectedKeys这个对象其实是selectedKeys变量的一份副本,你可以在SelectorImpl的构造函数中找到它们俩的关系,我们再回头看一下select中updateSelectedKeys方法:

    private int updateSelectedKeys() {
        //更新了的keys的个数,或在说是产生的事件的个数
        int entries = pollWrapper.updated; 
        int numKeysUpdated = 0;
        for (int i=0; i<entries; i++) {
            //对应的channel的fd
            int nextFD = pollWrapper.getDescriptor(i);
            //通过fd找到对应的SelectionKey
            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
            if (ski != null) {
                int rOps = pollWrapper.getEventOps(i);
                //更新selectedKey变量,并通知响应的channel来做响应的处理
                if (selectedKeys.contains(ski)) {
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        // 这里加进去
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }

    不知道大家有没有留意到,如果我们不先调用select(),直接selectedKeys()是不会获得任何Channel的,因为里面没有更新publicSelectedKeys的方法

    还有一点是,publicSelectedKeys是selectedKeys的引用,所以我们获得的是它的引用,而不是每次返回一个新对象,这个引用里面的Channel我们处理完后要记得remove掉,不然下次还是会返回给你的。

    顺便一提这里publicSelectedKeys是采用 publicSelectedKeys = Util.ungrowableSet(selectedKeys); 的方式创建出来的,这个方法创建出来的set如方法名ungrowableSet,是不能调用add方法的,只能remove

    为什么Netty自己又从新实现了一边native相关的NIO底层方法? 听听Netty的创始人是怎麽说的吧链接

    因为Java的版本使用的epoll的LT模式,而Netty则希望使用ET模式(详情看第四篇的两种触发模式),而且Java版本没有将epoll的部分配置项暴露出来,比如说TCP_CORK和SO_REUSEPORT。

    下一篇:epoll的实现原理


    参考资料:

    https://segmentfault.com/a/1190000017798684?utm_source=tag-newest

  • 相关阅读:
    Yum安装MySQL
    Java最小化镜像制作
    Docker CE安装
    每月最后一周的周六晚上21:00执行任务-crontab
    每10秒执行定时任务-crontab
    可复制领导力-回顾收录
    逻辑数据库设计
    5e赋能核心文化
    python 学习自学
    德鲁克的“五项主要习惯”
  • 原文地址:https://www.cnblogs.com/fatmanhappycode/p/12355316.html
Copyright © 2011-2022 走看看