zoukankan      html  css  js  c++  java
  • Java高并发程序设计(八)—— NIO和AIO

    一、NIO

    1. 为什么需要了解NIO和AIO

    本质上来说,看起来它们跟多线程关系并不密切,并发更多的是线程的控制和线程的操作等内容,但是,虽然NIO并没有对多线程的控制与协作提出一些特别的观点,但是它改变了线程在应用层面的使用方式,解决了一些实际的困难,节省了一些系统的成本。

    异步IO,所谓异步的操作,异步调用通常会在后台会使用一种线程的形式执行某种操作,使得前面的线程很快能够返回

    2.  NIO

    NIO是New I/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套Java I/O标准。它是在Java 1.4中被纳入到JDK中的,并具有以下特性:

      *  NIO是基于块(Block)的,它以块为基本单位处理数据

        解释:传统的标准的I/O是基于流的,是一个字节一个字节读的。NIO是基于块的,硬盘上存储数据就是基于块去做存储的,并不是基于字节的,并不是一个字节一个字节去存储的,那么它读写的最小单位是块,两者进行了一个对应,因此NIO的性能高于基于流的传统IO。

      *  为所有的原始类型提供(Buffer)缓存支持

        解释:跟这个NIO打交道的时候,都会使用一个Buffer的对象。对于每一种原始的数据类型都有一种Buffer进行支持。

      *  增加通道(Channel)对象,作为新的原始I/O抽象

        解释:标准IO,使用流,InputStream和OutputStream,这里使用Channel

      *  支持锁和内存映射文件的文件访问接口

        解释:文件锁是指,有些程序运行时生成.lock,这个就是文件锁,当有这个文件存在时,就表示当前程序中可能有一个线程占用了这把锁,其它线程如果同样想使用这把锁,就会陷入等待,它使用完这把锁,就会把文件删除,之后,其它线程才能进去做一些相应的操作。也就是说拿文件系统来实现锁,与ReentrantLock相比,ReentrantLock是使用原子整数来实现锁,这里使用文件进行加锁,NIO支持用这种方式写代码。内存映射文件,简单理解,一个文件在硬盘磁盘里面的,把一个文件映射到内存当中去,就相当于把文件读到内存中,这种方式比传统的把文件中的数据一个个读上来方式快很多。

      *  提供了基于Selector的异步网络I/O

        解释:基于网络操作来讲,提供了Selector,也是重点内容。

    二、Buffer&Channel

    1. Buffer&&Channel

     Buffer缓冲区是NIO的核心部分,所有的NIO的操作都要通过Buffer去做。当你从通道里面读数据的,要先写入这个buffer,要从通道里面写数据的时候,也要先写一个buffer,然后由buffer写入到通道里面去。而通道就是IO的抽象,在通道的另一端,可能就是我们要操作的文件或者socket。我们通过buffer的读写对实际IO的目标进行修改和读取。

    2. Buffer的实现类

     使用最多的是ByteBuffer,因为通常我们都喜欢一个字节一个字节的读数据。

    3. Buffer的简单使用

     新建文件后,拿到InputStream,然后通过InputStream,得到Channel,进而得到FinalChannel,它对应这个文件,所以在FinalChannel读写数据时,就是对文件进行操作。

    对ByteBuffer分配了1k的buffer,然后从channel中读数据到buffer中去,buffer中存放文件内容。

    把channel关闭掉,对byteBuffer做读写转换,表示后面可能读取buffer中的数据。

    4. NIO复制文件

     复制文件需要一个源和一个地址。fis是源,fos是目标,都能拿到一个通道channel,分配一个1k的buffer,读写都要通过buffer去做。读写转换后,把buffer内容写到目标中去,通过这样一个循环,最终把原数据内容1k 1k的写出来,读到最终会读完,关掉。实现了文件的复制过程。

    5. Buffer中有3个重要的参数:位置(position)、容量(capacity)和上限(limit)

     其实buffer中很多函数的操作,其实大部分就是修改这3个位置,其实API大部分是对这3个参数调整。

    position:操作当前缓冲区,已经读到或者写到哪个位置了,下一次读写从position下一个位置读写数据。position当前操作位置。

    capacity:整个缓冲区的总容量上限。到底一共能放多少数据。

    limit:缓冲区的实际上限。整个缓冲区一共有10个byte,容量有10个byte,最多不能超过10个byte,读了一个文件,这个文件有5个byte,缓冲区中实际用了5个byte,这个时候,可以认为limit是5,虽然缓冲区可以放10,但是只放了5,有意义的数据只到了5,后面的数据没有意义,因此上限理解为有意义的数据的位置,因此代表可读的总容量。

    6. 案例

     

     

     

     

    7. 文件映射到内存

    三、网络编程

    1. 多线程网络服务器的一般结构

     有多个客户端连接这个服务器,有一个派发线程,只做accept操作,它不会做客户端实际的业务处理,它会把接到对的客户端派发给不同的线程做处理,同时把socket派发给线程去做处理,处理过程中涉及到读写。

    2. 简单案例

     这里实现一个简单的多线程服务器,它的功能就是读到客户端的什么数据,就往客户端回写相同的数据。

    这个程序首先通过new ServerSocket建立socket,监听8000端口,然后accept()等待客户端连接,如果等到后,通过线程池tp执行操作,处理消息,从线程池里面挑一个线程做处理,同时把客户端clientSocket作为参数传过去,这样新的线程就能和客户端进行通信,主程序只做客户端的接入。

     HandleMsg先从客户端读进来readLine(),读进来的照模照样的写进来,OutputStream也是从clientSocket构造出来的。

     客户端新建一个socket,连接8000端口。读的时候也是从socket当中读InputStream。

    3. 问题

    为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的转呗和读取都在这个线程中。

    此时,如果客户端数量众多,可能会消耗大量的系统资源。

    传统的网络编程方式,在这种模式下,系统为每一个客户端使用一个线程,如果客户端出现了异常,在系统当中,最不可靠的就是网络,CPU、硬盘、内存都相对可靠,网络有各种各样的情况读不到,比如网络丢包,网络拥塞,网络延迟,线路没插好,WiFi出问题,信号太弱,因此网络并不可靠,因此在网络上传输数据会出现延时中断等。当出现这种问题的时候,其实这些数据通道产生的问题,会对整个系统的交互产生影响。由于线程是一对一服务的,比如产生一个线程就是对这个客户端服务的,因此当客户端或者线路出现问题的时候,线程就会占用很多时间,因为数据的准备和读取都是会由新建的线程负责,如果客户单数量众多,大量的并发连接,只要有若干个不好的状况,可能导致比较大的消耗。

    4. 解决

    非阻塞的NIO

    数据准备好了在工作

    我读取数据不需要做等待。本来情况是一个线程对应一个客户端,这个线程在读取客户端数据的时候要等待,等待数据读完,才把数据拿上来。IO操作要有两部分,一部分是准备做IO,一部分是真正做IO。非阻塞是指准备做IO的时候不会做线程阻塞,只有当数据准备好了,去读一下或者去写一下而已,所以NIO是非阻塞的IO。如果说客户端反应迟钝缓慢,这个延时的时间其实并不会出现NIO所处理的线程中,如果没有准备好数据,线程不会开始工作的。

    5. 模拟低效的客户端

     

     因为客户端花了6秒钟写数据,所以服务端每处理一个数据花费6秒钟。每一个线程都卡了6秒钟

    6. NIO方式处理如上问题

     selectable channel是可选择的channel,它可以被轮询,被选择。selector是选择器,它可以选择一个channel,去做数据的处理。关键点:一个selector只对应一个线程,一个线程可以用一个selector,一个selector可以轮询多个channel。而每一个channel后面对应一个socket,在这种模式下,一个线程轮询了多个socket,这就是线程的复用。selector通常情况下,要去看哪个客户端准备好了(每一个channel后面对应一个socket,每一个socket就是一个客户端),有两种方式,select()和selectNow()。

    select():它就是告诉你, 当前已经有客户端准备好数据,如果当前没有客户端准备好数据,在selector上面调用select方法,是会阻塞的,当无数据准备好的时候,selector是无事可做的,当有数据准备好的时候,selector就会返回。返回之后,实际上在后面得到一个selectionKey的东西,必须在执行select后得到selectionKey。selectionKey表示一对selector和channel,表示在哪个selector上发现哪个channel是数据准备好的,只有准备好数据的channel(client、socket)才会被selector选中,才会触发selector做返回。如果没有一个数据准备好,这个selector是被阻塞的,不会被返回的。这样就使得一个线程同时监视多个客户端,而且当客户端反应迟钝的,只是这个线程本身不会做返回,但是线程数量很少,因为只有几个线程就可以监控大量的客户端,因此阻塞几个线程对系统的性能影响很小的。并且在这个读取数据的过程中不会等待。selectNow不阻塞,直接返回,如果当前没有人准备好,就返回零,如果准备好,返回准备好数据的client个数。

    7. 参考代码

    示例1:

     1 package com.sishisan.ch8;
     2 
     3 import java.io.BufferedReader;
     4 import java.io.IOException;
     5 import java.io.InputStreamReader;
     6 import java.io.PrintWriter;
     7 import java.net.ServerSocket;
     8 import java.net.Socket;
     9 import java.util.concurrent.ExecutorService;
    10 import java.util.concurrent.Executors;
    11 
    12 public class MultiThreadEchoServer {
    13     private static ExecutorService tp = Executors.newCachedThreadPool();
    14     static class HandleMsg implements Runnable {
    15         Socket clientSocket;
    16         public HandleMsg(Socket clientSocket) {
    17             this.clientSocket = clientSocket;
    18         }
    19         
    20         @Override
    21         public void run() {
    22             BufferedReader is = null;
    23             PrintWriter os = null;
    24             try {
    25                 is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
    26                 os = new PrintWriter(clientSocket.getOutputStream(),true);
    27                 //从InputStream当中读取客户端所发送的数据
    28                 String inputLine = null;
    29                 long b = System.currentTimeMillis();
    30                 while((inputLine = is.readLine()) != null) {
    31                     os.print(inputLine);
    32                 }
    33                 long e = System.currentTimeMillis();
    34                 System.out.println("spend:" + (e-b) + "ms");
    35             } catch (IOException e) {
    36                 // TODO Auto-generated catch block
    37                 e.printStackTrace();
    38             }finally {
    39                 try {
    40                     if(is != null) is.close();
    41                     if(os != null) os.close();
    42                     clientSocket.close();
    43                 } catch (IOException e) {
    44                     // TODO Auto-generated catch block
    45                     e.printStackTrace();
    46                 }
    47             }
    48             
    49         }
    50     }
    51     
    52     public static void main(String[] args) {
    53         ServerSocket echoServer = null;
    54         Socket clientSocket = null;
    55         try {
    56             echoServer = new ServerSocket(8000);
    57         } catch (IOException e) {
    58             // TODO Auto-generated catch block
    59             e.printStackTrace();
    60         }
    61         while(true) {
    62             try {
    63                 clientSocket = echoServer.accept();
    64                 System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
    65                 tp.execute(new HandleMsg(clientSocket));
    66             } catch (IOException e) {
    67                 // TODO Auto-generated catch block
    68                 e.printStackTrace();
    69             }
    70         }
    71     }
    72 }
    View Code

    示例2:

     1 package com.sishisan.ch8;
     2 
     3 import java.io.BufferedReader;
     4 import java.io.IOException;
     5 import java.io.InputStreamReader;
     6 import java.io.PrintWriter;
     7 import java.net.InetSocketAddress;
     8 import java.net.Socket;
     9 import java.util.concurrent.ExecutorService;
    10 import java.util.concurrent.Executors;
    11 import java.util.concurrent.locks.LockSupport;
    12 
    13 public class HeavySocketClient {
    14     private static ExecutorService tp = Executors.newCachedThreadPool();
    15     private static final int sleep_time = 1000*1000*1000;
    16     public static class EchoClient implements Runnable {
    17 
    18         @Override
    19         public void run() {
    20             Socket client = null;
    21             PrintWriter writer = null;
    22             BufferedReader reader = null;
    23             
    24             try {
    25                 client = new Socket();
    26                 client.connect(new InetSocketAddress("localhost",8000));
    27                 writer = new PrintWriter(client.getOutputStream(),true);
    28                 writer.print("H");
    29                 LockSupport.parkNanos(sleep_time);
    30                 writer.print("e");
    31                 LockSupport.parkNanos(sleep_time);
    32                 writer.print("l");
    33                 LockSupport.parkNanos(sleep_time);
    34                 writer.print("l");
    35                 LockSupport.parkNanos(sleep_time);
    36                 writer.print("o");
    37                 LockSupport.parkNanos(sleep_time);
    38                 writer.print("!");
    39                 LockSupport.parkNanos(sleep_time);
    40                 writer.println();
    41                 writer.flush();
    42                 
    43                 reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
    44                 System.out.println("from server:" + reader.readLine());
    45             } catch (IOException e) {
    46                 // TODO Auto-generated catch block
    47                 e.printStackTrace();
    48             } finally {
    49                 try {
    50                     if(writer != null) {
    51                         writer.close();
    52                     }
    53                     if(reader != null) {
    54                         reader.close();
    55                     }
    56                 } catch (IOException e) {
    57                     // TODO Auto-generated catch block
    58                     e.printStackTrace();
    59                 }
    60             }
    61         }    
    62     }
    63     
    64     public static void main(String[] args) {
    65         EchoClient ec = new EchoClient();
    66         for(int i = 0; i < 10; i++) {
    67             tp.execute(ec);
    68         }
    69     }
    70 }
    View Code

    示例3:

      1 package com.sishisan.ch8;
      2 
      3 import java.io.IOException;
      4 import java.net.InetAddress;
      5 import java.net.InetSocketAddress;
      6 import java.net.Socket;
      7 import java.nio.ByteBuffer;
      8 import java.nio.channels.SelectionKey;
      9 import java.nio.channels.Selector;
     10 import java.nio.channels.ServerSocketChannel;
     11 import java.nio.channels.SocketChannel;
     12 import java.nio.channels.spi.SelectorProvider;
     13 import java.util.HashMap;
     14 import java.util.Iterator;
     15 import java.util.LinkedList;
     16 import java.util.Map;
     17 import java.util.Set;
     18 import java.util.concurrent.ExecutorService;
     19 import java.util.concurrent.Executors;
     20 
     21 public class MultiThreadNIOEchoServer {
     22     public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>(10240);
     23     class EchoClient {
     24         private LinkedList<ByteBuffer> outq;
     25         
     26         EchoClient(){
     27             outq = new LinkedList<ByteBuffer>();
     28         }
     29         
     30         public LinkedList<ByteBuffer> getOutputQueue() {
     31             return outq;
     32         }
     33         
     34         public void enqueue(ByteBuffer bb) {
     35             outq.addFirst(bb);
     36         }
     37     }
     38     
     39     class HandleMsg implements Runnable {
     40         
     41         SelectionKey sk;
     42         ByteBuffer bb;
     43         
     44         public HandleMsg(SelectionKey sk, ByteBuffer bb) {
     45             this.sk = sk;
     46             this.bb = bb;
     47         }
     48         
     49         @Override
     50         public void run() {
     51             EchoClient echoClient = (EchoClient) sk.attachment();
     52             echoClient.enqueue(bb);
     53             
     54             //We have enqueued data to be written to the client, 
     55             //we must don't set interest in OP_WRITE.
     56             sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
     57             //强迫selector立即返回
     58             selector.wakeup();
     59         }
     60     }
     61     
     62     /**
     63      * Called when a SelectionKey is ready for writing
     64      */
     65     private void doWrite(SelectionKey sk) {
     66         SocketChannel channel = (SocketChannel) sk.channel();
     67         EchoClient echoClient = (EchoClient) sk.attachment();
     68         LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
     69         
     70         ByteBuffer bb = outq.getLast();
     71         try {
     72             int len = channel.write(bb);
     73             if(len == -1) {
     74                 disconnect(sk);
     75                 return;
     76             }
     77             if(bb.remaining() == 0) {
     78                 //The buffer was completely written, remove it.
     79                 outq.removeLast();
     80             }
     81         } catch (IOException e) {
     82             // TODO Auto-generated catch block
     83             System.out.println("Failed to write to client.");
     84             e.printStackTrace();
     85             disconnect(sk);
     86         }
     87         
     88         //If thre is no more data to be written, remove interest in OP_WRITE
     89         if(outq.size() == 0) {
     90             sk.interestOps(SelectionKey.OP_READ);
     91         }
     92     }
     93     
     94     private void disconnect(SelectionKey sk) {
     95         try {
     96             if(sk != null) {
     97                 sk.channel().close();
     98             }
     99         } catch (IOException e) {
    100             // TODO Auto-generated catch block
    101             System.out.println("Failed to close client socket channel.");
    102             e.printStackTrace();
    103         }
    104     }
    105     
    106     /**
    107      * Read from a client. Enqueue the data on the clients output queue and set the selector to notify on OP_WRITE.
    108      */
    109     private void doRead(SelectionKey sk) {
    110         SocketChannel channel = (SocketChannel) sk.channel();
    111         ByteBuffer bb = ByteBuffer.allocate(8192);
    112         int len;
    113         
    114         try {
    115             len = channel.read(bb);
    116             if(len < 0) {
    117                 disconnect(sk);
    118                 return;
    119             }
    120         } catch (IOException e) {
    121             // TODO Auto-generated catch block
    122             System.out.println("Fialed to read from client");
    123             e.printStackTrace();
    124             disconnect(sk);
    125             return;
    126         }
    127         
    128         //Flip the buffer
    129         bb.flip();
    130         tp.execute(new HandleMsg(sk,bb));
    131     }
    132     private Selector selector;
    133     private ExecutorService tp = Executors.newCachedThreadPool();
    134     
    135     /**
    136      * Accept a new client and set it up for reading
    137      */
    138     private void doAccept(SelectionKey sk) {
    139         ServerSocketChannel server = (ServerSocketChannel) sk.channel();
    140         SocketChannel clientChannel;
    141         
    142         try {
    143             clientChannel = server.accept();
    144             clientChannel.configureBlocking(false);
    145             
    146             //Register this channel for reading
    147             SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
    148             //Allocate an EchoClient instance and attach it to this selection key.
    149             EchoClient echoClient = new EchoClient();
    150             clientKey.attach(echoClient);
    151             
    152             InetAddress clientAddress = clientChannel.socket().getInetAddress();
    153             System.out.println("Accept connection from" + clientAddress.getHostAddress());
    154         } catch (IOException e) {
    155             // TODO Auto-generated catch block
    156             System.out.println("Failed to accept new client.");
    157             e.printStackTrace();
    158         }
    159         
    160     }
    161     
    162     private void startServer() throws Exception {
    163         selector = SelectorProvider.provider().openSelector();
    164         
    165         //create non-blocking server socket
    166         ServerSocketChannel ssc = ServerSocketChannel.open();
    167         
    168         //Bind the server socket to localhost
    169         //InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(), 8000);
    170         InetSocketAddress isa = new InetSocketAddress(8000);
    171         ssc.socket().bind(isa);
    172         
    173         //Register the socket for select events
    174         SelectionKey acceptKey = ssc.register(selector,  SelectionKey.OP_ACCEPT);
    175         
    176         //Local forever
    177         for(;;) {
    178             selector.select();
    179 //            if(selector.selectNow() == 0) {
    180 //                continue;
    181 //            }
    182             Set readyKeys = selector.selectedKeys();
    183             Iterator i = readyKeys.iterator();
    184             long e = 0;
    185             while(i.hasNext()) {
    186                 SelectionKey sk = (SelectionKey) i.next();
    187                 i.remove();
    188                 if(sk.isAcceptable()) {
    189                     doAccept(sk);
    190                 } else if(sk.isValid() && sk.isReadable()) {
    191                     if(!geym_time_stat.containsKey(((SocketChannel)sk.channel()).socket())) {
    192                         geym_time_stat.put(((SocketChannel)sk.channel()).socket(), System.currentTimeMillis());
    193                     }
    194                     doRead(sk);
    195                 } else if(sk.isValid() && sk.isWritable()) {
    196                     doWrite(sk);
    197                     e = System.currentTimeMillis();
    198                     long b = geym_time_stat.remove(((SocketChannel)sk.channel()).socket());
    199                     System.out.println("spend:" + (e-b) + "ms");
    200                 }
    201             }
    202         }
    203     }
    204     
    205     public static void main(String[] args) {
    206         MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();
    207         try {
    208             echoServer.startServer();
    209         } catch (Exception e) {
    210             // TODO Auto-generated catch block
    211             System.out.println("Exception caught, program exiting...");
    212             e.printStackTrace();
    213         }
    214     }
    215 }
    View Code

    8. 总结

    NIO会将数据准备好后,再交由应用进行处理,数据的读取过程依然在应用线程中完成。

    节省数据准备时间(因为selector可以复用)

    数据没有准备好是跑不到应用中的,因此可以节省数据准备时间,把等待的时间剥离到一个单独的线程,从而提升运行效率。

    四、AIO

    1. 简介

    读完了再通知我

    不会加快IO,只是在读完后进行通知。

    使用回调函数,进行业务处理。

    对于应用程序来说,不用读不用写,只用等在那里,只需要等待系统把数据处理完了,调用回调函数,就会来执行。

    IO本身速度不变,它只是修改了原来线程对线程的处理方式,导致看起来系统性能更高,只是我们进行了更加合理的时间和线程调度。只是读完写完后给我一个通知。

    AIO主要是定义大量的回调函数,进行业务处理。

     2. 主要使用AsynchronousServerSocketChannel

     因为是异步IO,所以accept不能阻塞,直接返回。这里使用回调函数,实际上是一个回调接口,做处理,当真正有任务accept上来的时候,调用handler,把accept相关的客户端信息传给你,然后你再去做accept后面的逻辑,这就是AIOde基本思想。

    3. AsynchronousSocketChannel的主要方法

     read是从Socket里面读数据,因为是异步的,所有的read方法都是立即放回的。

    ByteBuffer读到哪里去,long,TimeUnit指的是超时时间,CompletionHandler指的是读完后要做的事情。返回值Future表示读完后到底读到了多少字节。

    ByteBuffer[]有时候网络协议报文头部大小固定,这时候使用这种方式,很快把头部剥离处理。

    4. 示例代码

  • 相关阅读:
    this指向
    call方法
    js浮点数的比较
    最近写h5 后台可配置大图轮播 发现pc上面正常,手机端无法显示
    记录好用的网站
    s-table组件设定
    iview TimePicker实现选择时间段
    symbol的使用
    JS笔记
    linux网络基础管理
  • 原文地址:https://www.cnblogs.com/upyang/p/13514908.html
Copyright © 2011-2022 走看看