zoukankan      html  css  js  c++  java
  • JAVA NIO 简单介绍

    Version:0.9 StartHTML:-1 EndHTML:-1 StartFragment:00000099 EndFragment:00918492
    一:为什么要使用NIO技术
          基本的Java套接字对于小规模系统可以很好地运行,但当涉及同时处理几千甚至上万个客户端的服务器时,可能会产生一些问题。
          如果一个客户端一个线程的方式去处理,则由于创建、维护和切换线程需要的系统开销导致系统扩展性方面受到了很大限制;当然你也可以使用线程池,也可以节省一些开销,也同时可以使用并行硬件的优势,比如F5,网络连接均衡服务器等等。但对于连接生存期比较长的协议来说,线程池的大小仍然限制了系统可以同时处理的客户端数量。考虑一个在客户端之间传递消息的即时消息服务器IM。客户端必须不停地连接服务器以接收即时消息,因此线程池的大小限制了系统可以同时服务的客户端总数。如果增加线程池的大小,将带来更多的线程处理开销,而不能提升系统的性能,因为在大部分的时间里客户端是处于空闲状态的。
    如果这是所有问题,可能NIO还不是必须的。不幸的是,在使用线程的扩展性方面还涉及一些更加难把握的挑战。其中一个挑战就是程序员几乎不能对什么时候哪个线程将获得服务进行控制。你可以设置一个线程实例的优先级,但是这个优先级只是一种“建议”,下一个选择执行的线程完全取决于具体实现。因此,如果程序员想要保证某些连接优先获得服务,或想要制定一定的服务顺序,线程可能就很难做到。
    然而,有关线程的最重要的问题可能要保证数据的一致性,但很多客户端之间共享一些状态信息时,这就需要使用锁机制或者其他互斥机制对依次访问状态进行严格的同步。否则,由于不同线程上的程序段交错执行,他们之间会改掉其他线程说做的修改。
    由于需要对共享状态进行同步访问,要同时考虑到多线程服务器的正确性和高效性就变得非常困难。使用锁机制将增加更多的的系统调度和上下文切换开销,而程序员对这些开销又无法控制。由于其复杂性,一些程序员宁愿继续使用单线程方法。这类服务器只用一个线程来处理所有客户端,但不是顺序处理,而是一次全部处理。这种服务器不能为任何客户端提供I/O操作的阻塞等待,而必须排他地使用非阻塞I/O。
    在我们写Socket服务器端的时候,肯定会用到ServerSocket类的accept方法,当在ServerSocket实例上调用 accept方法时,如果有一个新的连接来了,则accept方法会立即返回一个socket实例,否则该方法将一直阻塞直到有新的连接到来或计时器超时。假设我们用一个线程专门来处理连接的请求,也就是accept方法;不幸的是,我们会发现这种方法要求我们不断地轮询所有的I/O源,而这种“忙等” 方法又会引入很多系统开销,因为程序要反复循环地连接I/O源,却又发现什么都不用做。以下代码就是一个典型的处理客户端请求方式,循环一直在跑,除非有人把循环标志给修改了,server.accept()方法一直在阻塞直到有一个新的的连接过来,如果有新的连接过来这返回一个socket实例,并扔给连接管理器去处理,如果一直都没有连接过来则一直阻塞在那里死等。
    while(!bCanExit) {
    try {
           //如果单在一个线程处理socket连接,该方法一直会阻塞,直到有新的连接过来
           Socketsocket = server.accept();
           Connection connection = newConnection(socket);
           connection.setClientId(Util.random32UUID());
           connectionManager.add(connection);
           if(logger.isInfoEnabled()){
                 logger.info("有一个新的连接!");
           }
    } catch(IOException e) {}
    }
    try {
    server.close();
    } catch(IOException e) {
    logger.error("close serverSocketerror:", e);
    }
    ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    客户端代码:
    packageorg.zapldy.tcpip.nio;
     
    importjava.net.InetSocketAddress;
    importjava.net.SocketException;
    importjava.nio.ByteBuffer;
    importjava.nio.channels.SocketChannel;
     
    public classTCPEchoClientNoblocking {
     
       public staticvoid main(String[] args) throwsException {
          String server = "127.0.0.1";
          byte[] data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes();
          int servPort = 8888;
     
          SocketChannel clntChan =SocketChannel.open();
          clntChan.configureBlocking(false);
          //我们通过持续调用finishConnect()方法来“轮询”连接状态,该方法在连接成功建立之前
           //一直返回false。打印操作显示了在等待连接建立的过程中,程序还可以执行其他任务。不过
           //,这种忙等的方法非常浪费系统资源,这里这样做只是为了演示该方法的使用。
          if (!clntChan.connect(newInetSocketAddress(server,servPort))) {
              while (!clntChan.finishConnect()) {
                 System.out.print("=");//这里可以做其他事情
              }
          }
     
          ByteBuffer writeBuf =ByteBuffer.wrap(data);
          ByteBuffer readBuf =ByteBuffer.allocate(data.length);
     
          int totalBytesRcvd = 0;
          int bytesRcvd;
          while (totalBytesRcvd < data.length) {
              if (writeBuf.hasRemaining()) {
                 clntChan.write(writeBuf);
              }
              if ((bytesRcvd =clntChan.read(readBuf)) == -1) {
                 throw newSocketException("Connection closedprematurely");
              }
              totalBytesRcvd +=bytesRcvd;
              System.out.print("=");
          }
          System.out.println("Recieved: "
                 + new String(readBuf.array(), 0,totalBytesRcvd));
          clntChan.close();
       }
    }
    ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
       服务器端代码(服务器还是用传统的方式实现的,后面将再用NIO重写,这里只是为了读者能快速运行代码):
    packageorg.zapldy.tcpip;
     
    importjava.io.IOException;
    importjava.io.InputStream;
    importjava.io.OutputStream;
    importjava.net.ServerSocket;
    importjava.net.Socket;
    importjava.net.SocketAddress;
     
    publicclass TCPEchoServer {
       private staticfinal int BUFSIZE = 32;
       public staticvoid main(String[] args) throwsIOException {
          int servPort = 8888;
          ServerSocket servSocket = newServerSocket(servPort);
     
          int recvMsgSize = 0;
          byte[] recvBuf = newbyte[BUFSIZE];
     
          while (true) {
              Socket clntSocket =servSocket.accept();// 该方法会阻塞
              SocketAddressclientAddress =
                                   clntSocket.getRemoteSocketAddress();
              System.out.println("Handling client at " + clientAddress);
             InputStream in =clntSocket.getInputStream();
              OutputStream out =clntSocket.getOutputStream();
              while ((recvMsgSize = in.read(recvBuf))!= -1) {
                out.write(recvBuf, 0, recvMsgSize);
              }
              clntSocket.close();
          }
       }
    }
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    Selector
    上面已经提到过,Selector类可以用于避免使用阻塞式客户端中很浪费资源的“忙等”方法。例如,考虑一个IM服务器。像QQ或者旺旺这样的,可能有几万甚至几千万个客户端同时连接到了服务器,但在任何时刻都只是非常少量的消息
    需要读取和分发。这就需要一种方法阻塞等待,直到至少有一个信道可以进行I/O操作,并指出是哪个信道。NIO的选择器就实现了这样的功能。一个 Selector实例可以同时检查一组信道的I/O状态。用专业术语来说,选择器就是一个多路开关选择器,因为一个选择器能够管理多个信道上的I/O操作。然而如果用传统的方式来处理这么多客户端,使用的方法是循环地一个一个地去检查所有的客户端是否有I/O操作,如果当前客户端有I/O操作,则可能把当前客户端扔给一个线程池去处理,如果没有I/O操作则进行下一个轮询,当所有的客户端都轮询过了又接着从头开始轮询;这种方法是非常笨而且也非常浪费资源,因为大部分客户端是没有I/O操作,我们也要去检查;而Selector就不一样了,它在内部可以同时管理多个I/O,当一个信道有I/O操作的时候,他会通知Selector,Selector就是记住这个信道有I/O操作,并且知道是何种I/O操作,是读呢?是写呢?还是接受新的连接;所以如果使用Selector,它返回的结果只有两种结果,一种是0,即在你调用的时刻没有任何客户端需要I/O操作,另一种结果是一组需要I/O操作的客户端,这是你就根本不需要再检查了,因为它返回给你的肯定是你想要的。这样一种通知的方式比那种主动轮询的方式要高效得多!
    要使用选择器(Selector),需要创建一个Selector实例(使用静态工厂方法open())并将其注册(register)到想要监控的信道上(注意,这要通过channel的方法实现,而不是使用selector的方法)。最后,调用选择器的select()方法。该方法会阻塞等待,直到有一个或更多的信道准备好了I/O操作或等待超时。select()方法将返回可进行I/O操作的信道数量。现在,在一个单独的线程中,通过调用 select()方法就能检查多个信道是否准备好进行I/O操作。如果经过一段时间后仍然没有信道准备好,select()方法就会返回0,并允许程序继续执行其他任务。
    下面来看一个例子。假设我们想要使用信道和选择器来实现一个像上面一样的回显服务器,并不使用多线程和忙等。为了使不同协议都能方便地使用这个基本的服务模式,我们把信道中与具体协议相关的处理各种I/O操作(接收,读,写)分离出来。Protocol定义了通用 EchoSelectorServer类与特定协议之间的接口,包括三个方法,每个方法代表了一种I/O形式。当有信道准备好I/O操作时,服务器只需要调用相应的方法即可。
    packageorg.zapldy.tcpip.nio;
     
    importjava.io.IOException;
    importjava.nio.channels.SelectionKey;
     
    publicinterface Protocol {
       public voidhandleAccept(SelectionKey key) throwsIOException;
       public voidhandleRead(SelectionKey key) throwsIOException;
       public voidhandleWrite(SelectionKey key) throwsIOException;
    }
    下面是具体的实现(注意看注释):
    packageorg.zapldy.tcpip.nio;
     
    importjava.io.IOException;
    importjava.nio.ByteBuffer;
    importjava.nio.channels.SelectionKey;
    importjava.nio.channels.ServerSocketChannel;
    importjava.nio.channels.SocketChannel;
     
    publicclass EchoProtocol implementsProtocol {
     
       private intbufsize;//为每个客户端信道创建的缓冲区大小
     
       public EchoProtocol(int bufsize) {
          this.bufsize = bufsize;
       }
     
     
       public voidhandleAccept(SelectionKeykey) throws IOException {
          //channel()方法返回注册时用来创建的Channel,该Channel是一个ServerSocketChannel,
           //因为这是我们注册的唯一一种支持accept操作的信道,
    //accept()方法为传入的连接返回一个SocketChannel实例。
          SocketChannel channel =((ServerSocketChannel)          key.channel()).accept();
          //这里无法注册阻塞式信道,必须是非阻塞式的
          channel.configureBlocking(false);
          //可以通过SelectionKey类的selector()方法来获取相应的Selector。
           //我们根据指定大小创建了一个新的ByteBuffer实例,
           //并将其作为参数传递给register()方法。它将作为附件,与regiter()方法所返回的
    //SelectionKey实例相关联。
          channel.register(key.selector(),SelectionKey.OP_READ, ByteBuffer
                 .allocateDirect(bufsize));
       }
     
     
       public voidhandleRead(SelectionKey key) throws IOException {
          //根据其支持数据读取操作可知,这是一个SocketChannel。
          SocketChannel channel =(SocketChannel) key.channel();
          //建立连接后,有一个ByteBuffer附件加到该SelectionKey实例上,这个附件里面的内容将
           //会在发送的时候用到,附件始终是附着这个长连接上
          ByteBuffer buf = (ByteBuffer)key.attachment();
          long bytesRead = channel.read(buf);
          //如果read()方法返回-1,则表示底层连接已经关闭,此时需要关闭信道。
           //关闭信道时,将从选择器的各种集合中移除与该信道关联的键。
          if (bytesRead == -1) {
              channel.close();
          } else if (bytesRead > 0) {
              //这里依然保留了信道的可读操作,虽然缓冲区中可能已经没有剩余空间了,
    //因为下次还是要接受新的数据
              key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
          }
       }
     
       public voidhandleWrite(SelectionKey key) throws IOException {
          //附加到SelectionKey上的ByteBuffer包含了之前从信道中读取的数据。
          ByteBuffer buf =(ByteBuffer)key.attachment();
          //该方法用来修改缓冲区的内部状态,以指示write操作从什么地方获取数据,及还剩多少数据
          buf.flip();
          SocketChannel channel =(SocketChannel)key.channel();//获取信道
          channel.write(buf);//向信道中写数据
          if(!buf.hasRemaining()){
              //如果没有剩余数据可读,则修改该键关联的操作集,指示其只能进行读操作了
              key.interestOps(SelectionKey.OP_READ);
          }
       //如果缓冲区中还有剩余数据,该操作将剩余数据移到缓冲区前端,以使下次迭代能读入更多数据。
          buf.compact();
       }
    }
    下面是回显服务器端代码的实现,在服务器端创建一个选择器,并将其与每个侦听客户端连接的套接字说对应的ServerSocketChannel注册在一起。然后进行反复循环,调用select()方法,并调用相应的操作器对各种类型的I/O操作进行处理。
    packageorg.zapldy.tcpip.nio;
     
    importjava.io.IOException;
    importjava.net.InetSocketAddress;
    importjava.nio.channels.SelectionKey;
    importjava.nio.channels.Selector;
    importjava.nio.channels.ServerSocketChannel;
    importjava.util.Iterator;
     
    publicclass EchoSelectorServer {
       private staticfinal int BUFSIZE = 256;
       private staticfinal int TIMEOUT = 3000;
     
       private staticfinal int PORT = 8888;
     
       public staticvoid main(String[] args) throwsIOException{
          Selector selector = Selector.open();
     
          ServerSocketChannellistnChannel = ServerSocketChannel.open();
          listnChannel.socket().bind(newInetSocketAddress(PORT));
          //只有非阻塞信道才可以注册选择器,因此需要将其配置为适当的状态
          listnChannel.configureBlocking(false);
          //在注册过程中指出该信道可以进行“accept”操作
          listnChannel.register(selector,SelectionKey.OP_ACCEPT);
     
          Protocolprotocol = newEchoProtocol(BUFSIZE);
          while(true){
              if(selector.select(TIMEOUT) == 0){
                 System.out.print("==");
                 continue;
              }
     
                            Iterator<SelectionKey>keyIter =
    selector.selectedKeys().iterator();
              while(keyIter.hasNext()){
                 SelectionKey key =keyIter.next();
                 if(key.isAcceptable()){
                     protocol.handleAccept(key);
                 }
                 if(key.isReadable()){
                     protocol.handleRead(key);
                 }
                 if(key.isWritable() &&key.isValid()){
                     protocol.handleWrite(key);
                 }
                 //由于select()操作只是向Selector所关联的键集合中添加元素
                   //因此,如果不移除每个处理过的键,
    //它就会在下次调用select()方法时仍然保留在集合中
                   //而且可能会有无用的操作来调用它。
                 keyIter.remove();
              }
          }
       }
    }
     
    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    流(TCP)信道详解
    流信道有两个变体:SocketChannel和ServerSocketChannel。像其对应的Socket一样,SocketChannel是相互连接的终端进行通信的信道。
    SocketChannel:创建,连接和关闭
       static SocketChannel open(SocketAddressremote)
       static SocketChannel open()
       boolean connect(SocketAddress remote)
       boolean isConnected()
       void close()
       boolean isOpen()
       Socket socket()
       调用SocketChannel的静态工厂方法open()可以创建一个实例。open()方法的第一种形式以SocketAddress为参数,返回一个连接到指定服务器的SocketChannel实例。注意,该方法可能会无限期地阻塞下去。open()的无参数形式用于创建一个没有连接的 SocketChannel实例,该实例可以通过调用connect()方法连接到指定终端。当使用完SocketChannel后,需要调用 close()方法将其关闭。有一点很重要,即每个SocketChannel实例都包裹了一个基本的Java Socket,并可以通过socket()方法对该Socket进行访问。这就可以通过基本的Socket方法进行绑定、设置套接字选项等操作。
     
    在创建并连接SocketChannel后,就可以调用该信道的读写方法进行I/O操作。
    SocketChannel读和写
       int read(ByteBuffer dst)
       long read(ByteBuffer[] dsts)
       long read(ByteBuffer[] dsts, int offset,int length)
       int write(ByteBuffer src)
       int write(ByteBuffer[] srcs)
       int write(ByteBuffer[] srcs, intoffset, int length)
            读操作的最基本形式以一个ByteBuffer为参数,并将读取的数据填入该缓冲区所有剩余字节空间中。另一种形式以多个ByteBuffer为参数(ByteBuffer数组),并根据其在数组中的顺序,将读取的数据依次填入每个缓冲区的剩余字节空间中。这种方法称为散射式读,因为它将读入的直接分散到了多个缓冲区中。需要注意重要的一点,散射式读不一定会将所有缓冲区填满,这些缓冲区的总空间大小只是一个上限。
          写操作的最基本形式是以一个ByteBuffer为参数,并试图将该缓冲区中剩余的字节写入信道。另一种形式以一个ByteBuffer数组作为参数,并试图将所有缓冲区中的剩余字节都写入信道。这种方法称为聚集式写,因为它把多个缓冲区中的字节聚集起来,一起发送出去。
            与其对应的ServerSocket一样,ServerSocketChannel是用来侦听客户端连接的信道。
    ServerSocketChannel创建,接受和关闭
       static ServerSocketChannel open()
       ServerSocket socket()
       SocketChannel accept()
       void close()
       boolean isOpen()
       调用静态工厂方法open()可以创建一个ServerSocketChannel实例。每个实例都包裹了一个ServerSocket实例,并可以通过 socket()方法对其访问。正如前面的例子所表明的,必须通过底层的ServerSocket实例来实现绑定制定端口,设置套接字选项等操作。在创建了信道实例并绑定端口后,就可以调用accept()方法来准备接收客户端的连接请求。连接成功则返回一个新的已连接的SocketChannel。在用完ServerSocketChannel后,需要调用close()方法将其关闭。
       如前文提到的那样,阻塞式信道除了能够(必须)与Buffer一起使用外,对于普通套接字来说几乎没有优点。因此,可能总是需要将其设置成非阻塞式的。
    SocketChannel, ServerSocketChannel设置阻塞行为
       SelectableChannelconfigureBlocking(boolean block)
       boolean isBlocking()
       通过调用configureBlocking(false)可以将SocketChannel或ServerSocketChannel设置为非阻塞模式。configureBlocking()方法将返回一个SelectableChannel,它是SocketChannel和 ServerSocketChannel父类。
       考虑为SocketChannel设置连接的情况。如果传给SocketChannel的工厂方法open()一个远程地址,对该方法的调用则将阻塞等待,直到成功建立了连接。要避免这种情况,可以使用open()方法的无参数形式,配置信道为非阻塞模式,再调用connect()方法,制定远程终端地址。如果在没有阻塞的情况下连接已经建立,connect()方法返回true;否则需要有检查套接字是否连接成功的方法。
    SocketChannel测试连接性
       boolean finishConnect()
       boolean isConnected()
       boolean isConnectionPending()
       对于非阻塞SocketChannel来说,一旦已经发起连接,底层套接字可能既不是已经连接,又不是没有连接,而是连接“正在进行”。由于底层协议的工作机制,套接字可能会在这个状态一直保持下去。finishConnect()方法可以用来检查在非阻塞套接字上试图进行的连接状态,还可以在阻塞套接字建立连接的过程中阻塞等待,直到连接成功建立。例如,你可能需要将信道配置成非阻塞模式,通过connect()方法发起连接,做完一些其他工作后,又将信道配置成阻塞模式,然后调用finishConnect()方法等待连接建立完成。或者可以让信道保持在非阻塞模式,并反复调用 finishConnect()方法。如TCPEchoClientNoblocking类中所示。
          isConnected()用于检查套接字是否已经建立了连接,从而避免在进行其他操作时抛出NotYetConnectedException异常(如在调用read()或write()时)。还可以使用isConnectedPending()方法来检查是否有连接在该信道上发起。知道是否有连接发起是有必要的,因为如果没有的话,finishConnect()方法将抛出NoConnectionPendingException异常。
    5、Selector详解
    EchoSelectorServer示例中展示了Selector的基本用法。在此,我们将对其进行更加详细的介绍。
    Selector创建和关闭
       static Selector open()
       boolean isOpen()
       void close()
       调用Selector的open()工厂方法可以创建一个选择器实例。选择器的状态是“打开”或是“关闭”的。创建时选择器的状态时打开的,并保持该状态,直到调用close()方法通知系统其任务已经完成。可以调用isOpen()方法来检查选择器是否已经关闭。
    1)在信道中注册
       我们已经知道,每个选择器都有一组与之关联的信道,选择器对这些信道上“感兴趣的”I/O操作进行监听。Selector与Channel之间的关联由一个SelectionKey实例表示。(注意:一个信道可以注册多个Selector实例,因此可以有多个关联的SelectionKey实例)。 SelectionKey维护了一个信道上感兴趣的操作类型信息,并将这些信息存放在一个int型的位图中,该int型数据的每一位都有相应的含义。
       SelectionKey类中的常量定义了信道上可能感兴趣的操作类型,每个这种常量都是只有一位设置为1的位掩码。
    SelectionKey兴趣操作集
       static int OP_ACCEPT
       static int OP_CONNECT
       static int OP_READ
       static int OP_WRITE
       int interestOps()
       SelectionKey interestOps(int ops)
       通过对OP_ACCEPT,OP_CONNECT,OP_READ以及OP_WRITE中适当的常量进行按位OR,我们可以构造一个位向量来制定一组操作。例如,一个包含读和写的操作集可由表达式(OP_READ| OP_WRITE)来指定。不带参数的interestOps()方法将返回一个int型位图,该位图中设置为1的每一位都指示了信道上需要监听的一种操作。另一种方法以一个位图为参数,指示了应该监听信道上的哪些操作。重点提示:任何对key(信道)所关联的兴趣操作集的改变,都只在下次调用了 select()方法后才会生效。
     
    SocketChannel, ServerSocketChannel注册Selector
       SelectionKey register(Selectorsel, intops)
       SelectionKey register(Selectorsel, int ops, Object attachment)
       int validOps()
       boolean isRegistered()
       SelectionKey keyFor(Selector sel)
       调用信道的register()方法可以将一个选择器注册到该信道。在注册过程中,通过存储在int型数据中的位图来指定该信道上的初始兴趣操作集。 register()方法将返回一个代表了信道和给定选择器之间的关联的SelectionKey实例。validOps()方法用于返回一个指示了该信道上的有效I/O操作集的位图。对于SocketChannel来说,有效操作包括读、写和连接。一个信道可能只与一个选择器注册一次,因此后续对 register()方法的调用只是简单地更新该key所关联的兴趣操作集。使用isRegistered()方法可以检查信道是否已经注册了选择器。 keyFor()方法与第一次调用register()方法返回的是同一个SelectionKey实例,除非该信道没有注册给定的选择器。
    以下代码注册了一个信道,支持读写操作:
    SelectionKey key = clientChannel.register(selector,
                            SelectionKey.OP_READ| SelectionKey.OP_WRITE)
    下图展示了一个选择器,其键集中包含了7个代表注册信道的键:两个在端口8888和8889上的服务器信道,以及从服务器信道创建的5个客户端信道:
    SelectionKey获取和取消
       Selector selector()
       SelectableChannel channel()
       void cancel()
       键关联的Selector实例和Channel实例可以分别使用该键的selector()和channel()方法获得。cancel()方法用于(永久性地)注销该键,并将其放入选择器的注销集中。在下一次调用select()方法时,这些键将从该选择器的所有集中移除,其关联的信道也将不再被监听(除非它又重新注册)。
     
    2) 选取和识别准备就绪的信道
       在信道上注册了选择器,并由关联的键指定了感兴趣的I/O操作集后,我们就只需要坐下来等待I/O了。这要使用选择器来完成。
    Selector等待信道准备就绪
       int select()
       int select(long timeout)
       int selectNow()
       Selector wakeup()
       select()方法用于从已经注册的信道中返回在感兴趣的I/O操作集上准备就绪的信道总数。(例如,兴趣操作集中包含OP_READ的信道有数据可读,或包含OP_ACCEPT的信道有连接请求待接受。)以上三个select()方法的唯一区别在于它们的阻塞行为。无参数的select()方法会阻塞等待,直到至少有一个注册信道中有感兴趣的操作准备就绪,或有别的线程调用了该选择器wakeup()方法(这种情况下select()方法将返回 0)。以超时时长作为参数的select方法也会阻塞等待,直到至少有一个信道准备就绪,或等待时间超过了指定的毫秒数(正数),或者有另一个线程调用其 wakeup()方法。selectNow()方法是一个非阻塞版本:它总数立即返回,如果没有信道准备就绪,则返回0.wakeup()方法可以使用当前阻塞(也就是说在另一个线程中阻塞)的任何一种select()方法立即返回;如果当前没有select方法阻塞,下一次调用者三种方法的任何一个都将立即返回。
          选择之后,我们需要知道哪些信道准备好了特定的I/O操作。每个选择器都维护了一个已选键集,与这些键关联的信道都有即将发生的特定I/O操作。通过调用 selectedKey()方法可以访问已选键集,该方法返回一组selectionKey。我们可以在这组键上进行迭代,分别处理等待在每个键关联的信道上的I/O操作。
       Iterator<SelectionKey>keyIter =
    selector.selectedKeys().iterator();
    while(keyIter.hasNext()){
           SelectionKey key = keyIter.next();
           //...在这里处理该key所关联的信道channel
           keyIter.remove();
          }
       }
     
    Selector获取键集
       Set<SelectionKey> keys()
       Set<SelectionKey>selectedKeys()
          以上方法返回选择器的不同键集。keys()方法返回当前已注册的所有键。返回的键集是不可修改的;任何对其进行修改的尝试(如,调用其remove() 方法)都将抛出UnsupportedOperationException异常。selectedKeys()方法用于返回上次调用select()方法时,被“选中”的已准备好进行I/O操作的键。重要提示:selectedKeys()方法返回的键是可修改的,在实际上在两次调用select()方法之间,都必须“手工”将清空。换句话说,select方法只会在已有的所选键集上添加键,它们不会创建新的键集。
       所有键集指示了哪些信道当前可以进行I/O操作。对于选中的每个信道,我们需要知道它们各自准备好的特定I/O操作。除了兴趣操作集外,每个键还维护了一个即将进行的I/O操作集,称为就绪操作集。
    SelectionKey查找就绪的I/O操作
       int readyOps()
       boolean isAcceptable()
       boolean isConnectable()
       boolean isReadable()
       boolean isValid()
       boolean isWritable()
       对于给定的键,可以使用readyOps()方法或其他指示方法来确定兴趣集中的哪些I/O操作可以执行。readyOps()方法以位图的形式返回所有准备就绪的操作集。其他方法用于分别检查各种操作是否可用。
    例如,查看键关联的信道上是否有正在等待的读操作,可以使用以下代码:
    (key.readOps() &SelectionKey.OP_READ) != 0
    key.isReadable()
       选择器的已选键集中的键,以及每个键中准备就绪的操作,都是由select()方法来确定的。随着时间的推进,这些信息可能会过时。其他线程可能会处理准备就绪的I/O操作。同时,键也不是永远存在的。但其关联的信道或选择器关闭时,键也将失效。通过调用其cancel()方法可以显示地将键设置为无效。调用其isValid()方法可以检测一个键的有效性。无效的键将添加到选择器的注销集中,并在下次调用任意一种形式的select()方法和或者 close()方法时从键集中移除。(当然,从键集中移除键意味着与它关联的信道也不再受监听。)
     
    3) 信道附件
       当一个信道准备好进行I/O操作时,通常还需要额外的信息来处理请求。例如,在前面的回显协议中,但客户端信道准备好写操作时,就需要有数据可写。当然,我们所需要的可写数据是由之前同一信道上的读操作收集的,但是在其可写之前,这些数据存放在什么地方呢?另一个例子,如果一个消息一次传来了多个字节,我们需要保存已接收的部分消息,直到整个消息接收完成。这两种情况都需要维护每个信道的状态信息。然而,我们非常幸运!SelectionKey通过使用附件使保存每个信道的状态变得容易。
    SelectionKey查找就绪的I/O操作
    Object attach(Object ob)
    Object attachment()
            每个键可以有一个附件,数据类型只能是Object类。附件可以在信道第一次调用register()方法时与之关联,或者后来再使用attach()方法直接添加到键上。通过SelectionKey的attachment()方法可以访问键的附件。
     
    4) Selector小结
       总的来说,使用Selector的步骤如下:
    1、 创建一个Selector实例。
    2、将其注册到各种信道,指定每个信道上感兴趣的I/O操作。
    3、 重复执行:
    1) 调用一种select方法
    2) 获取选取的键列表
    3) 对于已选键集中的每个键。
    a. 获取信道,并从键中获取附件(如果合适的话)
    b. 确定准备就绪的操作并执行。如果是accept操作,将接受的信道设置为非阻塞模式,并将其与选择器注册。
    c. 如果需要,修改键的兴趣操作集
    d. 从已选键中移除键
    如果选择器告诉了你什么时候I/O操作准备就绪,你还需要非阻塞I/O吗?答案是肯定的。信道在已选键集中的键并不能确保非阻塞I/O,因为调用了 select()方法后,键集信息可能会过时。另外,阻塞式写操作会阻塞等待直到写完所有字节,而就绪集中的OP_WRITE仅表示至少有一个字节可写。实际上,只是非阻塞模式的信道才能与选择器进行注册:如果信道在阻塞模式,SelectableChannel类的register()方法将抛出 IllegalBlockingModeException异常。
     
  • 相关阅读:
    登录及注册页面
    多方式登录
    git笔记
    后台主页模块设计
    auth模块迁移后需新增字段
    使用idea构建SpringBoot源码
    Springboot相关面试问题
    Springboot自动加载工具-devtools的理解与使用
    SpringBoot项目的一些简单常用配置
    Java线程池及Executor框架的理解
  • 原文地址:https://www.cnblogs.com/leehongee/p/3323840.html
Copyright © 2011-2022 走看看