zoukankan      html  css  js  c++  java
  • JDK NIO编程


    我们首先需要澄清一个概念:NIO到底是什么的简称?有人称之为New I/O,因为它相对于之前的I/O类库是新增的,所以被称为New I/O,这是它的官方叫法。但是,由于之前老的I/O类库是阻塞I/O,New I/O类库的目标就是要让Java支持非阻塞I/O,所以,更多的人喜欢称之为非阻塞I/O(Non-block I/O),由于非阻塞I/O更能够体现NIO的特点。

    与Socket类和ServerSocket类相对应,NIO也提供了SocketChannelServerSocketChannel两种不同的套接字通道实现。这两种新增的通道支持阻塞和非阻塞两种模式。阻塞模式使用非常简单,但是性能和可靠性都不好,非阻塞模式则正好相反。开发人员一般可以根据自己的需要来选择合适的模式,一般来说,低负载、低并发的应用程序可以选择同步阻塞I/O以降低编程复杂度,但是对于高负载、高并发的网络应用,需要使用NIO的非阻塞模式进行开发。

    NIO类库简介

    新的输入/输出(NIO)库是在JDK 1.4中引入的。NIO弥补了原来同步阻塞I/O的不足,它在标准Java代码中提供了高速的、面向块的I/O。通过定义包含数据的类,以及通过以块的形式处理这些数据,NIO不用使用本机代码就可以利用低级优化,这是原来的I/O包所无法做到的。

    1.缓冲区Buffer

    我们首先介绍缓冲区(Buffer)的概念,Buffer是一个对象,它包含一些要写入或者要读出的数据。在NIO类库中加入Buffer对象,体现了新库与原I/O的一个重要区别。在面向流的I/O中,可以将数据直接写入或者将数据直接读到Stream对象中。

    在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。

    缓冲区实质上是一个数组。通常它是一个字节数组(ByteBuffer),也可以使用其他种类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问以及维护读写位置(limit)等信息。

    最常用的缓冲区是ByteBuffer,一个ByteBuffer提供了一组功能用于操作byte数组。除了ByteBuffer,还有其他的一些缓冲区,事实上,每一种Java基本类型(除了Boolean类型)都对应有一种缓冲区,具体如下:

    ByteBuffer:字节缓冲区

    CharBuffer:字符缓冲区

    ShortBuffer:短整型缓冲区

    IntBuffer:整形缓冲区

    LongBuffer:长整形缓冲区

    FloatBuffer:浮点型缓冲区

    DoubleBuffer:双精度浮点型缓冲区

    每一个Buffer类都是Buffer接口的一个子实例。除了ByteBuffer,每一个 Buffer类都有完全一样的操作,只是它们所处理的数据类型不一样。因为大多数标准I/O操作都使用ByteBuffer,所以它除了具有一般缓冲区的操作之外还提供一些特有的操作,方便网络读写。

    2.通道Channel

    Channel是一个通道,可以通过它读取和写入数据,它就像自来水管一样,网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutputStream的子类),而且通道可以用于读、写或者同时用于读写。

    因为Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。特别是在UNIX网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。

     

    自顶向下看,前三层主要是Channel接口,用于定义它的功能,后面是一些具体的功能类(抽象类),从类图可以看出,实际上Channel可以分为两大类:分别是用于网络读写的SelectableChannel和用于文件操作的FileChannel。

    3.多路复用器Selector

    多路复用器Selector,它是Java NIO编程的基础,熟练地掌握Selector对于掌握NIO编程至关重要。多路复用器提供选择已经就绪的任务的能力。简单来讲,Selector会不断地轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

    一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端,这确实是个非常巨大的进步。

    NIO服务端序列图 

    下面,我们对NIO服务端的主要创建过程进行讲解和说明,作为NIO的基础入门,我们将忽略掉一些在生产环境中部署所需要的一些特性和功能。

    步骤一:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道,代码示例如下。

    ServerSocketChannel acceptorSvr = ServerSocketChannel.open();

    步骤二:绑定监听端口,设置连接为非阻塞模式,示例代码如下。

    acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName(“IP”), port));

    acceptorSvr.configureBlocking(false);

    步骤三:创建Reactor线程,创建多路复用器并启动线程,代码如下。

    Selector selector = Selector.open();

    new Thread(new ReactorTask()).start();

    步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件,代码如下。

    SelectionKey key = acceptorSvr.register( selector, SelectionKey.OP_ACCEPT, ioHandler);

    步骤五:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下。

    int num = selector.select();

    Set selectedKeys = selector.selectedKeys();

    Iterator it = selectedKeys.iterator();

    while (it.hasNext()) {

    SelectionKey key = (SelectionKey)it.next();

    // ... deal with I/O event ...

    }

    步骤六:多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路,代码示例如下。

    SocketChannel channel = svrChannel.accept();

    步骤七:设置客户端链路为非阻塞模式,示例代码如下。

    channel.configureBlocking(false);

    channel.socket().setReuseAddress(true);

    ......

    步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取客户端发送的网络消息,代码如下。

    SelectionKey key = socketChannel.register( selector, SelectionKey.OP_READ, ioHandler);

    步骤九:异步读取客户端请求消息到缓冲区,示例代码如下。

    int readNumber = channel.read(receivedBuffer);

    步骤十:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排。

    Object message = null;

    while(buffer.hasRemain())

    {

      byteBuffer.mark();

      Object message = decode(byteBuffer);

      if (message == null)

      {

        byteBuffer.reset();

        break;

      }

      messageList.add(message );

    }

    if (!byteBuffer.hasRemain())

      byteBuffer.clear();

    else

      byteBuffer.compact();

    if (messageList != null & !messageList.isEmpty())

    {

      for(Object messageE : messageList)

        handlerTask(messageE);

    }

    步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端,示例代码如下。

    socketChannel.write(buffer);

    注意:如果发送区TCP缓冲区满,会导致写半包,此时,需要注册监听写操作位,循环写,直到整包消息写入TCP缓冲区。

     

    服务端代码示例:

    复制代码
    import java.io.IOException;
    

    public class TimeServer {

    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span> main(String[] args) <span style="color: #0000ff;">throws</span><span style="color: #000000;"> IOException {
        </span><span style="color: #0000ff;">int</span> port = 8080<span style="color: #000000;">;
        </span><span style="color: #0000ff;">if</span> (args != <span style="color: #0000ff;">null</span> &amp;&amp; args.length &gt; 0<span style="color: #000000;">) {
            </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
                port </span>= Integer.valueOf(args[0<span style="color: #000000;">]);
            } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (NumberFormatException e) {
                </span><span style="color: #008000;">//</span><span style="color: #008000;"> 采用默认值</span>
    

    }
    }
    //MultiplexerTimeServer的多路复用类,它是个一个独立的线程,
    //负责轮询多路复用器Selector,可以处理多个客户端的并发接入。
    MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
    new Thread (timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
    }

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    public class MultiplexerTimeServer implements Runnable {

    </span><span style="color: #0000ff;">private</span><span style="color: #000000;"> Selector selector;
    
    </span><span style="color: #0000ff;">private</span><span style="color: #000000;"> ServerSocketChannel servChannel;
    
    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">volatile</span> <span style="color: #0000ff;">boolean</span><span style="color: #000000;"> stop;
    
    </span><span style="color: #008000;">//</span><span style="color: #008000;">在构造方法中进行资源初始化,创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置。
    </span><span style="color: #008000;">//</span><span style="color: #008000;">例如,将ServerSocketChannel设置为异步非阻塞模式,它的backlog设置为1024。
    </span><span style="color: #008000;">//</span><span style="color: #008000;">系统资源初始化成功后,将ServerSocket Channel注册到Selector,监听SelectionKey.OP_ACCEPT操作位;如果资源初始化失败(例如端口被占用),则退出。</span>
    <span style="color: #0000ff;">public</span> MultiplexerTimeServer(<span style="color: #0000ff;">int</span><span style="color: #000000;"> port) {
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            selector </span>=<span style="color: #000000;"> Selector.open();
            servChannel </span>=<span style="color: #000000;"> ServerSocketChannel.open();
            servChannel.configureBlocking(</span><span style="color: #0000ff;">false</span><span style="color: #000000;">);
            servChannel.socket().bind(</span><span style="color: #0000ff;">new</span> InetSocketAddress(port), 1024<span style="color: #000000;">);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println(</span>"The time server is start in port : " +<span style="color: #000000;"> port);
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (IOException e) {
            e.printStackTrace();
            System.exit(</span>1<span style="color: #000000;">);
        }
    }
    
    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> stop() {
        </span><span style="color: #0000ff;">this</span>.stop = <span style="color: #0000ff;">true</span><span style="color: #000000;">;
    }
    
    @Override
    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> run() {
        </span><span style="color: #0000ff;">while</span> (!<span style="color: #000000;">stop) {
            </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
                </span><span style="color: #008000;">//</span><span style="color: #008000;">在线程的run方法的while循环体中循环遍历selector,它的休眠时间为1s,
                </span><span style="color: #008000;">//</span><span style="color: #008000;">无论是否有读写等事件发生,selector每隔1s都被唤醒一次,selector也提供了一个无参的select方法。
                </span><span style="color: #008000;">//</span><span style="color: #008000;">当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合,
                </span><span style="color: #008000;">//</span><span style="color: #008000;">通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作。</span>
                selector.select(1000<span style="color: #000000;">);
                Set selectedKeys </span>=<span style="color: #000000;"> selector.selectedKeys();
                Iterator it </span>=<span style="color: #000000;"> selectedKeys.iterator();
                SelectionKey key </span>= <span style="color: #0000ff;">null</span><span style="color: #000000;">;
                </span><span style="color: #0000ff;">while</span><span style="color: #000000;"> (it.hasNext()) {
                    key </span>=<span style="color: #000000;"> (SelectionKey) it.next();
                    it.remove();
                    </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
                        handleInput(key);<span style="color: #ff0000; background-color: #ffff00;">//这里可以用线程池启线程去单独处理客户端的请求业务</span>
                    } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (Exception e) {
                        </span><span style="color: #0000ff;">if</span> (key != <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
                            key.cancel();
                            </span><span style="color: #0000ff;">if</span> (key.channel() != <span style="color: #0000ff;">null</span><span style="color: #000000;">)
                                key.channel().close();
                        }
                    }
                }
            } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (Throwable t) {
                t.printStackTrace();
            }
        }
    
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源</span>
        <span style="color: #0000ff;">if</span> (selector != <span style="color: #0000ff;">null</span><span style="color: #000000;">)
            </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
                selector.close();
            } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (IOException e) {
                e.printStackTrace();
            }
    }
    
    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span> handleInput(SelectionKey key) <span style="color: #0000ff;">throws</span><span style="color: #000000;"> IOException {
    
        </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (key.isValid()) {
            </span><span style="color: #008000;">//</span><span style="color: #008000;">根据SelectionKey的操作位进行判断即可获知网络事件的类型,</span>
            <span style="color: #0000ff;">if</span><span style="color: #000000;"> (key.isAcceptable()) {
                </span><span style="color: #008000;">//</span><span style="color: #008000;">通过ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例,
                </span><span style="color: #008000;">//</span><span style="color: #008000;">完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。
                </span><span style="color: #008000;">//</span><span style="color: #008000;">注意,我们需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,
                </span><span style="color: #008000;">//</span><span style="color: #008000;">例如TCP接收和发送缓冲区的大小等,作为入门的例子,没有进行额外的参数设置。</span>
                ServerSocketChannel ssc =<span style="color: #000000;"> (ServerSocketChannel) key.channel();
                SocketChannel sc </span>=<span style="color: #000000;"> ssc.accept();
                sc.configureBlocking(</span><span style="color: #0000ff;">false</span><span style="color: #000000;">);
                </span><span style="color: #008000;">//</span><span style="color: #008000;"> Add the new connection to the selector</span>
    

    sc.register(selector, SelectionKey.OP_READ);
    }
    if (key.isReadable()) {
    //首先创建一个ByteBuffer,由于我们事先无法得知客户端发送的码流大小,
    //作为例程,我们开辟一个1M的缓冲区。然后调用SocketChannel的read方法读取请求码流。
    //注意,由于我们已经将SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的。
    //使用返回值进行判断,看读取到的字节数
    SocketChannel sc = (SocketChannel) key.channel();
    ByteBuffer readBuffer
    = ByteBuffer.allocate(1024);
    int readBytes = sc.read(readBuffer);
    //返回值有以下三种可能的结果
    //返回值大于0:读到了字节,对字节进行编解码;
    //返回值等于0:没有读取到字节,属于正常场景,忽略;
    //返回值为-1:链路已经关闭,需要关闭SocketChannel,释放资源。
    if (readBytes > 0) {
    //当读取到码流以后,我们进行解码,首先对readBuffer进行flip操作,
    //它的作用是将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。
    //然后根据缓冲区可读的字节个数创建字节数组,
    //调用ByteBuffer的get操作将缓冲区可读的字节数组复制到新创建的字节数组中,
    //最后调用字符串的构造函数创建请求消息体并打印。
    //如果请求指令是"QUERY TIME ORDER"则把服务器的当前时间编码后返回给客户端
    readBuffer.flip();
    byte[] bytes = new byte[readBuffer.remaining()];
    readBuffer.get(bytes);
    String body
    = new String(bytes, "UTF-8");
    System.out.println(
    "The time server receive order : "
    + body);
    String currentTime
    = "QUERY TIME ORDER"
    .equalsIgnoreCase(body)
    ? new java.util.Date(
    System.currentTimeMillis()).toString()
    :
    "BAD ORDER";
    //异步发送应答消息给客户端
    doWrite(sc, currentTime);
    }
    else if (readBytes < 0) {
    // 对端链路关闭
    key.cancel();
    sc.close();
    }
    else
    ;
    // 读到0字节,忽略
    }
    }
    }

    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> doWrite(SocketChannel channel, String response)
            </span><span style="color: #0000ff;">throws</span><span style="color: #000000;"> IOException {
        </span><span style="color: #008000;">//</span><span style="color: #008000;">首先将字符串编码成字节数组,根据字节数组的容量创建ByteBuffer,
        </span><span style="color: #008000;">//</span><span style="color: #008000;">调用ByteBuffer的put操作将字节数组复制到缓冲区中,然后对缓冲区进行flip操作,
        </span><span style="color: #008000;">//</span><span style="color: #008000;">最后调用SocketChannel的write方法将缓冲区中的字节数组发送出去。
        </span><span style="color: #008000;">//</span><span style="color: #008000;">需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,
        </span><span style="color: #008000;">//</span><span style="color: #008000;">此时会出现“写半包”问题,我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,
        </span><span style="color: #008000;">//</span><span style="color: #008000;">可以通过ByteBuffer的hasRemain()方法判断消息是否发送完成。
        </span><span style="color: #008000;">//</span><span style="color: #008000;">此处仅仅是个简单的入门级例程,没有演示如何处理“写半包”场景。</span>
        <span style="color: #0000ff;">if</span> (response != <span style="color: #0000ff;">null</span> &amp;&amp; response.trim().length() &gt; 0<span style="color: #000000;">) {
            </span><span style="color: #0000ff;">byte</span>[] bytes =<span style="color: #000000;"> response.getBytes();
            ByteBuffer writeBuffer </span>=<span style="color: #000000;"> ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
    

    }

    复制代码

    NIO客户端序列图

    步骤一:打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址),示例代码如下。

    SocketChannel clientChannel = SocketChannel.open();

    步骤二:设置SocketChannel为非阻塞模式,同时设置客户端连接的TCP参数,示例代码如下。

    clientChannel.configureBlocking(false);

    socket.setReuseAddress(true);

    socket.setReceiveBufferSize(BUFFER_SIZE);

    socket.setSendBufferSize(BUFFER_SIZE);

    步骤三:异步连接服务端,示例代码如下。

    boolean connected=clientChannel.connect(new InetSocketAddress(“ip”,port));

    步骤四:判断是否连接成功,如果连接成功,则直接注册读状态位到多路复用器中,如果当前没有连接成功(异步连接,返回false,说明客户端已经发送sync包,服务端没有返回ack包,物理链路还没有建立),示例代码如下。

    if (connected)

    {

      clientChannel.register( selector, SelectionKey.OP_READ, ioHandler);

    }

    else

    {

      clientChannel.register( selector, SelectionKey.OP_CONNECT, ioHandler);

    }

    步骤五:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCP ACK应答,示例代码如下。

      clientChannel.register( selector, SelectionKey.OP_CONNECT, ioHandler);

    步骤六:创建Reactor线程,创建多路复用器并启动线程,代码如下。

      Selector selector = Selector.open();

      new Thread(new ReactorTask()).start();

    步骤七:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下。

    int num = selector.select();

    Set selectedKeys = selector.selectedKeys();

    Iterator it = selectedKeys.iterator();

    while (it.hasNext()) {

      SelectionKey key = (SelectionKey)it.next();

      // ... deal with I/O event ...

    }

    步骤八:接收connect事件进行处理,示例代码如下。

    if (key.isConnectable())

      handlerConnect();

    步骤九:判断连接结果,如果连接成功,注册读事件到多路复用器,示例代码如下。

    if (channel.finishConnect())

      registerRead();

    步骤十:注册读事件到多路复用器,示例代码如下。

    clientChannel.register( selector, SelectionKey.OP_READ, ioHandler);

    步骤十一:异步读客户端请求消息到缓冲区,示例代码如下。

    int readNumber = channel.read(receivedBuffer);

    步骤十二:对ByteBuffer进行编解码,如果有半包消息接收缓冲区Reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下。

    Object message = null;

    while(buffer.hasRemain())

    {

      byteBuffer.mark();

      Object message = decode(byteBuffer);

      if (message == null)

      {

        byteBuffer.reset();

        break;

      }

      messageList.add(message );

    }

    if (!byteBuffer.hasRemain())

      byteBuffer.clear();

    else

      byteBuffer.compact();

    if (messageList != null & !messageList.isEmpty())

    {

      for(Object messageE : messageList)

        handlerTask(messageE);

    }

    步骤十三:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端,示例代码如下。

    socketChannel.write(buffer);

    客户端代码示例: 

    复制代码
    public class TimeClient {
    
    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> main(String[] args) {
        </span><span style="color: #0000ff;">int</span> port = 8080<span style="color: #000000;">;
        </span><span style="color: #0000ff;">new</span> Thread(<span style="color: #0000ff;">new</span> TimeClientHandle("127.0.0.1", port), "TimeClient- 001"<span style="color: #000000;">).start();
    }
    

    }

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    public class TimeClientHandle implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    </span><span style="color: #0000ff;">public</span> TimeClientHandle(String host, <span style="color: #0000ff;">int</span><span style="color: #000000;"> port) {
        </span><span style="color: #008000;">//</span><span style="color: #008000;">构造函数用于初始化NIO的多路复用器和SocketChannel对象。
        </span><span style="color: #008000;">//</span><span style="color: #008000;">需要注意的是,创建SocketChannel之后,需要将其设置为异步非阻塞模式。
        </span><span style="color: #008000;">//</span><span style="color: #008000;">我们可以设置SocketChannel的TCP参数,例如接收和发送的TCP缓冲区大小。</span>
        <span style="color: #0000ff;">this</span>.host = host == <span style="color: #0000ff;">null</span> ? "127.0.0.1"<span style="color: #000000;"> : host;
        </span><span style="color: #0000ff;">this</span>.port =<span style="color: #000000;"> port;
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            selector </span>=<span style="color: #000000;"> Selector.open();
            socketChannel </span>=<span style="color: #000000;"> SocketChannel.open();
            socketChannel.configureBlocking(</span><span style="color: #0000ff;">false</span><span style="color: #000000;">);
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (IOException e) {
            e.printStackTrace();
            System.exit(</span>1<span style="color: #000000;">);
        }
    }
    
    @Override
    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> run() {
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            </span><span style="color: #008000;">//</span><span style="color: #008000;">作为示例,连接是成功的,所以不需要做重连操作,因此将其放到循环之前。</span>
    

    doConnect();
    }
    catch (IOException e) {
    e.printStackTrace();
    System.exit(
    1);
    }
    while (!stop) {
    try {
    //在循环体中轮询多路复用器Selector,当有就绪的Channel时,执行handleInput(key)方法
    selector.select(1000);
    Set selectedKeys
    = selector.selectedKeys();
    Iterator it
    = selectedKeys.iterator();
    SelectionKey key
    = null;
    while (it.hasNext()) {
    key
    = (SelectionKey) it.next();
    it.remove();
    try {
    handleInput(key);
    }
    catch (Exception e) {
    if (key != null) {
    key.cancel();
    if (key.channel() != null)
    key.channel().close();
    }
    }
    }
    }
    catch (Exception e) {
    e.printStackTrace();
    System.exit(
    1);
    }
    }

        </span><span style="color: #008000;">//</span><span style="color: #008000;">线程退出循环后,我们需要对连接资源进行释放,以实现“优雅退出”.
        </span><span style="color: #008000;">//</span><span style="color: #008000;">由于多路复用器上可能注册成千上万的Channel或者pipe,如果一一对这些资源进行释放显然不合适。
        </span><span style="color: #008000;">//</span><span style="color: #008000;">因此,JDK底层会自动释放所有跟此多路复用器关联的资源。
        </span><span style="color: #008000;">//</span><span style="color: #008000;">多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源</span>
        <span style="color: #0000ff;">if</span> (selector != <span style="color: #0000ff;">null</span><span style="color: #000000;">)
            </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
                selector.close();
            } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (IOException e) {
                e.printStackTrace();
            }
    }
    
    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span> handleInput(SelectionKey key) <span style="color: #0000ff;">throws</span><span style="color: #000000;"> IOException {
        </span><span style="color: #008000;">//</span><span style="color: #008000;">我们首先对SelectionKey进行判断,看它处于什么状态。</span>
        <span style="color: #0000ff;">if</span><span style="color: #000000;"> (key.isValid()) {
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> 判断是否连接成功</span>
            SocketChannel sc =<span style="color: #000000;"> (SocketChannel) key.channel();
            </span><span style="color: #008000;">//</span><span style="color: #008000;">如果是处于连接状态,说明服务端已经返回ACK应答消息。
            </span><span style="color: #008000;">//</span><span style="color: #008000;">这时我们需要对连接结果进行判断,调用SocketChannel的finishConnect()方法,
            </span><span style="color: #008000;">//</span><span style="color: #008000;">如果返回值为true,说明客户端连接成功;如果返回值为false或者直接抛出IOException,说明连接失败。
            </span><span style="color: #008000;">//</span><span style="color: #008000;">在本例程中,返回值为true,说明连接成功。</span>
            <span style="color: #0000ff;">if</span><span style="color: #000000;"> (key.isConnectable()) {
                </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (sc.finishConnect()) {
                    </span><span style="color: #008000;">//</span><span style="color: #008000;">将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操作位,
                    </span><span style="color: #008000;">//</span><span style="color: #008000;">监听网络读操作,然后发送请求消息给服务端。</span>
    

    sc.register(selector, SelectionKey.OP_READ);
    doWrite(sc);
    }
    else
    System.exit(
    1);// 连接失败,进程退出
    }
    //客户端是如何读取时间服务器应答消息的。
    if (key.isReadable()) {
    //如果客户端接收到了服务端的应答消息,则SocketChannel是可读的,
    //由于无法事先判断应答码流的大小,我们就预分配1M的接收缓冲区用于读取应答消息,
    //调用SocketChannel的read()方法进行异步读取操作。由于是异步操作,所以必须对读取的结果进行判断。
    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    int readBytes = sc.read(readBuffer);
    if (readBytes > 0) {
    //如果读取到了消息,则对消息进行解码,最后打印结果。执行完成后将stop置为true,线程退出循环。
    readBuffer.flip();
    byte[] bytes = new byte[readBuffer.remaining()];
    readBuffer.get(bytes);
    String body
    = new String(bytes, "UTF-8");
    System.out.println(
    "Now is : " + body);
    this.stop = true;
    }
    else if (readBytes < 0) {
    // 对端链路关闭
    key.cancel();
    sc.close();
    }
    else
    ;
    // 读到0字节,忽略
    }
    }

    }
    
    </span><span style="color: #008000;">//</span><span style="color: #008000;">首先对SocketChannel的connect()操作进行判断,如果连接成功,
    </span><span style="color: #008000;">//</span><span style="color: #008000;">则将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_READ,
    </span><span style="color: #008000;">//</span><span style="color: #008000;">如果没有直接连接成功,则说明服务端没有返回TCP握手应答消息,
    </span><span style="color: #008000;">//</span><span style="color: #008000;">但这并不代表连接失败,我们需要将SocketChannel注册到多路复用器Selector上,
    </span><span style="color: #008000;">//</span><span style="color: #008000;">注册SelectionKey.OP_CONNECT,当服务端返回TCP syn-ack消息后,
    </span><span style="color: #008000;">//</span><span style="color: #008000;">Selector就能够轮询到这个SocketChannel处于连接就绪状态。</span>
    <span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span> doConnect() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> IOException {
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答</span>
        <span style="color: #0000ff;">if</span> (socketChannel.connect(<span style="color: #0000ff;">new</span><span style="color: #000000;"> InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } </span><span style="color: #0000ff;">else</span><span style="color: #000000;"> {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }
    
    </span><span style="color: #008000;">//</span><span style="color: #008000;">构造请求消息体,然后对其编码,写入到发送缓冲区中,最后调用SocketChannel的write方法进行发送。
    </span><span style="color: #008000;">//</span><span style="color: #008000;">由于发送是异步的,所以会存在“半包写”问题。最后通过hasRemaining()方法对发送结果进行判断,
    </span><span style="color: #008000;">//</span><span style="color: #008000;">如果缓冲区中的消息全部发送完成,打印"Send order 2 server succeed."</span>
    <span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span> doWrite(SocketChannel sc) <span style="color: #0000ff;">throws</span><span style="color: #000000;"> IOException {
        </span><span style="color: #0000ff;">byte</span>[] req = "QUERY TIME ORDER"<span style="color: #000000;">.getBytes();
        ByteBuffer writeBuffer </span>=<span style="color: #000000;"> ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        </span><span style="color: #0000ff;">if</span> (!<span style="color: #000000;">writeBuffer.hasRemaining())
            System.out.println(</span>"Send order 2 server succeed."<span style="color: #000000;">);
    }
    

    }

    复制代码

    我们发现NIO编程难度确实比同步阻塞BIO大很多,我们的NIO例程并没有考虑“半包读”和“半包写”,如果加上这些,代码将会更加复杂。NIO代码既然这么复杂,为什么它的应用却越来越广泛呢,使用NIO编程的优点总结如下。

    (1)客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。

    (2)SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信线程就可以处理其他的链路,不需要同步等待这个链路可用。

    (3)线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,因此,它非常适合做高性能、高负载的网络服务器。

    JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,引人注目的是,Java正式提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO。

  • 相关阅读:
    并发编程-操作系统简史,多道技术
    python下的excel表格处理 内含面试题
    epoll模型的探索与实践
    nginx搭建静态网站
    面向对象基础
    python+Django 下JWT的使用
    linux的history命令
    携程apollo配置中心Quick Start
    redis哨兵
    redis的主从复制
  • 原文地址:https://www.cnblogs.com/jpfss/p/9876338.html
Copyright © 2011-2022 走看看