zoukankan      html  css  js  c++  java
  • Java NIO之选择器

    1.简介

    前面的文章说了缓冲区,说了通道,本文就来说说 NIO 中另一个重要的实现,即选择器 Selector。在更早的文章中,我简述了几种 IO 模型。如果大家看过之前的文章,并动手写过代码的话。再看 Java 的选择器大概就会知道它是什么了,以及怎么用了。选择器是 Java 多路复用模型的一个实现,可以同时监控多个非阻塞套接字通道。示意图大致如下:

    如果大家了解过多路复用模型,那应该也会知道几种复用模型的实现。比如 select,poll 以及 Linux 下的 epoll 和 BSD 下的 kqueue。Java 的选择器并非凭空创造,而是在底层操作系统提供的接口的基础上封装而来。相关的细节,我随后会进行分析。

    关于 Java 选择器的简介这里先说到这,接下来进入正题。

    2.基本操作及实现

    本章我将对 Selector 的创建,通道的注册,Selector 的选择过程进行分析。内容篇幅较大,希望大家耐心看完。由于 Selector 相关类在不同操作系统下的实现是不同的,加之个人对 Linux epoll 更为熟悉,所以本文所分析的源码也是和 epoll 相关的。好了,进入正题吧。

    2.1 创建选择器

    选择器 Selector 是一个抽象类,所以不能直接创建。Selector 提供了一个 open 方法,通过 open 方法既可以创建选择器实例。示例代码如下:

    Selector selector = Selector.open();
    

    上面的代码比较简单,只有一行。不过不要被表象迷惑,这行代码仅是完整实现的冰山一角,更复杂的逻辑则隐藏在水面之下。
    在简介一节,我已经说了 Java 选择器是对底层多路复用接口的一个包装,这里的 open 方法也不例外。假设我们的 Java 运行在 Linux 平台下,那么 open 最终所做的事情应该是调用操作系统的epoll_create函数,用于创建 epoll 实例。真实情况是不是如此呢?答案就在冰山深处,接下来就让我们一起去求索吧。下面我们将沿着 open 方法一路走下去,如下:

    public abstract class Selector implements Closeable {
        public static Selector open() throws IOException {
            // 创建 SelectorProvider,再通过其 openSelector 方法创建 Selector
            return SelectorProvider.provider().openSelector();
        }
        // 省略无关代码
    }
    
    public abstract class SelectorProvider {
        public static SelectorProvider provider() {
            synchronized (lock) {
                if (provider != null)
                    return provider;
                return AccessController.doPrivileged(
                    new PrivilegedAction<SelectorProvider>() {
                        public SelectorProvider run() {
                                if (loadProviderFromProperty())
                                    return provider;
                                if (loadProviderAsService())
                                    return provider;
                                // 创建默认的 SelectorProvider
                                provider = sun.nio.ch.DefaultSelectorProvider.create();
                                return provider;
                            }
                        });
            }
        }
    }
    
    public class DefaultSelectorProvider {
        private DefaultSelectorProvider() { }
        
        /**
         * 根据系统名称创建相应的 SelectorProvider
         */
        public static SelectorProvider create() {
            String osname = AccessController
                .doPrivileged(new GetPropertyAction("os.name"));
            if (osname.equals("SunOS"))
                return createProvider("sun.nio.ch.DevPollSelectorProvider");
            if (osname.equals("Linux"))
                return createProvider("sun.nio.ch.EPollSelectorProvider");
            
            // 
            return new sun.nio.ch.PollSelectorProvider();
        }
        
        /**
         * 加载 SelectorProvider 类,并创建实例
         */
        @SuppressWarnings("unchecked")
        private static SelectorProvider createProvider(String cn) {
            Class<SelectorProvider> c;
            try {
                c = (Class<SelectorProvider>)Class.forName(cn);
            } catch (ClassNotFoundException x) {
                throw new AssertionError(x);
            }
            try {
                return c.newInstance();
            } catch (IllegalAccessException | InstantiationException x) {
                throw new AssertionError(x);
            }
    
        }
    }
    
    /**
     * 创建完 SelectorProvider,接下来要调用 openSelector 方法
     * 创建 Selector 的继承类了。
     */
    public class EPollSelectorProvider extends SelectorProviderImpl {
        public AbstractSelector openSelector() throws IOException {
            return new EPollSelectorImpl(this);
        }
    }
    
    class EPollSelectorImpl extends SelectorImpl {
        EPollSelectorImpl(SelectorProvider sp) throws IOException {
            // 调用父类构造方法
            super(sp);
            long pipeFds = IOUtil.makePipe(false);
            fd0 = (int) (pipeFds >>> 32);
            fd1 = (int) pipeFds;
            
            // 创建 EPollArrayWrapper,EPollArrayWrapper 是一个重要的实现
            pollWrapper = new EPollArrayWrapper();
            
            pollWrapper.initInterrupt(fd0, fd1);
            fdToKey = new HashMap<>();
        }
    }
    
    public abstract class SelectorImpl extends AbstractSelector {
        protected SelectorImpl(SelectorProvider sp) {
            super(sp);
            keys = new HashSet<SelectionKey>();
            selectedKeys = new HashSet<SelectionKey>();
            
            /* 初始化 publicKeys 和 publicSelectedKeys,
             * publicKeys 即 selector.keys() 方法所返回的集合,
             * publicSelectedKeys 则是 selector.selectedKeys() 方法返回的集合
             */
            if (Util.atBugLevel("1.4")) {
                publicKeys = keys;
                publicSelectedKeys = selectedKeys;
            } else {
                publicKeys = Collections.unmodifiableSet(keys);
                publicSelectedKeys = Util.ungrowableSet(selectedKeys);
            }
        }
    }
    
    /**
     * EPollArrayWrapper 一个重要的实现,这一层再往下就是 C 代码了
     */
    class EPollArrayWrapper {
        EPollArrayWrapper() throws IOException {
            // 调用 epollCreate 方法创建 epoll 文件描述符
            epfd = epollCreate();
        
            // the epoll_event array passed to epoll_wait
            // 初始化 pollArray,该对象用于存储就绪文件描述符和事件
            int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
            pollArray = new AllocatedNativeObject(allocationSize, true);
            pollArrayAddress = pollArray.address();
        
            // eventHigh needed when using file descriptors > 64k
            if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
                eventsHigh = new HashMap<>();
        }
        
        // epollCreate 方法是 native 类型的
        private native int epollCreate();
    }
    

    以上代码时 Java 层面的,Java 层调用栈最下面的类是 EPollArrayWrapper(源码路径可以在附录中查找)。EPollArrayWrapper 是一个重要的实现,起着承上启下的作用。上层是 Java 代码,下层是 C 代码。上层的代码看完了,接下来看看冰山深处的 C 代码:

    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
    {
        // 调用 epoll_create 函数创建 epoll 实例,并返回文件描述符 epfd
        int epfd = epoll_create(256);
        if (epfd < 0) {
           JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
        }
        return epfd;
    }
    

    上面的代码很简单,仅做了创建 epoll 实例这一件事。看到这里,答案就明了了。最后在附一张时序图帮助大家理清代码调用顺序,如下:

    Selector.open

    2.2 选择键

    2.2.1 几种事件

    选择键 SelectionKey 包含4种事件,分别是:

    public static final int OP_READ = 1 << 0;
    public static final int OP_WRITE = 1 << 2;
    public static final int OP_CONNECT = 1 << 3;
    public static final int OP_ACCEPT = 1 << 4;
    

    事件之间可以通过或运算进行组合,比如:

    int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
    

    2.2.2 两种事件集合:interestOps 和 readyOps

    interestOps 即感兴趣的事件集合,通道调用 register 方法注册时会设置此值,interestOps 可通过 SelectionKey interestOps() 方法获取。readyOps 是就绪事件集合,可通过 SelectionKey readyOps() 获取。

    interestOps 和 readyOps 被声明在 SelectionKey 子类 SelectionKeyImpl 中,代码如下:

    public class SelectionKeyImpl extends AbstractSelectionKey {
        private volatile int interestOps;
        private int readyOps;
    }
    

    接下来再来看看与 readyOps 事件集合相关的几个方法,如下:

    selectionKey.isAcceptable();
    selectionKey.isConnectable();
    selectionKey.isReadable();
    selectionKey.isWritable();
    

    以上方法从字面意思上就可以知道有什么用,这里就不解释了。接下来以 isReadable 方法为例,简单看一下这个方法是如何实现。

    public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }
    

    上面说到可以通过或运算组合事件,这里则是通过与运算来测试某个事件是否在事件集合中。比如

    readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE = 0101,
    readyOps & OP_READ = 0101 & 0001 = 0001,
    readyOps & OP_CONNECT = 0101 & 1000 = 0
    

    readyOps & OP_READ != 0,所以 OP_READ 在事件集合中。readyOps & OP_CONNECT == 0,所以 OP_CONNECT 不在事件集合中。

    2.2.3 attach 方法

    attach 是一个好用的方法,通过这个方法,可以将对象暂存在 SelectionKey 中,待需要的时候直接取出来即可。比如本文对应的练习代码实现了一个简单的 HTTP 服务器,在读取用户请求数据后(即 selectionKey.isReadable() 为 true),会去解析请求头,然后将请求头信息通过 attach 方法放入 selectionKey 中。待通道可写后,再从 selectionKey 中取出请求头,并根据请求头回复客户端不同的消息。当然,这只是一个应用场景,attach 可能还有其他的应用场景,比如标识通道。不过其他的场景我没使用过,就不说了。attach 使用方式如下:

    selectionKey.attach(obj);
    Object attachedObj = selectionKey.attachment();
    

    2.3 通道注册

    通道注册即将感兴趣的事件告知 Selector,待事件发生时,Selector 即可返回就绪事件,我们就可以去做后续的事情了。比如 ServerSocketChannel 通道通常对 OP_ACCEPT 事件感兴趣,那么我们就可以把这个事件注册给 Selector。待事件发生,即服务端接受客户端连接后,我们即可获取这个就绪的事件并做相应的操作。通道注册的示例代码如下:

    channel.configureBlocking(false);
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
    

    起初我以为通道注册操作会调用操作系统的 epoll_ctl 函数,但最终通过看源码,发现自己的理解是错的。既然通道注册阶段不调用 epoll_ctl 函数。那么,epoll_ctl 什么时候才会被调用呢?如果不调用 epoll_ctl,那么注册过程都干了什么事情呢?关于第一个问题,本节还无法解答,不过第二个问题则可以说说。接下来让我们深入通道类 register 方法的调用栈中去探寻答案吧。

    public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
        public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
            return register(sel, ops, null);
        }
        
        public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
    }
    
    public abstract class AbstractSelectableChannel extends SelectableChannel {
        
        private SelectionKey[] keys = null;
        
        public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
            synchronized (regLock) {
                // 省去一些校验代码
                
                // 从 keys 数组中查找,查找条件为 k.selector() == sel
                SelectionKey k = findKey(sel);
                
                // 如果 k 不为空,则修改 k 所感兴趣的事件
                if (k != null) {
                    k.interestOps(ops);
                    k.attach(att);
                }
                
                // k 为空,则创建一个 SelectionKey,并存储到 keys 数组中
                if (k == null) {
                    // New registration
                    synchronized (keyLock) {
                        if (!isOpen())
                            throw new ClosedChannelException();
                        k = ((AbstractSelector)sel).register(this, ops, att);
                        addKey(k);
                    }
                }
                return k;
            }
        }
    }
    
    public abstract class AbstractSelector extends Selector {
        protected abstract SelectionKey register(AbstractSelectableChannel ch,
                                             int ops, Object att);
    }
    
    public abstract class SelectorImpl extends AbstractSelector {
        protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
            if (!(ch instanceof SelChImpl))
                throw new IllegalSelectorException();
            // 创建 SelectionKeyImpl 实例
            SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
            k.attach(attachment);
            synchronized (publicKeys) {
                implRegister(k);
            }
            k.interestOps(ops);
            return k;
        }
    }
    
    class EPollSelectorImpl extends SelectorImpl {
        protected void implRegister(SelectionKeyImpl ski) {
            if (closed)
                throw new ClosedSelectorException();
            SelChImpl ch = ski.channel;
            int fd = Integer.valueOf(ch.getFDVal());
            // 存储 fd 和 SelectionKeyImpl 的映射关系
            fdToKey.put(fd, ski);
            
            pollWrapper.add(fd);
            // 将 SelectionKeyImpl 实例存储到 keys 中(这里的 keys 声明在 SelectorImpl 类中),keys 集合可由 selector.keys() 方法获取
            keys.add(ski);
        }
    }
    
    public class SelectionKeyImpl extends AbstractSelectionKey {
        public SelectionKey interestOps(int ops) {
            ensureValid();
            return nioInterestOps(ops);
        }
        
        public SelectionKey nioInterestOps(int ops) {
            if ((ops & ~channel().validOps()) != 0)
                throw new IllegalArgumentException();
            // 转换并设置感兴趣的事件
            channel.translateAndSetInterestOps(ops, this);
            // 设置 interestOps 变量
            interestOps = ops;
            return this;
        }
    }
    
    class SocketChannelImpl extends SocketChannel implements SelChImpl {
        public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
            int newOps = 0;
            // 转换事件
            if ((ops & SelectionKey.OP_READ) != 0)
                newOps |= PollArrayWrapper.POLLIN;
            if ((ops & SelectionKey.OP_WRITE) != 0)
                newOps |= PollArrayWrapper.POLLOUT;
            if ((ops & SelectionKey.OP_CONNECT) != 0)
                newOps |= PollArrayWrapper.POLLCONN;
            // 设置事件
            sk.selector.putEventOps(sk, newOps);
        }
    }
    
    class class EPollSelectorImpl extends SelectorImpl {
        public void putEventOps(SelectionKeyImpl ski, int ops) {
            if (closed)
                throw new ClosedSelectorException();
            SelChImpl ch = ski.channel;
            // 设置感兴趣的事件
            pollWrapper.setInterest(ch.getFDVal(), ops);
        }
    }
    
    class EPollArrayWrapper {
        void setInterest(int fd, int mask) {
            synchronized (updateLock) {
                // 扩容 updateDescriptors 数组,并存储文件描述符 fd
                int oldCapacity = updateDescriptors.length;
                if (updateCount == oldCapacity) {
                    int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
                    int[] newDescriptors = new int[newCapacity];
                    System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
                    updateDescriptors = newDescriptors;
                }
                updateDescriptors[updateCount++] = fd;
        
                // events are stored as bytes for efficiency reasons
                byte b = (byte)mask;
                assert (b == mask) && (b != KILLED);
                // 存储事件
                setUpdateEvents(fd, b, false);
            }
        }
        
        private void setUpdateEvents(int fd, byte events, boolean force) {
            if (fd < MAX_UPDATE_ARRAY_SIZE) {
                if ((eventsLow[fd] != KILLED) || force) {
                    eventsLow[fd] = events;
                }
            } else {
                Integer key = Integer.valueOf(fd);
                if (!isEventsHighKilled(key) || force) {
                    eventsHigh.put(key, Byte.valueOf(events));
                }
            }
        }
    }
    

    到 setUpdateEvents 这个方法,整个调用栈就结束了。但是我们并未在调用栈中看到调用 epoll_ctl 函数的地方,也就是说,通道注册时,并不会立即调用 epoll_ctl,而是先将事件集合 events 存放在 eventsLow。至于 epoll_ctl 函数何时调用的,需要大家继续往下看了。

    2.4 选择过程

    2.4.1 选择方法

    Selector 包含3种不同功能的选择方法,分别如下:

    • int select()
    • int select(long timeout)
    • int selectNow()

    select() 是一个阻塞方法,仅在至少一个通道处于就绪状态时才返回。
    select(long timeout) 同样也是阻塞方法,不过可对该方法设置超时时间(timeout > 0),使得线程不会被一直阻塞。如果 timeout = 0,会一直阻塞线程。
    selectNow() 为非阻塞方法,调用后立即返回。

    以上3个方法均返回 int 类型值,表示每次调用 select 或 selectNow 方法后,新就绪通道的数量。如果某个通道在上一次调用 select 方法时就已经处于就绪状态,但并未将该通道对应的 SelectionKey 对象从 selectedKeys 集合中移除。假设另一个的通道在本次调用 select 期间处于就绪状态,此时,select 返回1,而不是2。

    2.4.2 选择过程

    选择方法用起来虽然简单,但方法之下隐藏的逻辑还是比较复杂的。大致分为下面几个步骤:

    1. 检查已取消键集合 cancelledKeys 是否为空,不为空则将 cancelledKeys 的键从 keys 和 selectedKeys 中移除,并将键和通道注销。
    2. 调用操作系统的 epoll_ctl 函数将通道感兴趣的事件注册到 epoll 实例中
    3. 调用操作系统的 epoll_wait 函数监听事件
    4. 再次执行步骤1
    5. 更新 selectedKeys 集合,并返回就绪通道数量

    上面五个步骤对应于 EPollSelectorImpl 类中 doSelect 方法的逻辑,如下:

    protected int doSelect(long timeout) throws IOException {
        if (closed)
            throw new ClosedSelectorException();
        // 处理已取消键集合,对应步骤1
        processDeregisterQueue();
        try {
            begin();
            // select 方法的核心,对应步骤2和3
            pollWrapper.poll(timeout);
        } finally {
            end();
        }
        // 处理已取消键集合,对应步骤4
        processDeregisterQueue();
        
        // 更新 selectedKeys 集合,并返回就绪通道数量,对应步骤5
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }
    

    接下来,我们按照上面的步骤顺序去分析代码实现。先来看看步骤1对应的代码:

    +----SelectorImpl.java
    void processDeregisterQueue() throws IOException {
        // Precondition: Synchronized on this, keys, and selectedKeys
        Set<SelectionKey> cks = cancelledKeys();
        synchronized (cks) {
            if (!cks.isEmpty()) {
                Iterator<SelectionKey> i = cks.iterator();
                // 遍历 cancelledKeys,执行注销操作
                while (i.hasNext()) {
                    SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                    try {
                        // 执行注销逻辑
                        implDereg(ski);
                    } catch (SocketException se) {
                        throw new IOException("Error deregistering key", se);
                    } finally {
                        i.remove();
                    }
                }
            }
        }
    }
    
    +----EPollSelectorImpl.java
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        assert (ski.getIndex() >= 0);
        SelChImpl ch = ski.channel;
        int fd = ch.getFDVal();
        // 移除 fd 和选择键键的映射关系
        fdToKey.remove(Integer.valueOf(fd));
        // 从 epoll 实例中删除事件
        pollWrapper.remove(fd);
        ski.setIndex(-1);
        
        // 从 keys 和 selectedKeys 中移除选择键
        keys.remove(ski);
        selectedKeys.remove(ski);
        
        // 注销选择键
        deregister((AbstractSelectionKey)ski);
        
        // 注销通道
        SelectableChannel selch = ski.channel();
        if (!selch.isOpen() && !selch.isRegistered())
            ((SelChImpl)selch).kill();
    }
    

    上面的代码代码逻辑不是很复杂,首先是获取 cancelledKeys 集合,然后遍历集合,并对每个选择键及其对应的通道执行注销操作。接下来再来看看步骤2和3对应的代码,如下:

    +----EPollArrayWrapper.java
    int poll(long timeout) throws IOException {
        // 调用 epoll_ctl 函数注册事件,对应步骤3
        updateRegistrations();
        
        // 调用 epoll_wait 函数等待事件发生,对应步骤4
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }
    
    /**
     * Update the pending registrations.
     */
    private void updateRegistrations() {
        synchronized (updateLock) {
            int j = 0;
            while (j < updateCount) {
                // 获取 fd 和 events,这两个值在调用 register 方法时被存储到数组中
                int fd = updateDescriptors[j];
                short events = getUpdateEvents(fd);
                boolean isRegistered = registered.get(fd);
                int opcode = 0;
    
                if (events != KILLED) {
                    // 确定 opcode 的值
                    if (isRegistered) {
                        opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                    } else {
                        opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                    }
                    if (opcode != 0) {
                        // 注册事件
                        epollCtl(epfd, opcode, fd, events);
                        // 设置 fd 的注册状态
                        if (opcode == EPOLL_CTL_ADD) {
                            registered.set(fd);
                        } else if (opcode == EPOLL_CTL_DEL) {
                            registered.clear(fd);
                        }
                    }
                }
                j++;
            }
            updateCount = 0;
        }
        
        // 下面两个均是 native 方法
        private native void epollCtl(int epfd, int opcode, int fd, int events);
        private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
    }
    

    看到 updateRegistrations 方法的实现,大家现在知道 epoll_ctl 这个函数是在哪里调用的了。在 3.2 节通道注册的结尾给大家埋了一个疑问,这里就是答案了。注册通道实际上只是先将事件收集起来,等调用 select 方法时,在一起通过 epoll_ctl 函数将事件注册到 epoll 实例中。

    上面 epollCtl 和 epollWait 方法是 native 类型的,接下来我们再来看看这两个方法是如何实现的。如下:

    +----EPollArrayWrapper.c
    JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) {
        struct epoll_event event;
        int res;
    
        event.events = events;
        event.data.fd = fd;
    
        // 调用 epoll_ctl 注册事件
        RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
    
        if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
            JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
        }
    }
    
    JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {
        struct epoll_event *events = jlong_to_ptr(address);
        int res;
    
        if (timeout <= 0) {           /* Indefinite or no wait */
            // 调用 epoll_wait 等待事件
            RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
        } else {                      /* Bounded wait; bounded restarts */
            res = iepoll(epfd, events, numfds, timeout);
        }
    
        if (res < 0) {
            JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
        }
        return res;
    }
    

    上面的C代码没什么复杂的逻辑,这里就不多说了。如果大家对 epoll_ctl 和 epoll_wait 函数不了解,可以参考 Linux man-page。关于 epoll 的示例,也可以参考我的另一篇文章“基于epoll实现简单的web服务器”

    说完步骤2和3对应的代码,接下来再来说说步骤4和5。由于步骤4和步骤1是一样的,这里不再赘述。最后再来说说步骤5的逻辑。代码如下:

    +----EPollSelectorImpl.java
    private int updateSelectedKeys() {
        int entries = pollWrapper.updated;
        int numKeysUpdated = 0;
        for (int i=0; i<entries; i++) {
            /* 从 pollWrapper 成员变量的 pollArray 中获取文件描述符,
             * pollArray 中的数据由 epoll_wait 设置
             */
            int nextFD = pollWrapper.getDescriptor(i);
            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
            // ski is null in the case of an interrupt
            if (ski != null) {
                // 从 pollArray 中获取就绪事件集合
                int rOps = pollWrapper.getEventOps(i);
                
                /* 如果 selectedKeys 已包含选择键,则选择键必须由新的事件发生时,
                 * 才会将 numKeysUpdated + 1
                 */ 
                if (selectedKeys.contains(ski)) {
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    // 转换并设置就绪事件集合
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        // 更新 selectedKeys 集合,并将 numKeysUpdated + 1
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        
        // 返回 numKeysUpdated
        return numKeysUpdated;
    }
    
    +----SocketChannelImpl.java
    public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
        int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
        int oldOps = sk.nioReadyOps();
        int newOps = initialOps;
    
        if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
            return false;
        }
    
        if ((ops & (PollArrayWrapper.POLLERR
                    | PollArrayWrapper.POLLHUP)) != 0) {
            newOps = intOps;
            sk.nioReadyOps(newOps);
            // No need to poll again in checkConnect,
            // the error will be detected there
            readyToConnect = true;
            return (newOps & ~oldOps) != 0;
        }
    
        /* 
         * 转换事件
         */
        if (((ops & PollArrayWrapper.POLLIN) != 0) &&
            ((intOps & SelectionKey.OP_READ) != 0) &&
            (state == ST_CONNECTED))
            newOps |= SelectionKey.OP_READ;
    
        if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
            ((intOps & SelectionKey.OP_CONNECT) != 0) &&
            ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
            newOps |= SelectionKey.OP_CONNECT;
            readyToConnect = true;
        }
    
        if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
            ((intOps & SelectionKey.OP_WRITE) != 0) &&
            (state == ST_CONNECTED))
            newOps |= SelectionKey.OP_WRITE;
    
        // 设置事件
        sk.nioReadyOps(newOps);
        
        // 如果新的就绪事件和老的就绪事件不相同,则返回true,否则返回 false
        return (newOps & ~oldOps) != 0;
    }
    

    上面就是步骤5的逻辑了,简单总结一下。首先是获取就绪通道数量,然后再获取这些就绪通道对应的文件描述符 fd,以及就绪事件集合 rOps。之后调用 translateAndSetReadyOps 转换并设置就绪事件集合。最后,将选择键添加到 selectedKeys 集合中,并累加 numKeysUpdated 值,之后返回该值。

    以上就是选择过程的代码讲解,贴了不少代码,可能不太好理解。Java NIO 和操作系统接口关联比较大,所以在学习 NIO 相关原理时,也应该去了解诸如 epoll 等系统调用的知识。没有这些背景知识,很多东西看起来不太好懂。好了,本节到此结束。

    2.5 模板代码

    使用 NIO 选择器编程时,主干代码的结构一般比较固定。所以把主干代码写好后,就可以往里填业务代码了。下面贴一个服务端的模板代码,如下:

    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress("localhost", 8080));
    ssc.configureBlocking(false);
    
    Selector selector = Selector.open();
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    
    while(true) {
        int readyNum = selector.select();
        if (readyNum == 0) {
            continue;
        }
    
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectedKeys.iterator();
        
        while(it.hasNext()) {
            SelectionKey key = it.next();
            
            if(key.isAcceptable()) {
                // 接受连接
            } else if (key.isReadable()) {
                // 通道可读
            } else if (key.isWritable()) {
                // 通道可写
            }
            
            it.remove();
        }
    }
    

    2.6 实例演示

    原本打算将示例演示的代码放在本节中展示,奈何文章篇幅已经很大了,所以决定把本节的内容独立成文。在下一篇文章中,我将会演示使用 Java NIO 完成一个简单的 HTTP 服务器。这里先贴张效果图,如下:

    tinyhttpd_w

    3.总结

    到这里,本文差不多就要结束了。原本只是打算简单说说 Selector 的用法,然后再写一份实例代码。但是后来发现这样写显得比较空洞,没什么深度。所以后来翻了一下 Selector 的源码,大致理解了 Selector 的逻辑,然后就有了上面的分析。不过 Selector 的逻辑并不止我上面所说的那些,还有一些内容我现在还没看,所以就没有讲。对于已写出来的分析,由于我个人水平有限,难免会有错误。如果有错误,也欢迎大家指出来,共同进步!

    好了,本文到此结束,感谢大家的阅读。

    参考

    附录

    文中贴的一些代码是没有包含在 JDK src.zip 包里的,这里单独列举出来,方便大家查找。

    文件名 路径
    DefaultSelectorProvider.java jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java
    EPollSelectorProvider.java jdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java
    SelectorImpl.java jdk/src/share/classes/sun/nio/ch/SelectorImpl.java
    EPollSelectorImpl.java jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java
    EPollArrayWrapper.java jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java
    SelectionKeyImpl.java jdk/src/share/classes/sun/nio/ch/SelectionKeyImpl.java
    SocketChannelImpl.java jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java
    EPollArrayWrapper.c jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c

    本文在知识共享许可协议 4.0 下发布,转载需在明显位置处注明出处
    作者:coolblog
    本文同步发布在我的个人博客:http://www.coolblog.xyz/?r=cb

    cc
    本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。

  • 相关阅读:
    Linux服务器使用tar加密压缩文件
    ssh-copy-id使用非默认22端口
    Nginx日志分割脚本
    MySQL的yum源
    vSphere Client开启虚拟机提示:出现了常规系统错误: 由于目标计算机积极拒绝,无法连接。
    ESXi主机遗忘密码重置密码
    扩容swap交换分区空间
    ESXi上的固态硬盘识别为非SSD
    VMware Vcenter Server 6.0忘记密码
    Centos6与Centos7区别
  • 原文地址:https://www.cnblogs.com/nullllun/p/8734681.html
Copyright © 2011-2022 走看看