zoukankan      html  css  js  c++  java
  • AIO(异步IO)

    前言

      AIO是异步IO的缩写,即Asynchronized IO。虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的,对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作,IO操作本身还是同步的。

      但是对于AIO来说,则更加的进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此,AIO是完全不会阻塞的。此时,我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。

    NIO和AIO的使用场景

      NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,JDK1.4开始支持。

      AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如HTTP服务器等,充分调用OS参与并发操作,JDK7开始支持

    下面来通过AIO实现的服务器来加深了解AIO:

    AIOEchoServer:

     1 public class AIOEchoServer {
     2     public static final int PORT = 8000;
     3     private AsynchronousServerSocketChannel server;//异步通道
     4 
     5     public AIOEchoServer() throws IOException {
     6         server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));
     7     }
     8 
     9     //接收和处理
    10     public void start(){
    11         System.out.println("Server listen on " + PORT);
    12         server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
    13             final ByteBuffer buffer = ByteBuffer.allocate(1024);
    14             public void completed(AsynchronousSocketChannel result,Object attachment){
    15                 System.out.println(Thread.currentThread().getName());
    16                 Future<Integer> writeResult = null;
    17                 buffer.clear();
    18                 try {
    19                     result.read(buffer).get(100, TimeUnit.SECONDS);
    20                     buffer.flip();
    21                     writeResult = result.write(buffer);
    22                 } catch (InterruptedException e) {
    23                     e.printStackTrace();
    24                 } catch (ExecutionException e) {
    25                     e.printStackTrace();
    26                 } catch (TimeoutException e) {
    27                     e.printStackTrace();
    28                 }finally {
    29 
    30                     server.accept(null,this);
    31                     try {
    32                         writeResult.get();
    33                         result.close();
    34                     } catch (InterruptedException e) {
    35                         e.printStackTrace();
    36                     } catch (ExecutionException e) {
    37                         e.printStackTrace();
    38                     } catch (IOException e) {
    39                         e.printStackTrace();
    40                     }
    41                 }
    42             }
    43             @Override
    44             public void failed(Throwable exc, Object attachment) {
    45                 System.out.println("failed : " + exc);
    46             }
    47         });
    48     }
    49 }

       异步IO(AIO)需要使用异步通道。这里使用的是AsynchronousServerSocketChannel。

      上述代码定义的start()方法开启了服务器,值得注意的是,这里只是调用了一个函数server.accept()。之后,这一大堆的代码只是这个函数的参数。

      AsynchronousServerSocketChannel.accept()方法会立即返回。它并不会真的等待客户端的到来,这里使用的accept()方法的签名是:

    public final <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler)

      它的第一个参数是一个附件,可以是任意类型,作用是让当前线程和后续的回调方法可以共享这个信息,它会在后续的调用中,传递给handler。它的第二个参数是CompletionHandler接口。这个接口有两个方法:

        void completed(V result,A attachment)
        void failed(Throwable exc,A attachment)

      这两个方法分别在异步操作accept()成功调用completed()和失败调用failed()。

      因此,AsynchronousServerSocketChannel.accept()实际上做了两件事,第一就是发起accept请求,告诉系统可以开始监听端口了。第二,注册CompletionHandler实例,告诉系统,一旦有客户端前来连接,如果连接成功,就去执行CompletionHandler.completed()方法;如果连接失败,就去执行CompletionHandler.failed()方法。

      所以,server.accept()方法不会阻塞,它会立即返回。

      到这里,上述代码的意思其实也就差不多明白了:当completed()被执行时,意味着已经有客户端连接成功了。在第19行,使用read()方法读取客户端的数据,这里需要注意,AsynchronousServerSocketChannel.read()方法也是异步的,换句话说,就是它不会等到数据读取完成了再返回,而是立即返回,返回的结果是一个Future对象,因此这里是Future模式的典型应用。在这里为了编程方便,直接调用了Future.get()方法(第32行),进行等待,将这个异步方法变为了同步方法。因此,在19行执行完成后,数据读取就已经完成了。

      之后,将数据回写给客户端(第21行),这里调用的是AsynchronousServerSocketChannel.write()方法,这个方法也是异步的,同样的返回一个Future对象。

      再之后,第30行,服务器进行下一个客户端的连接准备。同时关闭当前正在处理的客户端连接。但是在关闭之前,得先确认之前的write()操作已经完成,因此,使用Future.get()方法进行等待(第32行)。

    接下来,我们只需要在main函数中调用这个start()方法就可以开启服务器了:

    1 public static void main(String[] args) throws IOException, InterruptedException {
    2         new AIOEchoServer().start();
    3         while (true){
    4             Thread.sleep(1000);
    5         }
    6     }

      上述代码第2行,调用start()方法开启服务器。但是由于start()方法中使用的是异步方法,因此它会立即返回,它并不会像阻塞方法那样会进行等待,因此,如果想让程序驻守执行,第3~5行的等待语句是必须的。否则,在start()方法结束后,不等客户端到来,程序就已经运行完成,主线程就将退出。

    AIOClient:

     1 public class AIOClient {
     2     public static void main(String[] args) throws IOException, InterruptedException {
     3         final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
     4         client.connect(new InetSocketAddress("localhost", 8000), null, new CompletionHandler<Void, Object>() {
     5             @Override
     6             public void completed(Void result, Object attachment) {
     7                 client.write(ByteBuffer.wrap("Hello!".getBytes()), null, new CompletionHandler<Integer, Object>() {
     8                     @Override
     9                     public void completed(Integer result, Object attachment) {
    10                         ByteBuffer buffer = ByteBuffer.allocate(1024);
    11                         client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
    12                             @Override
    13                             public void completed(Integer result, ByteBuffer attachment) {
    14                                 buffer.flip();
    15                                 System.out.println(new String(buffer.array()));
    16                                 try {
    17                                     client.close();
    18                                 } catch (IOException e) {
    19                                     e.printStackTrace();
    20                                 }
    21                             }
    22                             @Override
    23                             public void failed(Throwable exc, ByteBuffer attachment) {
    24                             }
    25                         });
    26                     }
    27                     @Override
    28                     public void failed(Throwable exc, Object attachment) {
    29                     }
    30                 });
    31             }
    32             @Override
    33             public void failed(Throwable exc, Object attachment) {
    34             }
    35         });
    36         //由于主线程会立即结束,所以这里等待上述处理全部完成
    37         Thread.sleep(1000);
    38     }
    39 }

    上述的AIOClient代码看起来很长,实际上只有三个语句:

      第一个语句:代码第3行,打开AsynchronousSocketChannel通道。

      第二个语句:代码第4~35行,它让客户端去连接指定的服务器,并注册了一系列事件。

      第三个语句:代码第37行,让主线程进行等待。

    代码的第4行,客户端进行网络连接,并注册了连接成功的回调函数CompletionHandler<Void,Object>。待连接成功后,就会进入代码第7行。第7行进行数据写入,向服务端发送数据。这个过程是异步的,会很快返回,写入完成后,会通知回调接口CompletionHandler<Integer,Object>,进入第10行。准备进行数据读取,从服务端读取回写的数据。当然代码的第11行的read()方法也是立即返回的,成功读取所有的数据后,会回调CompletionHandler<Integer,ByteBuffer>接口,进入第14行。在第15行,打印接收到的数据。  

    AIO的特点

      1. 读完了再通知我

      2. 不会加快IO,只是在读完后进行通知

      3. 使用回调函数,进行业务处理

    参考:《Java高并发程序设计》 葛一鸣 郭超 编著:

    作者:Joe
    努力了的才叫梦想,不努力的就是空想,努力并且坚持下去,毕竟这是我相信的力量
  • 相关阅读:
    三连击
    铺地毯
    超级玛丽游戏
    A+B problem
    怎么感觉欠缺点什么、灵光在哪,让我顿悟下
    2016年10月,人生的转折点。
    python第七天
    python第六天
    汉诺塔问题(Hanoi Tower)递归算法解析(Python实现)
    计算多数的乘积(Python实现)
  • 原文地址:https://www.cnblogs.com/Joe-Go/p/9987894.html
Copyright © 2011-2022 走看看