前言
AIO是异步IO的缩写,即Asynchronized IO。虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的,对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作,IO操作本身还是同步的。
但是对于AIO来说,则更加的进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此,AIO是完全不会阻塞的。此时,我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。
NIO和AIO的使用场景
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,JDK1.4开始支持。
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如HTTP服务器等,充分调用OS参与并发操作,JDK7开始支持
下面来通过AIO实现的服务器来加深了解AIO:
AIOEchoServer:
1 public class AIOEchoServer { 2 public static final int PORT = 8000; 3 private AsynchronousServerSocketChannel server;//异步通道 4 5 public AIOEchoServer() throws IOException { 6 server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT)); 7 } 8 9 //接收和处理 10 public void start(){ 11 System.out.println("Server listen on " + PORT); 12 server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { 13 final ByteBuffer buffer = ByteBuffer.allocate(1024); 14 public void completed(AsynchronousSocketChannel result,Object attachment){ 15 System.out.println(Thread.currentThread().getName()); 16 Future<Integer> writeResult = null; 17 buffer.clear(); 18 try { 19 result.read(buffer).get(100, TimeUnit.SECONDS); 20 buffer.flip(); 21 writeResult = result.write(buffer); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } catch (ExecutionException e) { 25 e.printStackTrace(); 26 } catch (TimeoutException e) { 27 e.printStackTrace(); 28 }finally { 29 30 server.accept(null,this); 31 try { 32 writeResult.get(); 33 result.close(); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } catch (ExecutionException e) { 37 e.printStackTrace(); 38 } catch (IOException e) { 39 e.printStackTrace(); 40 } 41 } 42 } 43 @Override 44 public void failed(Throwable exc, Object attachment) { 45 System.out.println("failed : " + exc); 46 } 47 }); 48 }
49 }
异步IO(AIO)需要使用异步通道。这里使用的是AsynchronousServerSocketChannel。
上述代码定义的start()方法开启了服务器,值得注意的是,这里只是调用了一个函数server.accept()。之后,这一大堆的代码只是这个函数的参数。
AsynchronousServerSocketChannel.accept()方法会立即返回。它并不会真的等待客户端的到来,这里使用的accept()方法的签名是:
public final <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler)
它的第一个参数是一个附件,可以是任意类型,作用是让当前线程和后续的回调方法可以共享这个信息,它会在后续的调用中,传递给handler。它的第二个参数是CompletionHandler接口。这个接口有两个方法:
void completed(V result,A attachment) void failed(Throwable exc,A attachment)
这两个方法分别在异步操作accept()成功调用completed()和失败调用failed()。
因此,AsynchronousServerSocketChannel.accept()实际上做了两件事,第一就是发起accept请求,告诉系统可以开始监听端口了。第二,注册CompletionHandler实例,告诉系统,一旦有客户端前来连接,如果连接成功,就去执行CompletionHandler.completed()方法;如果连接失败,就去执行CompletionHandler.failed()方法。
所以,server.accept()方法不会阻塞,它会立即返回。
到这里,上述代码的意思其实也就差不多明白了:当completed()被执行时,意味着已经有客户端连接成功了。在第19行,使用read()方法读取客户端的数据,这里需要注意,AsynchronousServerSocketChannel.read()方法也是异步的,换句话说,就是它不会等到数据读取完成了再返回,而是立即返回,返回的结果是一个Future对象,因此这里是Future模式的典型应用。在这里为了编程方便,直接调用了Future.get()方法(第32行),进行等待,将这个异步方法变为了同步方法。因此,在19行执行完成后,数据读取就已经完成了。
之后,将数据回写给客户端(第21行),这里调用的是AsynchronousServerSocketChannel.write()方法,这个方法也是异步的,同样的返回一个Future对象。
再之后,第30行,服务器进行下一个客户端的连接准备。同时关闭当前正在处理的客户端连接。但是在关闭之前,得先确认之前的write()操作已经完成,因此,使用Future.get()方法进行等待(第32行)。
接下来,我们只需要在main函数中调用这个start()方法就可以开启服务器了:
1 public static void main(String[] args) throws IOException, InterruptedException { 2 new AIOEchoServer().start(); 3 while (true){ 4 Thread.sleep(1000); 5 } 6 }
上述代码第2行,调用start()方法开启服务器。但是由于start()方法中使用的是异步方法,因此它会立即返回,它并不会像阻塞方法那样会进行等待,因此,如果想让程序驻守执行,第3~5行的等待语句是必须的。否则,在start()方法结束后,不等客户端到来,程序就已经运行完成,主线程就将退出。
AIOClient:
1 public class AIOClient { 2 public static void main(String[] args) throws IOException, InterruptedException { 3 final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); 4 client.connect(new InetSocketAddress("localhost", 8000), null, new CompletionHandler<Void, Object>() { 5 @Override 6 public void completed(Void result, Object attachment) { 7 client.write(ByteBuffer.wrap("Hello!".getBytes()), null, new CompletionHandler<Integer, Object>() { 8 @Override 9 public void completed(Integer result, Object attachment) { 10 ByteBuffer buffer = ByteBuffer.allocate(1024); 11 client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { 12 @Override 13 public void completed(Integer result, ByteBuffer attachment) { 14 buffer.flip(); 15 System.out.println(new String(buffer.array())); 16 try { 17 client.close(); 18 } catch (IOException e) { 19 e.printStackTrace(); 20 } 21 } 22 @Override 23 public void failed(Throwable exc, ByteBuffer attachment) { 24 } 25 }); 26 } 27 @Override 28 public void failed(Throwable exc, Object attachment) { 29 } 30 }); 31 } 32 @Override 33 public void failed(Throwable exc, Object attachment) { 34 } 35 }); 36 //由于主线程会立即结束,所以这里等待上述处理全部完成 37 Thread.sleep(1000); 38 } 39 }
上述的AIOClient代码看起来很长,实际上只有三个语句:
第一个语句:代码第3行,打开AsynchronousSocketChannel通道。
第二个语句:代码第4~35行,它让客户端去连接指定的服务器,并注册了一系列事件。
第三个语句:代码第37行,让主线程进行等待。
代码的第4行,客户端进行网络连接,并注册了连接成功的回调函数CompletionHandler<Void,Object>。待连接成功后,就会进入代码第7行。第7行进行数据写入,向服务端发送数据。这个过程是异步的,会很快返回,写入完成后,会通知回调接口CompletionHandler<Integer,Object>,进入第10行。准备进行数据读取,从服务端读取回写的数据。当然代码的第11行的read()方法也是立即返回的,成功读取所有的数据后,会回调CompletionHandler<Integer,ByteBuffer>接口,进入第14行。在第15行,打印接收到的数据。
AIO的特点
1. 读完了再通知我
2. 不会加快IO,只是在读完后进行通知
3. 使用回调函数,进行业务处理
参考:《Java高并发程序设计》 葛一鸣 郭超 编著: