zoukankan      html  css  js  c++  java
  • NIO源码阅读

      自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:

      1、客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECTION等待后续结果,不需要像BIO的客户端一样被同步阻塞。

      2、SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信模型就可以处理其他的链路,不需要同步等待这个链路可用。

      3、线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,没有连接句柄的限制,那么Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。所以它非常适合做高性能、高负载的网络服务器。

      TimeClient:

     1 package nio;
     2 
     3 public class TimeClient {
     4     public static void main(String args[]){
     5         int port = 8080;
     6         if(args != null && args.length > 0){
     7             try{
     8                 port = Integer.valueOf(args[0]);
     9             }catch(NumberFormatException e){
    10                 //采用默认值
    11             }
    12         }
    13         new Thread(new TimeClientHandle("120.0.0.1",port),"TimeClient-001").start();
    14     }
    15 }

    TimeClientHandler:

      1 package nio;
      2 
      3 import java.io.IOException;
      4 import java.net.InetSocketAddress;
      5 import java.nio.ByteBuffer;
      6 import java.nio.channels.SelectionKey;
      7 import java.nio.channels.Selector;
      8 import java.nio.channels.SocketChannel;
      9 import java.util.Iterator;
     10 import java.util.Set;
     11 
     12 public class TimeClientHandle implements Runnable{
     13     private String host;
     14     private int port;
     15     private Selector selector;
     16     private SocketChannel socketChannel;
     17     private volatile boolean stop;
     18     
     19     public TimeClientHandle(String host,int port){
     20         this.host = host == null ? "127.0.0.1" : host;
     21         this.port = port;
     22         try{
     23             selector = Selector.open();
     24             socketChannel = SocketChannel.open();
     25             socketChannel.configureBlocking(false);
     26         }catch(IOException e){
     27             e.printStackTrace();
     28             System.exit(1);
     29         }
     30     }
     31     
     32     
     33     public void run() {
     34         //发送请求连接
     35         try{
     36             doConnect();
     37         }catch(IOException e){
     38             e.printStackTrace();
     39             System.exit(1);
     40         }
     41         while(!stop){
     42             try{
     43                 selector.select(1000);
     44                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
     45                 Iterator<SelectionKey> it = selectedKeys.iterator();
     46                 SelectionKey key = null;
     47                 //当有就绪的Channel时,执行handleInput(key)方法
     48                 while(it.hasNext()){
     49                     key = it.next();
     50                     it.remove();
     51                     try{
     52                         handleInput(key);
     53                     }catch(Exception e){
     54                         if(key != null){
     55                         key.cancel();
     56                             if(key.channel() != null){
     57                                 key.channel().close();
     58                             }
     59                         }
     60                     }
     61                 }
     62             }catch(Exception e){
     63                 e.printStackTrace();
     64                 System.exit(1);
     65             }
     66         }
     67         
     68         //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
     69         if(selector != null){
     70             try{
     71                 selector.close();
     72             }catch(IOException e){
     73                 e.printStackTrace();
     74             }
     75         }
     76 
     77     }
     78     
     79     
     80     private void handleInput(SelectionKey key) throws IOException{
     81         if(key.isValid()){
     82             SocketChannel sc = (SocketChannel)key.channel();
     83             //判断是否连接成功
     84             if(key.isConnectable()){
     85                 if(sc.finishConnect()){
     86                     sc.register(selector, SelectionKey.OP_READ);
     87                 }else{
     88                     System.exit(1);
     89                 }
     90             }
     91             
     92             if(key.isReadable()){
     93                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
     94                 int readBytes = sc.read(readBuffer);
     95                 if(readBytes > 0){
     96                     readBuffer.flip();
     97                         byte[] bytes = new byte[readBuffer.remaining()];
     98                         readBuffer.get(bytes);
     99                         String body = new String(bytes,"UTF-8");
    100                         System.out.println("Now is :" + body);
    101                         this.stop = true;
    102                 }else if(readBytes < 0){
    103                     //对端链路关闭
    104                     key.cancel();
    105                     sc.close();
    106                 }else{
    107                     ; //读到0字节,忽略
    108                 }
    109             }
    110         }
    111     }
    112     
    113     private void doConnect() throws IOException{
    114         //如果直接连接成功,则注册到多路复用器上,发送请求信息,读应答
    115         if(socketChannel.connect(new InetSocketAddress(host,port))){
    116             socketChannel.register(selector, SelectionKey.OP_READ);
    117             doWrite(socketChannel);
    118         }else{
    119             //说明服务器没有返回TCP祸首应答消息,但这并不代表连接失败,当服务器返回TCP syn-ack消息后,Selector就能够轮训这个SocketChannel处于连接就绪状态
    120             socketChannel.register(selector, SelectionKey.OP_CONNECT);
    121         }
    122     }
    123     
    124     private void doWrite(SocketChannel sc) throws IOException{
    125         byte[] req = "QUERY TIME ORDER".getBytes();
    126         ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
    127         writeBuffer.put(req);
    128         writeBuffer.flip();
    129         sc.write(writeBuffer);
    130         if(!writeBuffer.hasRemaining()){
    131             System.out.println("Send order 2 server succeed.");
    132         }
    133     }
    134 
    135 }

    TimeServer:

     1 package nio;
     2 
     3 import java.io.IOException;
     4 
     5 public class TimeServer {
     6     
     7     public static void main(String[] args) throws IOException{
     8         int port = 8080;
     9         if(args != null && args.length >0){
    10             try{
    11                 port = Integer.valueOf(args[0]);
    12             }catch(NumberFormatException e){
    13                 //采用默认值
    14             }
    15         }
    16         //多路复用类,是一个独立的线程,负责轮训多路复用器Selctor,处理多个客户端的并发接入。
    17         MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
    18         new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
    19         }
    20 }

    MultiplexerTimeServer:

      1 package nio;
      2 
      3 import java.io.IOException;
      4 import java.net.InetSocketAddress;
      5 import java.nio.ByteBuffer;
      6 import java.nio.channels.SelectionKey;
      7 import java.nio.channels.Selector;
      8 import java.nio.channels.ServerSocketChannel;
      9 import java.nio.channels.SocketChannel;
     10 import java.util.Iterator;
     11 import java.util.Set;
     12 
     13 public class MultiplexerTimeServer implements Runnable {
     14     
     15     private Selector selector;
     16     
     17     private ServerSocketChannel servChannel;
     18     
     19     private volatile boolean stop;
     20 
     21     public MultiplexerTimeServer(int port){
     22         try{
     23             
     24             selector = Selector.open();
     25             servChannel.configureBlocking(false);
     26             //将ServerSocketChannel 设置为异步非阻塞,backlog设置为1024 
     27             servChannel.socket().bind(new InetSocketAddress(port),1024);
     28             //将ServerSocket Channel注册到Selector,监听SelectionKey.OP_ACCEPT操作位,如果初始化失败,则退出
     29             servChannel.register(selector,SelectionKey.OP_ACCEPT);
     30             System.out.println("The time server is start in port:" + port);
     31         }catch(IOException e){
     32             e.printStackTrace();
     33             System.exit(1);
     34         }
     35     }
     36     
     37     public void stop(){
     38         this.stop = true;
     39     }
     40     
     41     public void run() {
     42         while(!stop){
     43             try{
     44                 //遍历时间设置1秒,每隔一秒唤醒一次,当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合
     45                 selector.select(1000);
     46                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
     47                 Iterator<SelectionKey> it = selectedKeys.iterator();
     48                 SelectionKey key = null;
     49                 //通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作
     50                 while(it.hasNext()){
     51                     key = it.next();
     52                     it.remove();
     53                     try{
     54                         handleInput(key);
     55                     }catch(Exception e){
     56                         if(key != null){
     57                             key.cancel();
     58                             if(key.channel() != null){
     59                                 key.channel().close();
     60                             }
     61                         }
     62                     }
     63                 }
     64             }catch(Throwable t){
     65                 t.printStackTrace();
     66             }
     67         }
     68         
     69         //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
     70         if(selector != null){
     71             try{
     72                 selector.close();
     73             }catch(IOException e){
     74                 e.printStackTrace();
     75             }
     76         }
     77     }
     78     
     79     //处理新接入的请求消息
     80     private void handleInput(SelectionKey key) throws IOException{
     81         if(key.isValid()){
     82             
     83             //根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作相当于
     84             //完成了TCP的三次握手,TCP物理链路正式建立
     85             if(key.isAcceptable()){
     86                 ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
     87                 SocketChannel sc = ssc.accept();
     88                 sc.configureBlocking(false);
     89                 //Add the new connection tothe selector
     90                 sc.register(selector, SelectionKey.OP_READ);
     91             }
     92             
     93             if(key.isReadable()){
     94                 //Read the data
     95                 
     96                 SocketChannel sc = (SocketChannel)key.channel();
     97                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
     98                 int readBytes = sc.read(readBuffer);
     99                 if(readBytes > 0){
    100                     //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作
    101                     readBuffer.flip();
    102                     byte[] bytes = new byte[readBuffer.remaining()];
    103                     readBuffer.get(bytes);
    104                     String body = new String(bytes,"UTF-8");
    105                     System.out.println("The time server receive order: + body");
    106                     String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
    107                     doWrite(sc,currentTime);
    108                 }else if(readBytes < 0){
    109                     //对端链路关闭
    110                     key.cancel();
    111                     sc.close();
    112                 }else{
    113                     ; //读到0字节,忽略
    114                 }
    115             }
    116         }
    117     }
    118     
    119     private void doWrite(SocketChannel channel,String response) throws IOException{
    120         if(response != null && response.trim().length() >0){
    121             byte[] bytes = response.getBytes();
    122             ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
    123             writeBuffer.put(bytes);
    124             writeBuffer.flip();
    125             channel.write(writeBuffer);
    126         }
    127     }
    128 }
  • 相关阅读:
    机器人搬重物(BFS)
    POJ1386Play on Words(欧拉回路)
    轰炸
    杂务(动态规划)
    Prism框架的Regions使用
    MVVM(使用Prism框架)开发WPF
    WPF显示数据库内容
    UI案例
    VS的快捷操作
    谷歌浏览器插件安装、VIP看视频、解除百度网盘限速
  • 原文地址:https://www.cnblogs.com/yangsy0915/p/6136018.html
Copyright © 2011-2022 走看看