zoukankan      html  css  js  c++  java
  • Selector

    Selector

    之前说过  SingleThreadEventExecutor.this.run(); 方法,里面有段代码  strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

    现在详细说明下此方法

    @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }

    如果当前 NioEventLoop 线程存在异步任务,会通过 selectSupplier.get() 最终调用到 selectNow() 方法,selectNow() 是非阻塞,执行后立即返回0。跳出switch,直接执行后续代码。

    如果当前 NioEventLoop 线程不存在异步任务,会调用 SelectStrategy.SELECT 立即返回-1。进入case SelectStrategy.SELECT 代码块。

    刚开始,hasTasks 为true,因为 AbstractChannel#register 方法中会提交异步任务。

     SingleThreadEventExecutor#execute(Runnable task)

     所有一开始执行selectNow()。

    select(curDeadlineNanos)

    private int select(long deadlineNanos) throws IOException {
            if (deadlineNanos == NONE) {
                return selector.select();
            }
            // Timeout will only be 0 if deadline is within 5 microsecs
            long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
            return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
        }

    SelectorImpl#select()

    public int select() throws IOException {
            return this.select(0L);
        }
    public int select(long var1) throws IOException {
    if (var1 < 0L) {
    throw new IllegalArgumentException("Negative timeout");
    } else {
    return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
    }
    }

    selectNow()

    public int selectNow() throws IOException {
            return this.lockAndDoSelect(0L);
        }

    可见,selectNow 也是调用 lockAndDoSelect 方法。区别在于 selectNow() 方法中 lockAndDoSelect 方法参数为0,select() 方法中 lockAndDoSelect 方法参数为-1。

    SelectorImpl#lockAndDoSelect(long var1)

    private int lockAndDoSelect(long var1) throws IOException {
            synchronized(this) {
                if (!this.isOpen()) {
                    throw new ClosedSelectorException();
                } else {
                    int var10000;
                    synchronized(this.publicKeys) {
                        synchronized(this.publicSelectedKeys) {
                            var10000 = this.doSelect(var1);
                        }
                    }
    
                    return var10000;
                }
            }
        }

    最终会调用sun.nio.ch.WindowsSelectorImpl#doSelect 方法

    protected int doSelect(long var1) throws IOException {
            if (this.channelArray == null) {
                throw new ClosedSelectorException();
            } else {
                this.timeout = var1;
                this.processDeregisterQueue();//如果key取消了,就要从队列中注销
                if (this.interruptTriggered) {
                    this.resetWakeupSocket();
                    return 0;
                } else {
                    this.adjustThreadsCount();
                    this.finishLock.reset();
                    this.startLock.startThreads();
    
                    try {
                        this.begin();
    
                        try {
                            this.subSelector.poll();
                        } catch (IOException var7) {
                            this.finishLock.setException(var7);
                        }
    
                        if (this.threads.size() > 0) {
                            this.finishLock.waitForHelperThreads();
                        }
                    } finally {
                        this.end();
                    }
    
                    this.finishLock.checkForException();
                    this.processDeregisterQueue();//再次注销队列
                    int var3 = this.updateSelectedKeys();//更新并处理key
                    this.resetWakeupSocket();
                    return var3;
                }
            }
        }

    processDeregisterQueue()

    把一些出现异常而取消的key删除:

    void processDeregisterQueue() throws IOException {
            Set var1 = this.cancelledKeys();//获取已经取消的keys
            synchronized(var1) {
                if (!var1.isEmpty()) {
                    Iterator var3 = var1.iterator();
    
                    while(var3.hasNext()) {
                        SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();
    
                        try {
                            this.implDereg(var4);
                        } catch (SocketException var11) {
                            throw new IOException("Error deregistering key", var11);
                        } finally {
                            var3.remove();
                        }
                    }
                }
    
            }
        }

    implDereg

    执行选择中注销操作

    protected void implDereg(SelectionKeyImpl var1) throws IOException {
            int var2 = var1.getIndex();
    
            assert var2 >= 0;
    
            synchronized(this.closeLock) {
                if (var2 != this.totalChannels - 1) {
                    SelectionKeyImpl var4 = this.channelArray[this.totalChannels - 1];
                    this.channelArray[var2] = var4;
                    var4.setIndex(var2);
                    this.pollWrapper.replaceEntry(this.pollWrapper, this.totalChannels - 1, this.pollWrapper, var2);
                }
    
                var1.setIndex(-1);
            }
    
            this.channelArray[this.totalChannels - 1] = null;
            --this.totalChannels;
            if (this.totalChannels != 1 && this.totalChannels % 1024 == 1) {
                --this.totalChannels;
                --this.threadsCount;
            }
    
            this.fdMap.remove(var1);
            this.keys.remove(var1);//从选择器key集合里删除
            this.selectedKeys.remove(var1);//删除要操作的key
            this.deregister(var1);//从通道中删除key
            SelectableChannel var3 = var1.channel();
            if (!var3.isOpen() && !var3.isRegistered()) {
                ((SelChImpl)var3).kill();//关闭通道
            }
    
        }

    subSelector.poll()

    这底层就是native方法

    updateSelectedKeys()

    返回之后的处理工作,看看有没有事件

     private int updateSelectedKeys() {
            ++this.updateCount;
            byte var1 = 0;
            int var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount);
    
            WindowsSelectorImpl.SelectThread var3;
            for(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) {
                var3 = (WindowsSelectorImpl.SelectThread)var2.next();
            }
    
            return var4;
        }

    processSelectedKeys(long var1)

    处理读写异常三个集合的事件

    private int processSelectedKeys(long var1) {
                byte var3 = 0;
                int var4 = var3 + this.processFDSet(var1, this.readFds, Net.POLLIN, false);
                var4 += this.processFDSet(var1, this.writeFds, Net.POLLCONN | Net.POLLOUT, false);
                var4 += this.processFDSet(var1, this.exceptFds, Net.POLLIN | Net.POLLCONN | Net.POLLOUT, true);
                return var4;
            }

    processFDSet

    主要的就是下面的处理,会把放进去的集合遍历一遍,取出相应的事件key,检查触发的事件是否是感兴趣的事件,是就会加入selectedKeys,这个也是我们后面要去遍历的集合。

    private int processFDSet(long var1, int[] var3, int var4, boolean var5) {
                int var6 = 0;
    
                for(int var7 = 1; var7 <= var3[0]; ++var7) {
                    int var8 = var3[var7];
                    if (var8 == WindowsSelectorImpl.this.wakeupSourceFd) {
                        synchronized(WindowsSelectorImpl.this.interruptLock) {
                            WindowsSelectorImpl.this.interruptTriggered = true;
                        }
                    } else {
                        WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);
                        if (var9 != null) {
                            SelectionKeyImpl var10 = var9.ski;//取出key
                            if (!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {
                                if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) {//判断事件集合是否已包含此事件
                                    if (var9.clearedCount != var1) {
                                        if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
                                            var9.updateCount = var1;//更新数量
                                            ++var6;
                                        }
                                    } else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
                                        var9.updateCount = var1;
                                        ++var6;
                                    }
    
                                    var9.clearedCount = var1;
                                } else {
                                    if (var9.clearedCount != var1) {
                                        var10.channel.translateAndSetReadyOps(var4, var10);
                                        if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
                                            WindowsSelectorImpl.this.selectedKeys.add(var10);
                                            var9.updateCount = var1;
                                            ++var6;
                                        }
                                    } else {
                                        var10.channel.translateAndUpdateReadyOps(var4, var10);
                                        if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
                                            WindowsSelectorImpl.this.selectedKeys.add(var10);
                                            var9.updateCount = var1;
                                            ++var6;
                                        }
                                    }
    
                                    var9.clearedCount = var1;
                                }
                            }
                        }
                    }
                }
    
                return var6;
            }

    WindowsSelectorImpl.this.selectedKeys.add(var10)

    即SelectedSelectionKeySet#add 方法

    public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            keys[size++] = o;
            if (size == keys.length) {
                increaseCapacity();
            }
    
            return true;
        }

    调用链路

    而这个SelectedSelectionKeySet在哪里调用呢?在 run 方法的 processSelectedKeys(); 中调用 processSelectedKeysOptimized(); 方法时,会遍历selectedKeys 集合。取出之前add的SelectionKeyImpl 并做响应的处理。

     至此完成。

    AbstractNioChannel#doRegister()

    注册监听连接事件,

     protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }

    java.nio.channels.spi.AbstractSelectableChannel#register

    public final SelectionKey register(Selector sel, int ops,
                                           Object att)
            throws ClosedChannelException
        {
            synchronized (regLock) {//注册锁
                if (!isOpen())//验证通道否打开
                    throw new ClosedChannelException();
                if ((ops & ~validOps()) != 0)//验证参数
                    throw new IllegalArgumentException();
                if (blocking)//判断是否非阻塞模式
                    throw new IllegalBlockingModeException();
                SelectionKey k = findKey(sel);//是否有注册过key
                if (k != null) {//有的话就设置下感兴趣事件和附件
                    k.interestOps(ops);
                    k.attach(att);
                }
                if (k == null) {
                    // New registration
                    synchronized (keyLock) {//key锁
                        if (!isOpen())
                            throw new ClosedChannelException();
                        k = ((AbstractSelector)sel).register(this, ops, att);//没有就注册下
                        addKey(k);//保存key,便于通道关闭的时候取消注册
                    }
                }
                return k;
            }
        }

    findKey(Selector sel)

    获得当前通道中的key的选择器与传入选择器相同的key

    private SelectionKey findKey(Selector sel) {
            synchronized (keyLock) {
                if (keys == null)
                    return null;
                for (int i = 0; i < keys.length; i++)
                    if ((keys[i] != null) && (keys[i].selector() == sel))//存在且选择器相同
                        return keys[i];
                return null;
            }
        }

    register(AbstractSelectableChannel ch, int ops, Object attachment)

    选择器注册keysun.nio.ch.SelectorImplregister

    protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
            if (!(var1 instanceof SelChImpl)) {
                throw new IllegalSelectorException();
            } else {
                SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);//创建一个实现类
                var4.attach(var3);
                synchronized(this.publicKeys) {
                    this.implRegister(var4);//注册进选择器队列
                }
    
                var4.interestOps(var2);
                return var4;
            }
        }

    implRegister(SelectionKeyImpl var1)

    protected void implRegister(SelectionKeyImpl var1) {
            synchronized(this.closeLock) {
                if (this.pollWrapper == null) {
                    throw new ClosedSelectorException();
                } else {
                    this.growIfNeeded();
                    this.channelArray[this.totalChannels] = var1;
                    var1.setIndex(this.totalChannels);
                    this.fdMap.put(var1);
                    this.keys.add(var1);//保存key
                    this.pollWrapper.addEntry(this.totalChannels, var1);
                    ++this.totalChannels;
                }
            }
        }

    addKey(SelectionKey k)

    private void addKey(SelectionKey k) {
            assert Thread.holdsLock(keyLock);
            int i = 0;
            if ((keys != null) && (keyCount < keys.length)) {
                // Find empty element of key array
                for (i = 0; i < keys.length; i++)
                    if (keys[i] == null)
                        break;
            } else if (keys == null) {
                keys =  new SelectionKey[3];
            } else {
                // Grow key array
                int n = keys.length * 2;
                SelectionKey[] ks =  new SelectionKey[n];
                for (i = 0; i < keys.length; i++)
                    ks[i] = keys[i];
                keys = ks;
                i = keyCount;
            }
            keys[i] = k;
            keyCount++;
        }

    keys即是前文提到的 AbstractSelectableChannel#register方法中的 findkey(sel)。这样就关联起来了,这边加,那边取,形成闭环。

  • 相关阅读:
    [dubbo实战] dubbo+zookeeper伪集群搭建
    Spring中使用Map、Set、List、数组、属性集合的注入方法配置文件
    Docker实践
    docke镜像上传到dockerhub仓库和阿里云docker仓库的方法
    Linux chmod命令详解
    使用nexus搭建maven私服
    Zookeeper注册节点的掉线自动重新注册及测试方法
    Dubbo负载均衡策略
    各种排序算法及其java程序实现
    java中的各种数据类型在内存中存储的方式
  • 原文地址:https://www.cnblogs.com/xiaojiesir/p/15509608.html
Copyright © 2011-2022 走看看