zoukankan      html  css  js  c++  java
  • BIO、NIO、AIO系列一:NIO

    一、几个基本概念

    1.同步、异步、阻塞、非阻塞

    同步:用户触发IO操作,你发起了请求就得等着对方给你返回结果,你不能走,针对调用方的,你发起了请求你等

    异步:触发触发了IO操作,即发起了请求以后可以做自己的事,等处理完以后会给你返回处理完成的标志,针对调用方的,你发起了请求你不等

    阻塞:你调用我,我试图对文件进行读写的时候发现没有可读写的文件,我的程序就会进入等待状态,等可以读写了,我处理完给你返回结果,这里的等待和同步的等待有很大的区别,针对服务提供方的,你调用我我发现服务不可用我等

    非阻塞:你调用我,我试图对文件读写的时候发现没有读写的文件,不等待直接返回,等我发现可以读写文件处理完了再给你返回成功标志,针对服务提供方的,你调用我我不等,我处理完了给你返回结果

    2、Java对BIO、NIO、AIO的支持:
    Java BIO : 同步阻塞:你调用我,你等待我给你返回结果,我发现没有可读写的资源我也等待,两个一起等,JDK1.4以前的唯一选择,适用于数目比较少并且比较固定的架构,对服务器资源要求比较高,大家都在等资源,等服务提供方处理完了再给你返回结果
    Java NIO :同步非阻塞: 你调用我,你等待我给你返回结果,我发现没有可以读写的资源,我不等待先直接返回,等我发现有可以读写的资源以后处理完给你返回结果,适用于连接数目多且连接时间比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
    Java AIO(NIO.2) : 异步非阻塞:你调用我,你不等待继续做自己的事,我发现没有可以读写的资源,我也不等待继续做我自己的事,等有可以读写的资源的时候我处理完给你返回结果,适用于连接数目多且连接时间比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
    3、BIO、NIO、AIO适用场景分析:
    BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
    NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
    AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
     
    另外,I/O属于底层操作,需要操作系统支持,并发也需要操作系统的支持,所以性能方面不同操作系统差异会比较明显。

    二、NIO基础

    1、传统BIO模型-InputStream、OutputStream

    传统BIO是一种同步的阻塞IO,IO在进行读写时,该线程将被阻塞,线程无法进行其它操作。

    IO流在读取时,会阻塞。直到发生以下情况:1、有数据可以读取。2、数据读取完成。3、发生异常。

    服务端:

    BioServer.java

     1 import java.io.IOException;
     2 import java.net.ServerSocket;
     3 import java.net.Socket;
     4 
     5 
     6 public class BioServer {
     7     public static void main(String[] args) {
     8         int port=8080; //服务端默认端口
     9         if(args != null && args.length>0){
    10             try {
    11                 port = Integer.valueOf(args[0]);
    12             } catch (NumberFormatException e) {
    13             }
    14         }
    15         ServerSocket server = null;
    16         try {
    17             server = new ServerSocket(port);
    18             System.out.println("启动了服务端,端口:"+port);
    19             Socket socket = null;
    20             while(true){
    21                 socket = server.accept();//阻塞等待客户端连接
    22                 new Thread(new BioServerHandler(socket)).start();
    23             }
    24         } catch (Exception e) {
    25             e.printStackTrace();
    26         } finally {
    27             if(server!=null){
    28                 System.out.println("关闭了服务.");
    29                 try {
    30                     server.close();
    31                     server = null;
    32                 } catch (IOException e) {
    33                     e.printStackTrace();
    34                 }
    35             }
    36         }
    37     }
    38 }

    BioServerHandler.java

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.Socket;
    
    public class BioServerHandler implements Runnable {
    
        private Socket socket;
        public BioServerHandler(Socket socket){
            this.socket = socket;
        }
        
        @Override
        public void run() {
            BufferedReader in = null;
            try {
                in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
                String body = null;
                while(true){
                    body = in.readLine(); //阻塞等待数据可以被读取
                    if(body == null){
                        break;
                    }
                    System.out.println("服务器接收到指令:"+body);
                }
            } catch (Exception e) {
                if(in != null){
                    try {
                        in.close();
                        in = null;//
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
                if(socket != null){
                    try {
                        socket.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                    this.socket = null;
                }
            }
        }
    
    }

    客户端:

    BioServerClient.java

     1 import java.io.IOException;
     2 import java.io.PrintWriter;
     3 import java.net.Socket;
     4 
     5 public class BioServerClient {
     6 
     7     public static void main(String[] args) {
     8         int port=8080; //服务端默认端口
     9         if(args != null && args.length>0){
    10             try {
    11                 port = Integer.valueOf(args[0]);
    12             } catch (NumberFormatException e) {
    13             }
    14         }
    15         Socket socket = null;
    16         PrintWriter out = null;
    17         try {
    18             socket = new Socket("127.0.0.1", port);
    19             out = new PrintWriter(socket.getOutputStream(), true);
    20             out.println("9527");
    21             System.out.println("客户端向服务端发送了指令");
    22         } catch (Exception e) {
    23             e.printStackTrace();
    24         } finally {
    25             if(out !=null){
    26                 out.close();
    27                 out = null;
    28             }
    29             if(socket != null){
    30                 try {
    31                     socket.close();
    32                 } catch (IOException e) {
    33                     e.printStackTrace();
    34                 }
    35                 socket = null;
    36             }
    37         }
    38     }
    39 }

    2、伪异步IO模型

    以传统BIO模型为基础,通过线程池的方式维护所有的IO线程,实现相对高效的线程开销及管理。

     服务端:

    TimeServer.java

     1 import java.io.IOException;
     2 import java.net.ServerSocket;
     3 import java.net.Socket;
     4 public class TimeServer {
     5     public static void main(String[] args) {
     6         int port=8080; //服务端默认端口
     7         ServerSocket server = null;
     8         try {
     9             server = new ServerSocket(port);
    10             System.out.println("The time server is start in port:"+port);
    11             Socket socket = null;
    12             //通过线程池的方式维护所有的IO线程,实现相对高效的线程开销及管理
    13             TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50, 10000);
    14             
    15             while(true){
    16                 socket = server.accept();
    17 //                new Thread(new TimeServerHandler(socket)).start();
    18                 singleExecutor.execute(new TimeServerHandler(socket));
    19             }
    20         } catch (Exception e) {
    21             e.printStackTrace();
    22         } finally {
    23             if(server!=null){
    24                 System.out.println("The time server is close.");
    25                 try {
    26                     server.close();
    27                     server = null;
    28                 } catch (IOException e) {
    29                     e.printStackTrace();
    30                 }
    31             }
    32         }
    33     }
    34 }

    TimeServerHandler.java

     1 import java.io.BufferedReader;
     2 import java.io.IOException;
     3 import java.io.InputStreamReader;
     4 import java.io.PrintWriter;
     5 import java.net.Socket;
     6 import java.util.Date;
     7 
     8 
     9 public class TimeServerHandler implements Runnable {
    10 
    11     private Socket socket;
    12     public TimeServerHandler(Socket socket){
    13         this.socket = socket;
    14     }
    15     
    16     @Override
    17     public void run() {
    18         BufferedReader in = null;
    19         PrintWriter out = null;
    20         try {
    21             in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
    22             out = new PrintWriter(this.socket.getOutputStream(), true);
    23             String currentTime = null;
    24             String body = null;
    25             while(true){
    26                 body = in.readLine();
    27                 if(body == null){
    28                     break;
    29                 }
    30                 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order:"+body);
    31                 currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
    32                 out.println(currentTime);
    33             }
    34         } catch (Exception e) {
    35             if(in != null){
    36                 try {
    37                     in.close();
    38                     in = null;//
    39                 } catch (IOException e1) {
    40                     e1.printStackTrace();
    41                 }
    42             }
    43             if(out != null){
    44                 try {
    45                     out.close();
    46                     out = null;
    47                 } catch (Exception e1) {
    48                     e1.printStackTrace();
    49                 }
    50             }
    51             if(socket != null){
    52                 try {
    53                     socket.close();
    54                 } catch (IOException e1) {
    55                     e1.printStackTrace();
    56                 }
    57                 this.socket = null;
    58             }
    59         }
    60     }
    61 
    62 }

     服务端线程池TimeServerHandlerExecutePool.java

     1 import java.util.concurrent.ArrayBlockingQueue;
     2 import java.util.concurrent.ExecutorService;
     3 import java.util.concurrent.ThreadPoolExecutor;
     4 import java.util.concurrent.TimeUnit;
     5 
     6 public class TimeServerHandlerExecutePool {
     7 
     8     private ExecutorService executor;
     9     
    10     public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
    11         executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120l, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
    12     }
    13 
    14     public void execute(Runnable task) {
    15         executor.execute(task);
    16     }
    17 
    18 }

     客户端:

    TimeServerClient.java

     1 import java.io.BufferedReader;
     2 import java.io.IOException;
     3 import java.io.InputStreamReader;
     4 import java.io.PrintWriter;
     5 import java.net.Socket;
     6 
     7 public class TimeServerClient {
     8 
     9     public static void main(String[] args) {
    10         int port=8080; //服务端默认端口
    11         Socket socket = null;
    12         BufferedReader in = null;
    13         PrintWriter out = null;
    14         try {
    15             socket = new Socket("127.0.0.1", port);
    16             in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    17             out = new PrintWriter(socket.getOutputStream(), true);
    18             out.println("QUERY TIME ORDER");
    19             System.out.println("Send order to server succeed.");
    20             String resp = in.readLine();
    21             System.out.println("Now is : "+resp);
    22         } catch (Exception e) {
    23             e.printStackTrace();
    24         } finally {
    25             if(out !=null){
    26                 out.close();
    27                 out = null;
    28             }
    29             if(in != null){
    30                 try {
    31                     in.close();
    32                 } catch (IOException e) {
    33                     e.printStackTrace();
    34                 }
    35             }
    36             if(socket != null){
    37                 try {
    38                     socket.close();
    39                 } catch (IOException e) {
    40                     e.printStackTrace();
    41                 }
    42                 socket = null;
    43             }
    44         }
    45     }
    46 }

    3NIO模型

    NIOJDK1.4)模型是一种同步非阻塞IO,主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(多路复用器)。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(多路复用器)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。

    NIO和传统IO(一下简称IO)之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的

    IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

    NIO优点:

    1、通过Channel注册到Selector上的状态来实现一种客户端与服务端的通信。

    2、Channel中数据的读取是通过Buffer , 一种非阻塞的读取方式。

    3Selector 多路复用器  单线程模型,  线程的资源开销相对比较小。

    NIO缺点

    1. API使用复杂。

    2. 需要具备一些多线程编码能力

    3. 断线重连问题比较严重

    4. NIO还有一些BUG

    Channel(通道)

    传统IO操作对read()或write()方法的调用,可能会因为没有数据可读/可写而阻塞,直到有数据响应。也就是说读写数据的IO调用,可能会无限期的阻塞等待,效率依赖网络传输的速度。最重要的是在调用一个方法前,无法知道是否会被阻塞。

    NIO的Channel抽象了一个重要特征就是可以通过配置它的阻塞行为,来实现非阻塞式的通道。

    Channel是一个双向通道,与传统IO操作只允许单向的读写不同的是,NIO的Channel允许在一个通道上进行读和写的操作。

    FileChannel:文件

    SocketChannel:

    ServerSocketChannel:

    DatagramChannel: UDP

    Buffer(缓冲区)

    Bufer顾名思义,它是一个缓冲区,实际上是一个容器,一个连续数组。Channel提供从文件、网络读取数据的渠道,但是读写的数据都必须经过Buffer。   

    Buffer缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该模块内存。为了理解Buffer的工作原理,需要熟悉它的三个属性capacity、position和limit。

    属性capacity、position和limit

    position和limit的含义取决于Buffer处在读模式还是写模式。不管Buffer处在什么模式,capacity的含义总是一样的。见下图:

     

    capacity:作为一个内存块,Buffer有固定的大小值,也叫作“capacity”,只能往其中写入capacity个byte、long、char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清楚数据)才能继续写数据。

    position:当你写数据到Buffer中时,position表示当前的位置。初始position值为0,当写入一个字节数据到Buffer中后,position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity-1。当读取数据时,也是从某个特定位置读,将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取一个字节数据后,position向前移动到下一个可读的位置。

    limit:在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity。当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)

    Buffer的分配:对Buffer对象的操作必须首先进行分配,Buffer提供一个allocate(int capacity)方法分配一个指定字节大小的对象。

    Buffer中写数据:写数据到Buffer中有两种方式:

    1、从channel写到Buffer

    int bytes = channel.read(buf); //将channel中的数据读取到buf中

    2、通过Buffer的put()方法写到Buffer

    buf.put(byte); //将数据通过put()方法写入到buf中

    flip()方法:将Buffer从写模式切换到读模式,调用flip()方法会将position设置为0,并将limit设置为之前的position的值。

    Buffer中读数据:从Buffer中读数据有两种方式:

    1、从Buffer读取数据到Channel

    int bytes = channel.write(buf); //将buf中的数据读取到channel中

    2、通过Buffer的get()方法读取数据

    byte bt = buf.get(); //从buf中读取一个byte

    rewind()方法Buffer.rewind()方法将position设置为0,使得可以重读Buffer中的所有数据,limit保持不变。Buffer中的数据,读取完成后,依然保存在Buffer中,可以重复读取

    clear()与compact()方法:一旦读完Buffer中的数据,需要让Buffer准备好再次被写入,可以通过clear()或compact()方法完成。如果调用的是clear()方法,position将被设置为0,limit设置为capacity的值。但是Buffer并未被清空,只是通过这些标记告诉我们可以从哪里开始往Buffer中写入多少数据。如果Buffer中还有一些未读的数据,调用clear()方法将被"遗忘 "。compact()方法将所有未读的数据拷贝到Buffer起始处,然后将position设置到最后一个未读元素的后面,limit属性依然设置为capacity。可以使得Buffer中的未读数据还可以在后续中被使用。

    mark()与reset()方法:通过调用Buffer.mark()方法可以标记一个特定的position,之后可以通过调用Buffer.reset()恢复到这个position上。

    Selector(多路复用器)

    Selector与Channel是相互配合使用的,将Channel注册在Selector上之后,才可以正确的使用Selector,但此时Channel必须为非阻塞模式。Selector可以监听Channel的四种状态(Connect、Accept、Read、Write),当监听到某一Channel的某个状态时,才允许对Channel进行相应的操作。

    Connect:某一个客户端连接成功后

    Accept:准备好进行连接

    Read:可读

    Write:可写

    4、NIO示例:

    服务端:

    MultiplexerTimeServer.java

      1 package com.studyio.demo3;
      2 
      3 import java.io.IOException;
      4 import java.net.InetSocketAddress;
      5 import java.nio.ByteBuffer;
      6 import java.nio.channels.SelectionKey;
      7 import java.nio.channels.Selector;
      8 import java.nio.channels.ServerSocketChannel;
      9 import java.nio.channels.SocketChannel;
     10 import java.util.Date;
     11 import java.util.Iterator;
     12 import java.util.Set;
     13 
     14 /**
     15  * 
     16  * @author lgs
     17  * 
     18  * 
     19  */
     20 public class MultiplexerTimeServer implements Runnable {
     21 
     22     private Selector selector;
     23     private ServerSocketChannel serverChannel;
     24     private volatile boolean stop;
     25     
     26     public MultiplexerTimeServer(int port) {
     27         try {
     28             //打开服务端的一个通道channel:ServerSocketChannel
     29             serverChannel = ServerSocketChannel.open();
     30             //把服务端的通道设置为非阻塞模式
     31             serverChannel.configureBlocking(false);
     32             //绑定监听的端口地址
     33             serverChannel.socket().bind(new InetSocketAddress(port), 1024);
     34             //创建Selector(多路复用器)线程
     35             selector = Selector.open();
     36             //将服务端通道ServerSocketChannel注册到Selector,交给Selector监听,告诉客户端服务端是可以连接的了
     37             serverChannel.register(selector, SelectionKey.OP_ACCEPT);
     38             System.out.println("The time server is start in port:"+port);
     39         } catch (Exception e) {
     40             e.printStackTrace();
     41             System.exit(1);
     42         }
     43     }
     44 
     45     public void stop(){
     46         this.stop = true;
     47     }
     48     @Override
     49     public void run() {
     50         //处理客户端消息
     51         while(!stop){
     52             try {
     53                 //通过Selector循环准备就绪的Key,这个key指的是客户端的通道
     54                 selector.select();
     55                 //拿到key以后把key放入迭代器iterator
     56                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
     57                 Iterator<SelectionKey> iterator = selectionKeys.iterator();
     58                 SelectionKey selectionKey = null;
     59                 while(iterator.hasNext()){
     60                     selectionKey = iterator.next();
     61                     //取到key以后就移出,避免重复取
     62                     iterator.remove();
     63                     try {
     64                         //处理客户端传递过来的数据
     65                         handleInput(selectionKey);
     66                     } catch (Exception e) {
     67                         if(selectionKey!=null){
     68                             selectionKey.cancel();
     69                             if(selectionKey.channel()!=null){
     70                                 selectionKey.channel().close();
     71                             }
     72                         }
     73                     }
     74                 }
     75             } catch (Exception e) {
     76                 e.printStackTrace();
     77             }
     78         }
     79         if(selector !=null){
     80             try {
     81                 selector.close();
     82             } catch (IOException e) {
     83                 e.printStackTrace();
     84             }
     85         }
     86     }
     87 
     88     /**
     89      * 处理客户端传递过来的数据
     90      * @param selectionKey
     91      * @throws IOException
     92      */
     93     private void handleInput(SelectionKey selectionKey) throws IOException {
     94         if(selectionKey.isValid()){
     95             //客户端是可连接的
     96             if (selectionKey.isAcceptable()) {
     97                 ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
     98                 //多路复用器监听到新的客户端连接,处理连接请求,完成TCP三次握手。
     99                 SocketChannel client = server.accept();
    100                 //设置为非阻塞模式
    101                 client.configureBlocking(false);
    102                 // 将新连接注册到多路复用器上,监听其读操作,读取客户端发送的消息。
    103                 client.register(selector, SelectionKey.OP_READ);
    104             }
    105             //客户端是可读的
    106             if(selectionKey.isReadable()){
    107                 //获取取客户端的通道
    108                 SocketChannel client = (SocketChannel) selectionKey.channel();
    109                 ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
    110                 //读取客户端请求消息到缓冲区
    111                 int count = client.read(receivebuffer);   //非阻塞    
    112                 if (count > 0) {
    113                     receivebuffer.flip();
    114                     byte[] bytes = new byte[receivebuffer.remaining()]; //remaining()方法
    115                     //从缓冲区读取消息到bytes数组里面
    116                     receivebuffer.get(bytes);
    117                     String body = new String(bytes, "UTF-8");
    118                     System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body);
    119                     //将currentTime响应给客户端(客户端Channel)
    120                     String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
    121                     //服务端向客户端响应数据,通过客户端的通道传递数据
    122                     doWrite(client, currentTime);
    123                 }else if(count < 0){
    124                     selectionKey.channel();
    125                     client.close();
    126                 }else{
    127                     
    128                 }
    129             }
    130         }
    131     }
    132 
    133     /**
    134      * 服务端向客户端响应数据,通过客户端的通道传递数据
    135      * @param client
    136      * @param currentTime
    137      * @throws IOException
    138      */
    139     private void doWrite(SocketChannel client, String currentTime) throws IOException {
    140         if(currentTime != null && currentTime.trim().length()>0){
    141             ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
    142             sendbuffer.put(currentTime.getBytes());
    143             sendbuffer.flip();
    144             //将客户端响应消息写入到客户端Channel中。
    145             client.write(sendbuffer);
    146             System.out.println("服务器端向客户端发送数据--:" + currentTime);
    147         }else{
    148             System.out.println("没有数据");
    149         }
    150     }
    151 
    152 }

    服务端入口程序TimeServer.java

    public class TimeServer {
    
        public static void main(String[] args) {
            int port=8080; //服务端默认端口
            MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
            new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
        }
    }

     客户端:

    TimeClientHandler.java

      1 package com.studyio.demo3;
      2 
      3 import java.io.IOException;
      4 import java.net.InetSocketAddress;
      5 import java.nio.ByteBuffer;
      6 import java.nio.channels.SelectionKey;
      7 import java.nio.channels.Selector;
      8 import java.nio.channels.SocketChannel;
      9 import java.util.Iterator;
     10 import java.util.Set;
     11 
     12 /**
     13  * 
     14  * @author lgs
     15  * 
     16  */
     17 public class TimeClientHandler implements Runnable {
     18     
     19     private String host;
     20     private int port;
     21     private SocketChannel socketChannel;
     22     private Selector selector;
     23     private volatile boolean stop;
     24     
     25     public TimeClientHandler(String host, int port) {
     26         this.host = host;
     27         this.port = port;
     28         try {
     29             //客户端打开一个通道SocketChannel
     30             socketChannel = SocketChannel.open();
     31             //创建Selector(多路复用器)线程
     32             selector = Selector.open();
     33             //设置为非阻塞模式
     34             socketChannel.configureBlocking(false);
     35         } catch (Exception e) {
     36             e.printStackTrace();
     37             System.exit(1);
     38         }
     39     }
     40 
     41     @Override
     42     public void run() {
     43         try {
     44             //连接服务端并发送数据
     45             doConnect();
     46         } catch (Exception e) {
     47             e.printStackTrace();
     48             System.exit(1);
     49         }
     50         //处理服务端响应的数据,和服务端处理客户端发送的数据一样
     51         while(!stop){
     52             //轮训通道的状态
     53             try {
     54                 selector.select(1000);
     55                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
     56                 Iterator<SelectionKey> iterator = selectionKeys.iterator();
     57                 SelectionKey selectionKey = null;
     58                 while(iterator.hasNext()){
     59                     selectionKey = iterator.next();
     60                     //取到key以后就移出,避免重复取
     61                     iterator.remove();
     62                     try {
     63                         //处理服务端响应的数据
     64                         handleInput(selectionKey);
     65                     } catch (Exception e) {
     66                         if(selectionKey!=null){
     67                             selectionKey.cancel();
     68                             if(selectionKey.channel()!=null){
     69                                 selectionKey.channel().close();
     70                             }
     71                         }
     72                     }
     73                 }
     74             } catch (Exception e) {
     75                 e.printStackTrace();
     76                 System.exit(1);
     77             }
     78         }
     79         if(selector !=null){
     80             try {
     81                 selector.close();
     82             } catch (IOException e) {
     83                 e.printStackTrace();
     84             }
     85         }
     86     }
     87 
     88     /**
     89      * 处理服务端响应的数据
     90      * @param selectionKey
     91      * @throws Exception
     92      */
     93     private void handleInput(SelectionKey selectionKey) throws Exception {
     94         if(selectionKey.isValid()){
     95             SocketChannel client = (SocketChannel) selectionKey.channel();
     96             if (selectionKey.isConnectable()){
     97                 if(client.finishConnect()){
     98                     client.register(selector, SelectionKey.OP_READ);
     99                     doWrite(client);
    100                 }else{
    101                     System.exit(1);
    102                 }
    103             }
    104             if (selectionKey.isReadable()) {
    105                 ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
    106                 int count = client.read(receivebuffer);
    107                 if (count > 0) {
    108                     receivebuffer.flip();
    109                     byte[] bytes = new byte[receivebuffer.remaining()]; //remaining()方法
    110                     receivebuffer.get(bytes);
    111                     String body = new String(bytes, "UTF-8");
    112                     System.out.println("Now is "+body);
    113                     this.stop = true;
    114                 }else if(count < 0){
    115                     selectionKey.channel();
    116                     client.close();
    117                 }else{
    118                     
    119                 }
    120             }
    121         }
    122     }
    123 
    124     /**
    125      * 连接服务端并发送数据
    126      * @throws Exception
    127      */
    128     private void doConnect() throws Exception {
    129         //连接服务端
    130         boolean connect = socketChannel.connect(new InetSocketAddress(host, port));
    131         //判断是否连接成功,如果连接成功,则监听Channel的读状态。
    132         if(connect){
    133             //连接成功就把客户端的通道注册到多路复用器上,并设置通道状态为可读
    134             socketChannel.register(selector, SelectionKey.OP_READ);
    135             //写数据  写给服务端
    136             doWrite(socketChannel);
    137         }else{
    138             //如果没有连接成功,则向多路复用器注册Connect(可连接)状态
    139             socketChannel.register(selector, SelectionKey.OP_CONNECT);
    140         }
    141         
    142     }
    143 
    144     /**
    145      * 写数据  写给服务端
    146      * @param channel
    147      * @throws IOException
    148      */
    149     private void doWrite(SocketChannel channel) throws IOException {
    150         ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
    151         sendbuffer.put("QUERY TIME ORDER".getBytes());
    152         sendbuffer.flip();
    153         //向Channel中写入客户端的请求指令  写到服务端 写到通道里面
    154         channel.write(sendbuffer);
    155         if(!sendbuffer.hasRemaining()){
    156             System.out.println("Send order to server succeed.");
    157         }
    158     }
    159 }

     客户端程序入口:TimeServerClient.java

    public class TimeServerClient {
        
        public static void main(String[] args) {
            int port=8080; //服务端默认端口
            new Thread(new TimeClientHandler("127.0.0.1", port), "NIO-TimeServerClient-001").start();
        }
    }
  • 相关阅读:
    高并发编程之基础概念
    使用JAXB实现Bean与Xml相互转换
    python语法(五)—函数
    2018年终总结
    python语法(四)— 文件操作
    excel开发
    spring 常用注解,@primary注解
    spring中InitializingBean和@Bean的初始化,bean初始化
    @PostConstruct 注解
    LocalDateTime java8
  • 原文地址:https://www.cnblogs.com/leeSmall/p/8616316.html
Copyright © 2011-2022 走看看