zoukankan      html  css  js  c++  java
  • 深入NIO Socket实现机制(转)

    http://www.jianshu.com/p/0d497fe5484a#

    前言

    Java NIO 由以下几个核心部分组成:

    • Buffer
    • Channel
    • Selector

    以前基于net包进行socket编程时,accept方法会一直阻塞,直到有客户端请求的到来,并返回socket进行相应的处理。整个过程是流水线的,处理完一个请求,才能去获取并处理后面的请求;当然我们可以把获取socket和处理socket的过程分开,一个线程负责accept,线程池负责处理请求。

    NIO为我们提供了更好的解决方案,采用选择器(Selector)找出已经准备好读写的socket,并按顺序处理,基于通道(Channel)和缓冲区(Buffer)来传输和保存数据。

    Buffer和Channel已经介绍过深入浅出NIO Channel和Buffer,本文主要介绍NIO的Selector和Socket的实践以及实现原理。

    Selector是什么?

    在养鸡场,有这一个人,每天的工作就是不停检查几个特殊的鸡笼,如果有鸡进来,有鸡出去,有鸡生蛋,有鸡生病等等,就把相应的情况记录下来。这样,如果负责人想知道鸡场情况,只需要到那个人查询即可,当然前提是,负责得让那个人知道需要记录哪些情况。

    Selector的作用相当这个人的工作,每个鸡笼相当于一个SocketChannel,单个线程通过Selector可以管理多个SocketChannel。

    为了实现Selector管理多个SocketChannel,必须将多个具体的SocketChannel对象注册到Selector对象,并声明需要监听的事件,目前有4种类型的事件:

    connect:客户端连接服务端事件,对应值为SelectionKey.OP_CONNECT(8)
    accept:服务端接收客户端连接事件,对应值为SelectionKey.OP_ACCEPT(16)
    read:读事件,对应值为SelectionKey.OP_READ(1)
    write:写事件,对应值为SelectionKey.OP_WRITE(4)

    当SocketChannel有对应的事件发生时,Selector能够觉察到并进行相应的处理。

    为了更好地理解NIO Socket,先来看一段服务端的示例代码

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    serverChannel.socket().bind(new InetSocketAddress(port));
    Selector selector = Selector.open();
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    while(true){
        int n = selector.select();
        if (n == 0) continue;
        Iterator ite = this.selector.selectedKeys().iterator();
        while(ite.hasNext()){
            SelectionKey key = (SelectionKey)ite.next();
            if (key.isAcceptable()){
                SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
                clntChan.configureBlocking(false);
                //将选择器注册到连接到的客户端信道,
                //并指定该信道key值的属性为OP_READ,
                //同时为该信道指定关联的附件
                clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
            }
            if (key.isReadable()){
                handleRead(key);
            }
            if (key.isWritable() && key.isValid()){
                handleWrite(key);
            }
            if (key.isConnectable()){
                System.out.println("isConnectable = true");
            }
          ite.remove();
        }
    }

    服务端连接过程
    1、创建ServerSocketChannel实例serverSocketChannel,并bind到指定端口。
    2、创建Selector实例selector;
    3、将serverSocketChannel注册到selector,并指定事件OP_ACCEPT。
    4、while循环执行:
    4.1、调用select方法,该方法会阻塞等待,直到有一个或多个通道准备好了I/O操作或等待超时。
    4.2、获取选取的键列表;
    4.3、循环键集中的每个键:
    4.3.a、获取通道,并从键中获取附件(如果添加了附件);
    4.3.b、确定准备就绪的操纵并执行,如果是accept操作,将接收的信道设置为非阻塞模式,并注册到选择器;
    4.3.c、如果需要,修改键的兴趣操作集;
    4.3.d、从已选键集中移除键

    在步骤3中,selector只注册了serverSocketChannel的OP_ACCEPT事件

    • 如果有客户端A连接服务,执行select方法时,可以通过serverSocketChannel获取客户端A的socketChannel,并在selector上注册socketChannel的OP_READ事件。
    • 如果客户端A发送数据,会触发read事件,这样下次轮询调用select方法时,就能通过socketChannel读取数据,同时在selector上注册该socketChannel的OP_WRITE事件,实现服务器往客户端写数据。

    NIO Socket实现原理

    SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现,其中Selector是整个NIO Socket的核心实现。

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

    SelectorProvider在windows和linux下有不同的实现,provider方法会返回对应的实现。

    Selector分析

    Selector是如何做到同时管理多个socket?

    Selector初始化时,会实例化PollWrapper、SelectionKeyImpl数组和Pipe。

    WindowsSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        wakeupPipe = Pipe.open();
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
    
        // Disable the Nagle algorithm so that the wakeup is more immediate
        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
        (sink.sc).socket().setTcpNoDelay(true);
        wakeupSinkFd = ((SelChImpl)sink).getFDVal();
        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
    }

    pollWrapper用Unsafe类申请一块物理内存,存放注册时的socket句柄fdVal和event的数据结构pollfd,其中pollfd共8位,0~3位保存socket句柄,4~7位保存event。(答案)

    pollWrapper提供了fdVal和event数据的相应操作,如添加操作通过Unsafe的putInt和putShort实现。

    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }
    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
    }

    SelectionKeyImpl保存注册时的channel、selector、event以及保存在pollWrapper的偏移位置index。

    先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)是如何实现的:

    public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException {
        synchronized (regLock) {
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }
    1. 如果该channel和selector已经注册过,则直接添加事件和附件。
    2. 否则通过selector实现注册过程。
    protected final SelectionKey register(AbstractSelectableChannel ch,
          int ops,  Object attachment) {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);
        synchronized (publicKeys) {
            implRegister(k);
        }
        k.interestOps(ops);
        return k;
    }
    
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (pollWrapper == null)
                throw new ClosedSelectorException();
            growIfNeeded();
            channelArray[totalChannels] = ski;
            ski.setIndex(totalChannels);
            fdMap.put(ski);
            keys.add(ski);
            pollWrapper.addEntry(totalChannels, ski);
            totalChannels++;
        }
    }

    传统的事件监听几乎都是通过保存监听关系的方式实现的。

    pollWrapper.addEntry(totalChannels, ski); 

    1. 以当前channel和selector为参数,初始化 SelectionKeyImpl 对象selectionKeyImpl ,并添加附件attachment。
    2. 如果当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
    3. 如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。
    4. pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
    5. k.interestOps(ops)方法最终也会把event添加到对应的pollfd。

    所以,不管serverSocketChannel,还是socketChannel,在selector注册事件后,最终都保存在pollArray中。

     (那么是通过遍历的方式来传递事件的吗?接着,再来看看selector中的select是如何实现一次获取多个有事件发生的channel的。
    底层由selector实现类的doSelect方法实现,如下:

    protected int doSelect(long timeout) throws IOException {
            if (channelArray == null)
                throw new ClosedSelectorException();
            this.timeout = timeout; // set selector timeout
            processDeregisterQueue();
            if (interruptTriggered) {
                resetWakeupSocket();
                return 0;
            }
            // Calculate number of helper threads needed for poll. If necessary
            // threads are created here and start waiting on startLock
            adjustThreadsCount();
            finishLock.reset(); // reset finishLock
            // Wakeup helper threads, waiting on startLock, so they start polling.
            // Redundant threads will exit here after wakeup.
            startLock.startThreads();
            // do polling in the main thread. Main thread is responsible for
            // first MAX_SELECTABLE_FDS entries in pollArray.
            try {
                begin();
                try {
                    subSelector.poll();
                } catch (IOException e) {
                    finishLock.setException(e); // Save this exception
                }
                // Main thread is out of poll(). Wakeup others and wait for them
                if (threads.size() > 0)
                    finishLock.waitForHelperThreads();
              } finally {
                  end();
              }
            // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
            finishLock.checkForException();
            processDeregisterQueue();
            int updated = updateSelectedKeys();
            // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
            resetWakeupSocket();
            return updated;
        }

    其中 subSelector.poll() 是select的核心,由native函数poll0实现(可参考c代码),readFds、writeFds 和exceptFds数组用来保存底层select的结果,数组的第一个位置都是存放发生事件的socket的总数,其余位置存放发生事件的socket句柄fd。

    private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
    private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
    private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
    private int poll() throws IOException{ // poll for the main thread
         return poll0(pollWrapper.pollArrayAddress,
              Math.min(totalChannels, MAX_SELECTABLE_FDS),
                 readFds, writeFds, exceptFds, timeout);
    }

    执行 selector.select() ,poll0函数把指向socket句柄和事件的内存地址传给底层函数。

    1. 如果之前没有发生事件,程序就阻塞在select处,当然不会一直阻塞,因为epoll在timeout时间内如果没有事件,也会返回
    2. 一旦有对应的事件发生,poll0方法就会返回
    3. processDeregisterQueue方法会清理那些已经cancelled的SelectionKey
    4. updateSelectedKeys方法统计有事件发生的SelectionKey数量,并把符合条件发生事件的SelectionKey添加到selectedKeys哈希表中,提供给后续使用。

    在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型实现,是基于IO复用技术的非阻塞IO,不是异步IO。在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。

    epoll原理

    epoll是Linux下的一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄。(对于nio的底层学习最终还是学习epoll原理)

    http://blog.csdn.net/coder_yi_liu/article/details/8938927

    先看看使用c封装的3个epoll系统调用:

    • int epoll_create(int size)
      epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。
    • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
      epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。
    • int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
      epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。

    大概看看epoll内部是怎么实现的:

    1. epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。
    2. 当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。
    3. 当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。

    epoll的两种工作模式:

    • LT:level-trigger,水平触发模式,只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket。
    • ET:edge-trigger,边缘触发模式,只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。

    socket读数据

    socket写数据

    read实现

    通过遍历selector中的SelectionKeyImpl数组,获取发生事件的socketChannel对象,其中保存了对应的socket句柄,实现如下。

    public int read(ByteBuffer buf) throws IOException {
        if (buf == null)
            throw new NullPointerException();
        synchronized (readLock) {
            if (!ensureReadOpen())
                return -1;
            int n = 0;
            try {
                begin();
                synchronized (stateLock) {
                    if (!isOpen()) {         
                        return 0;
                    }
                    readerThread = NativeThread.current();
                }
                for (;;) {
                    n = IOUtil.read(fd, buf, -1, nd);
                    if ((n == IOStatus.INTERRUPTED) && isOpen()) {
                        // The system call was interrupted but the channel
                        // is still open, so retry
                        continue;
                    }
                    return IOStatus.normalize(n);
                }
            } finally {
                readerCleanup();        // Clear reader thread
                // The end method, which 
                end(n > 0 || (n == IOStatus.UNAVAILABLE));
    
                // Extra case for socket channels: Asynchronous shutdown
                //
                synchronized (stateLock) {
                    if ((n <= 0) && (!isInputOpen))
                        return IOStatus.EOF;
                }
                assert IOStatus.check(n);
            }
        }
    }

    通过Buffer的方式读取socket的数据。

    wakeup实现

    public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                setWakeupSocket();
                interruptTriggered = true;
            }
        }
        return this;
    }
    
    // Sets Windows wakeup socket to a signaled state.
    private void setWakeupSocket() {
       setWakeupSocket0(wakeupSinkFd);
    }
    private native void setWakeupSocket0(int wakeupSinkFd);


    最终调用了native方法来实现。

    看来wakeupSinkFd这个变量是为wakeup方法使用的。
    其中interruptTriggered为中断已触发标志,当pollWrapper.interrupt()之后,该标志即为true了;因为这个标志,连续两次wakeup,只会有一次效果。

    为了实现client和server的数据交互,Linux下采用管道pipe实现,windows下采用两个socket之间的通信进行实现,它们都有这样的特性:

    1. 都有两个端,一个 是read端,一个是write端,windows中两个socket也是read和write的角色。
    2. 当往write端写入 数据,则read端即可以收到数据。
  • 相关阅读:
    [LeetCode] Power of Three 判断3的次方数
    [LeetCode] 322. Coin Change 硬币找零
    [LeetCode] 321. Create Maximum Number 创建最大数
    ITK 3.20.1 VS2010 Configuration 配置
    VTK 5.10.1 VS2010 Configuration 配置
    FLTK 1.3.3 MinGW 4.9.1 Configuration 配置
    FLTK 1.1.10 VS2010 Configuration 配置
    Inheritance, Association, Aggregation, and Composition 类的继承,关联,聚合和组合的区别
    [LeetCode] Bulb Switcher 灯泡开关
    [LeetCode] Maximum Product of Word Lengths 单词长度的最大积
  • 原文地址:https://www.cnblogs.com/guazi/p/6605221.html
Copyright © 2011-2022 走看看