AIO(异步非阻塞)AIO采用了Proactor模式,AIO与NIO的不同之处在于当AIO在进行读写操作时,不用先等通知,可直接调用相应的read/write方法,这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序,而NIO的通知是发生在动作之前的,是在可读、写的时候,Selector发现了这些事件后调用Handler处理.
在AIO socket编程中,服务端通道是AsynchronousServerSocketChannel,这个类提供了一个open()静态工厂,一个bind()方法用于绑定服务端IP地址(还有端口号),另外还提供了accept()用于接收用户连接请求。在客户端使用的通道是AsynchronousSocketChannel,这个通道处理提供open静态工厂方法外,还提供了read和write方法。
在AIO编程中,发出一个事件(accept read write等)之后要指定事件处理类(回调函数),AIO中的事件处理类是CompletionHandler<V,A>,这个接口定义了如下两个方法,分别在异步操作成功和失败时被回调。
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousServerSocketChannel;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
- public class AIOEchoServer {
- public final static int PORT = 8001;
- public final static String IP = "127.0.0.1";
- private AsynchronousServerSocketChannel server = null;
- public AIOEchoServer(){
- try {
- //同样是利用工厂方法产生一个通道,异步通道 AsynchronousServerSocketChannel
- server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP,PORT));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- //使用这个通道(server)来进行客户端的接收和处理
- public void start(){
- System.out.println("Server listen on "+PORT);
- //注册事件和事件完成后的处理器,这个CompletionHandler就是事件完成后的处理器
- server.accept(null,new CompletionHandler<AsynchronousSocketChannel,Object>(){
- final ByteBuffer buffer = ByteBuffer.allocate(1024);
- @Override
- public void completed(AsynchronousSocketChannel result,Object attachment) {
- System.out.println(Thread.currentThread().getName());
- Future<Integer> writeResult = null;
- try{
- buffer.clear();
- result.read(buffer).get(100,TimeUnit.SECONDS);
- System.out.println("In server: "+ new String(buffer.array()));
- //将数据写回客户端
- buffer.flip();
- writeResult = result.write(buffer);
- }catch(InterruptedException | ExecutionException | TimeoutException e){
- e.printStackTrace();
- }finally{
- server.accept(null,this);
- try {
- writeResult.get();
- result.close();
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- System.out.println("failed:"+exc);
- }
- });
- }
- public static void main(String[] args) {
- new AIOEchoServer().start();
- while(true){
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
客户端:
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- public class AIOClient {
- public static void main(String[] args) throws IOException {
- final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
- InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1",8001);
- CompletionHandler<Void, ? super Object> handler = new CompletionHandler<Void,Object>(){
- @Override
- public void completed(Void result, Object attachment) {
- client.write(ByteBuffer.wrap("Hello".getBytes()),null,
- new CompletionHandler<Integer,Object>(){
- @Override
- public void completed(Integer result,
- Object attachment) {
- final ByteBuffer buffer = ByteBuffer.allocate(1024);
- client.read(buffer,buffer,new CompletionHandler<Integer,ByteBuffer>(){
- @Override
- public void completed(Integer result,
- ByteBuffer attachment) {
- buffer.flip();
- System.out.println(new String(buffer.array()));
- try {
- client.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void failed(Throwable exc,
- ByteBuffer attachment) {
- }
- });
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- }
- });
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- }
- };
- client.connect(serverAddress, null, handler);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }