zoukankan      html  css  js  c++  java
  • Java NIO 入门

        最近接手了一个java项目,里面用到了nio。自己以前是做.net的,底子不足,所以花时间研究了一下。

      要点简述
      nio有三个最关键的概念,通道(Channel)、选择器(Selector)和缓冲器(ByteBuffer)。
        
        通道(Channel):
        这玩意说白点就是个管子,外面的数据传过来会到这个管子里,你向外发送数据的时候也得塞到这个管子里。与tcp编程相关的主要是两种通道:
         1. 服务器管道(ServerSocketChannel):提供一个服务器端的监听器。
         2. 普通管道(SocketChannel):提供一个一般通道,客户端和服务端互相通讯的时候会用到。

         一般的客户端和服务端程序结构如下:
         

        服务器先建立一个ServerSocketChannel监听,某个客户端创建一个SocketChannel(比如SocketChannel A)去连接服务器。服务器接收到客户端连接请求后创建一个新SocketChannel(比如SocketChannel 1),负责和客户端通讯。
        ServerSocketChannel的创建比较简单,主要工作是要绑定一个端口监听客户端请求,下面的代码演示了一个简单的创建过程:    

    1     //创建服务通道
    2     ServerSocketChannel serverChannel = ServerSocketChannel.open();
    3     serverChannel.configureBlocking(false);
    4     int port = 8888;
    5     InetSocketAddress address = new InetSocketAddress(port);
    6     serverChannel.socket().bind(address);

        第三行配置了阻塞模式,这里设置成非阻塞。使用非阻塞模式时,通道进行读写或accept操作时,将立即返回,不会阻塞等待,返回值确定了调用是否成功。比如调用serverChannel的accept方法时,如果无客户端连接,则立即返回null。读写操作如果失败直接返回0,这个在SocketChannel时会用到。
        SocketChannel有两种情况,服务器端是在调用serverChannel的accept方法动态创建的;客户端则要主动创建,然后调用connect方法连接到服务器。下面是个简单例子:    

    1     //create socket channel
    2     SocketChannel clientChannel = SocketChannel.open();
    3     clientChannel.configureBlocking(false);
    4     if(!clientChannel.connect(new InetSocketAddress("127.0.0.1", 8888)))  {
    5         while(!clientChannel.finishConnect()) {
    6             Thread.sleep(1000);
    7         }
    8     }

        第三行和ServerSocketChannel是一样的,不废话了,关键是第4-7行。因为客户的channel被设置成非阻塞,所以connect方法调用完会立即返回,但是这时候客户端不一定已经完成了连接的创建,所以我们需要用一个循环通过调用finnishConnect方法来检测连接是否完成。

        选择器(Selector): 
        这个可以说是nio里面最重要的一个概念,它是整个nio架子的基础。选择器帮助程序员用事件模型来处理channel间的通讯,我们可以通过轮询调用它的select方法实现一个类似windows消息循环的东西。这样既可以及时处理客户请求,又避免了在服务器端创建大量的线程。在nio之前,服务器客户端编程一般都是为每个客户端创建一个线程,这种方法会创建很多线程,在客户数比较大的时候,服务器压力会很大,线程的创建和上下文切换会吃掉很多服务器资源,服务器要么累死,要么奔溃。在nio中,如果我们用非阻塞轮询select的方法,则只要一个线程就可以同时响应多个客户端。当然如果也可以利用线程池来处理具体的数据计算,主线程只用来轮询派发事件,这样能充分利用服务器的多核或多cpu优势。由于选择器的主要作用是优化服务器性能和增加并发处理数,所以一般情况下客户端并不使用选择器。下面的代码演示了使用选择器的一般步骤:    

     1     //创建选择器
     2     Selector selector = Selector.open();
     3     
     4     //注册相关通道
     5     serverChannel.register(selector, SelectionKey.OP_ACCEPT);
     6     
     7     //轮询选择
     8     while(true) {
     9         int keyCnt = selector.select(SELECT_TIME_OUT);
    10         //nothing to handle, just continue polling.
    11         if(keyCnt == 0) continue;    
    12         Iterator<SelectionKey> itr = selector.selectedKeys().iterator();
    13         while(itr.hasNext()) {
    14             
    15             // TODO do some thing here
    16             
    17             //remove the key
    18             itr.remove();
    19         }
    20     }

        第5行注册了一个服务器通道,这里也可以注册多个通道。不过一般情况下都是服务器通道accept到客户端后将动态创建的SocketChannel注册到选择器中。  

        第9行调用select方法选择出需要处理的key,返回的是需要处理的key的个数。参数是一个超时时间,当没有key需要处理时会等待响应时间。下面的代码则是迭代需要处理的key,然后根据key的事件类型做响应处理。需要注意的是第18行,每个key处理完需要移除,否则下次select调用仍然会选择到。

        缓冲器(ByteBuffer):
       
    这个概念也很重要,其主要作用吧是在channel读写时提供缓冲。说白点就是channel读数据时将把数据塞到一个ByteBuffer里面,向channel里面写数据时也要先在一个ByteBuffer里面准备好数据,然后传递给channel。nio里面增加此类主要是提供更为底层和灵活的数据读写方式。在以前的socket编程时只能用流来读写,封装的太深,很多地方难以优化,内存也难以控制。另外流是阻塞的,这也和nio的非阻塞用法相斥。
        ByteBuffer说白点就是对一个定长数组的包装。里面有几个指针控制读写的位置和范围。数组的长度是capacity,读写的当前位置是position,读写范围的结束地点为limit。position的位置会随着读写向后移动,当到达limit位置时会报出响应的异常。比如读的范围超过了limit,会报BufferUnderflowException,如果写的时候超过了limit,会报BufferOverflowException。position和limit都提供了相应方法设置和获取,使用起来相当灵活。下面的图演示了ByteBuffer的主要结构。    

        一般情况下,我们往channel里面写入数据时,会先调用ByteBuffer的clear方法,将position设置成0,limit设置成capacity,然后开始调用ByteBuffer的各种put方法来向里面塞入数据,然后调用ByteBuffer的flip方法将position重置为0、limit设置为老的position,然后调用channel的write写入数据。
        读数据也一样,直接调用ByteBuffer的clear方法清空buffer,然后调用channel的read方法读入数据,然后调用flip方法重置指针,再调用ByteBuffer的各种get方法来从buffer中提取数据。
        有时候我们需要准确的读写某个范围的数据,这时候get和put方法里面可以传入读写开始位置(index)。get和put方法有各种支持index的版本。
        另外一种情况是读写的都是同一个buffer,而且要同时使用,这时候读完了可以调用ByteBuffer的compact方法,这个方法将会将已经读过的区域后面的数据移动到buffer开头,这样可以腾出更多区域写。不过这个方法频繁使用效率不咋地。
        还有一点需要说明,调用ByteBuffer的wrap或者allocate方法创建的buffer都在jvm堆上面,优点是对buffer操作比较快,但channel读写buffer时需要将ByteBuffer里面的数据复制到内核中的buffer或者从内核中的buffer复制到ByteBuffer,如果读写很频繁,而且数据量比较大,这种复制将很吃资源。解决方法是调用ByteBuffer的allocateDirect方法直接将ByteBuffer创建在内核里,这样channel读写效率将提高,同时节省内存,不过对ByteBuffer的put和get操作将更加耗时,因为java代码需要与内核通讯。

        废话说的有点多,其实nio写代码不是很好些,毕竟是非阻塞模式。最近同时在研究netty,这个功能强大,而且用起来也简单,性能也不错,还支持WebSocket。以后有时间多研究一下。

        简单远程调用的例子

        下面是个简答的例子,实现了一个简单的远程调,类似java的RMI。调用和简单,客户端像服务器发送一个操作(字符串),服务器返回一个结果(字符串)。服务器也只用了单线程。如果改进的话,可以在请求返回时用json,服务器也可以创建一个线程池处理具体逻辑,select只负责轮询。
        总共三个类,Consts--公用常量,RPCClient--客户端,RPCServer--服务器。
        

    Consts.java
     1 package org.alala.nio;
     2 
     3 public final class Consts {
     4     
     5     /**
     6      * 远程传输字符串编码
     7      */
     8     public static final String CODE = "utf-8";
     9     
    10     /**
    11      * int的字节数
    12      */
    13     public static final int INT_BYTES = Integer.SIZE / 8;
    14     
    15     /**
    16      * 简单回显
    17      */
    18     public static final String ECHO = "echo";
    19     
    20     /**
    21      * 请求服务器时间
    22      */
    23     public static final String SERVER_TIME = "server_time";
    24 }
    RPCClient.java
      1 package org.alala.nio;
      2 
      3 import java.io.IOException;
      4 import java.net.InetSocketAddress;
      5 import java.nio.ByteBuffer;
      6 import java.nio.channels.SocketChannel;
      7 
      8 public class RPCClient {
      9 
     10     private static final String CODE = "utf-8";
     11 
     12     /**
     13      * @param args
     14      */
     15     public static void main(String[] args) {
     16 
     17         try {
     18             
     19             //create socket channel
     20             SocketChannel clientChannel = SocketChannel.open();
     21             clientChannel.configureBlocking(false);
     22             if(!clientChannel.connect(new InetSocketAddress("127.0.0.1", 8888)))  {
     23                 while(!clientChannel.finishConnect()) {
     24                     Thread.sleep(1000);
     25                 }
     26             }
     27 
     28             //执行远程调用
     29             String result = CallService(Consts.ECHO, clientChannel);
     30             System.out.println("Get ECHO result:" + result);
     31             result = CallService(Consts.SERVER_TIME, clientChannel);
     32             System.out.println("Get SERVER_TIME result:" + result);
     33             
     34             //关闭通道
     35             clientChannel.close();
     36         } catch (Exception e) {
     37             // TODO Auto-generated catch block
     38             e.printStackTrace();
     39         }
     40     }
     41 
     42     private static String CallService(String op, SocketChannel clientChannel) throws IOException, InterruptedException {        
     43         ClientData data = new ClientData();
     44         byte[] bytes = op.getBytes(CODE);
     45         data.buf.putInt(bytes.length);
     46         data.buf.put(bytes);
     47         data.buf.flip();
     48         int readSize = 0;
     49         int resultDataSize = 0;
     50         boolean writable = true;// 是否处于发送请求阶段
     51 
     52         final int TIME_OUT = 60000;//超时时间一分钟
     53         long timeStart = System.currentTimeMillis();
     54         while( System.currentTimeMillis() - timeStart < TIME_OUT) {        
     55 
     56             //写数据
     57             if(writable && data.buf.hasRemaining()) {
     58                 int writeCnt = clientChannel.write(data.buf);
     59                 if(writeCnt == -1) {
     60                     System.err.println("There are write errors, close the channel.");
     61                     clientChannel.close();
     62                  } 
     63                 continue;
     64             } else {
     65                 writable = false;
     66                 data.buf.clear();
     67             }
     68 
     69             //读数据
     70             int readCnt = clientChannel.read(data.buf);
     71             if(readCnt == -1) {
     72                 System.err.println("There are read errors, close the channel.");
     73                 clientChannel.close();
     74             } else if(readCnt > 0) {
     75                 readSize += readCnt;
     76                 if(resultDataSize == 0 && readSize >= Consts.INT_BYTES) {
     77                     //获得结果字符串大小
     78                     resultDataSize = data.buf.getInt(0);
     79                 } 
     80                 if(resultDataSize > 0 && readSize >= Consts.INT_BYTES + resultDataSize) {
     81                     //获得结果内容
     82                     byte[] resultBytes = new byte[resultDataSize];
     83                     data.buf.limit(data.buf.position());
     84                     data.buf.position(Consts.INT_BYTES);
     85                     data.buf.get(resultBytes);
     86                     //返回结果
     87                     return new String(resultBytes, CODE);
     88                 }
     89             } else if(readCnt == 0) {
     90                 Thread.sleep(500);
     91             }
     92         }    
     93 
     94         System.out.println("Call service time out, the operation is " + op);
     95         return "";
     96     }
     97 
     98     static class ClientData {
     99         public ByteBuffer buf = ByteBuffer.allocate(1024);
    100     }
    101 
    102     static class ClientHandler {
    103 
    104     }
    105 
    106 }
    RPCServer.java
      1 package org.alala.nio;
      2 
      3 import java.io.IOException;
      4 import java.net.InetSocketAddress;
      5 import java.nio.ByteBuffer;
      6 import java.nio.channels.SelectionKey;
      7 import java.nio.channels.Selector;
      8 import java.nio.channels.ServerSocketChannel;
      9 import java.nio.channels.SocketChannel;
     10 import java.util.Date;
     11 import java.util.Iterator;
     12 import java.util.logging.Logger;
     13 
     14 public class RPCServer {
     15 
     16     private static final int SELECT_TIME_OUT = 1000;
     17     private static Logger log = Logger.getLogger("TCPServer");
     18 
     19     /**
     20      * @param args
     21      */
     22     public static void main(String[] args) {
     23         try {
     24             //创建服务通道
     25             ServerSocketChannel serverChannel = ServerSocketChannel.open();
     26             serverChannel.configureBlocking(false);
     27             int port = 8888;
     28             InetSocketAddress address = new InetSocketAddress(port);
     29             serverChannel.socket().bind(address);
     30 
     31             //创建选择器并注册
     32             Selector selector = Selector.open();
     33             serverChannel.register(selector, SelectionKey.OP_ACCEPT);
     34 
     35             //创建处理器
     36             ServerHandler handler = new ServerHandler();
     37 
     38             log.info("Server starts polling. Port:" + port);
     39 
     40             //轮询选择
     41             while(true) {
     42                 int keyCnt = selector.select(SELECT_TIME_OUT);
     43                 log.info("One selecting, selected keys:" + keyCnt);
     44                 //nothing to handle, just continue polling.
     45                 if(keyCnt == 0) continue;                
     46 
     47                 Iterator<SelectionKey> itr = selector.selectedKeys().iterator();
     48                 while(itr.hasNext()) {
     49                     //获取下一个key
     50                     SelectionKey key = itr.next();
     51 
     52                     try {
     53                         //:处理其他各种情况
     54                         if(key.isAcceptable()) {
     55                             handler.handleAccept(key);
     56                         }
     57                         if(key.isReadable()) {
     58                             handler.handleRead(key);
     59                         }
     60                         if(key.isWritable()) {
     61                             handler.handleWrite(key);
     62                         }
     63                     } catch (Exception exp) {
     64                         log.warning("There are errors:" +exp.getMessage() );
     65                         try {
     66                             key.channel().close();
     67                         } catch (Exception e) {
     68                             log.warning("There are errors when closing the channel." + e.getMessage());
     69                         }
     70                     }
     71 
     72 
     73                     //处理过必须移除
     74                     itr.remove();
     75                 }
     76             }
     77 
     78         } catch (IOException e) {
     79             // TODO Auto-generated catch block
     80             e.printStackTrace();
     81         }
     82     }
     83 
     84     /**
     85      *  客户端上下文
     86      * @author shun.li
     87      *
     88      */
     89     static class ClientContext {
     90         //请求操作
     91         public String op;
     92         //返回结果
     93         public String result;
     94         public ByteBuffer buf = ByteBuffer.allocate(1024) ;
     95         public int readSize = 0;
     96         public int writeSize = 0;
     97         public int readDataLen = 0;//读取数据大小
     98         public int writeDataLen = 0;//写入数据大小
     99 
    100         public synchronized void clearRead() {
    101             buf.clear();
    102             readSize = 0;
    103             readDataLen = 0;
    104         }
    105 
    106         public synchronized void clearWrite() {
    107             buf.clear();
    108             writeSize = 0;
    109             writeDataLen = 0;
    110         }
    111     }
    112 
    113     /** 
    114      * 具体的处理器
    115      * @author shun.li
    116      *
    117      */
    118     static class ServerHandler {
    119 
    120         private Logger log = Logger.getLogger("TCPHandler");
    121         private static final String CODE = "utf-8";
    122 
    123         public void handleAccept(SelectionKey key) throws IOException {            
    124             ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
    125             SocketChannel clientChannel = serverChannel.accept();
    126             clientChannel.configureBlocking(false);            
    127             log.info("A client is accepted.The address:"
    128                     + clientChannel.socket().getInetAddress().getHostAddress());
    129             clientChannel.register(key.selector(), SelectionKey.OP_READ, new ClientContext());
    130         }
    131 
    132         public void handleRead(SelectionKey key) throws IOException {
    133             SocketChannel socketChannel = (SocketChannel)key.channel();
    134             ClientContext client = (ClientContext)key.attachment();
    135             int readCnt = socketChannel.read(client.buf);
    136             log.info("Read data. Address:"
    137                     + socketChannel.socket().getInetAddress().getHostAddress()
    138                     + ".Read count:" + readCnt);
    139             if(readCnt == -1) {
    140                 //there are some errors
    141                 log.warning("A client read fail, the channel would close.The address:"
    142                         + socketChannel.socket().getInetAddress().getHostAddress());
    143                 socketChannel.close();
    144             } else if(readCnt > 0) {
    145                 //update the read size
    146                 client.readSize += readCnt;
    147 
    148                 if(client.readSize >= Consts.INT_BYTES && client.readDataLen == 0) {
    149                     //read the data length
    150                     client.readDataLen = client.buf.getInt(0);
    151                 } 
    152                 if (client.readDataLen > 0 && client.readSize >= Consts.INT_BYTES + client.readDataLen) {
    153                     //read the real data
    154                     byte[] data = new byte[client.readDataLen];
    155                     client.buf.limit(client.buf.position());
    156                     client.buf.position(Consts.INT_BYTES);
    157                     client.buf.get(data);
    158                     client.op = new String(data, CODE);
    159 
    160                     //clear read status
    161                     client.clearRead();
    162                     //the read is finised, server can send the request data to the client.
    163                     key.interestOps(SelectionKey.OP_WRITE);
    164                     return;
    165                 }
    166                 if(client.readDataLen > 0 && client.readSize < Consts.INT_BYTES + client.readDataLen) {
    167                     //reading is not finnished, continue
    168                     key.interestOps(SelectionKey.OP_READ);
    169                 }
    170             }
    171         }
    172 
    173         public void handleWrite(SelectionKey key) throws IOException {
    174             ClientContext client = (ClientContext)key.attachment();            
    175             SocketChannel socketChannel = (SocketChannel)key.channel();
    176 
    177             if(client.writeSize == 0) {
    178                 //一开始写时根据命令计算返回结果
    179                 String op = client.op;
    180                 if(op.equals(Consts.ECHO)) {                
    181                     String clientName = socketChannel.socket().getInetAddress().getHostAddress();
    182                     client.result = clientName;
    183                 } else if(op.equals(Consts.SERVER_TIME)) {
    184                     String serverTime = new Date().toString();
    185                     client.result = serverTime;    
    186                 }
    187                 //构建写入数据
    188                 byte[] writeData = client.result.getBytes(CODE);
    189                 client.buf.putInt(writeData.length);
    190                 client.buf.put(writeData);
    191                 client.buf.flip();
    192                 client.writeDataLen = writeData.length;
    193             }
    194 
    195             int writeCnt = socketChannel.write(client.buf);
    196             log.info("Write data. Address:"
    197                     + socketChannel.socket().getInetAddress().getHostAddress()
    198                     + ".Write count:" + writeCnt);
    199             if(writeCnt == -1) {
    200                 //无法写入,处理错误
    201                 log.warning("Write fail, would close the channel. Client address:" 
    202                         + socketChannel.socket().getInetAddress().getHostAddress());
    203                 socketChannel.close();
    204             } else {
    205                 client.writeSize += writeCnt;
    206                 if(client.writeSize >=client.writeDataLen) {
    207                     client.clearWrite();
    208                     //写入完毕,可以重新读取客户端请求
    209                     key.interestOps(SelectionKey.OP_READ);
    210                 } else {
    211                     //没写完,继续写
    212                     key.interestOps(SelectionKey.OP_WRITE);
    213                 }
    214             }
    215         }
    216     }
    217 
    218 }



        

  • 相关阅读:
    Map 中的EntrySet() ,Map的遍历
    Struts2拦截器原理
    基于 <tx> 和 <aop> 命名空间的声明式事务管理
    Spring AOP Example 文件下载:
    Struts2拦截器
    好多开发文档——官网打不开的福音
    threejs样例
    bootCDN,用了都说好
    react拷贝index.html很恶心之解决办法
    网络爬虫好
  • 原文地址:https://www.cnblogs.com/alala666888/p/2705720.html
Copyright © 2011-2022 走看看