JDK 1.7 提供的 NIO 2.0 新增了异步套接字通道,它是真正的异步 I/O:在发起异步 I/O 操作时可以传递信号变量,操作完成后会回调相关的方法。异步 I/O 也被称为 AIO,对应于 UNIX 网络编程中的事件驱动 I/O;它不再需要通过多路复用器(Selector)对注册的通道进行轮询,就可以实现异步读写。
package com.hjp.netty.aio;

import java.io.IOException;

/**
 * Command-line entry point of the AIO time server.
 *
 * <p>Usage: {@code TimeServer [port]}. When no port is given, or the given
 * value is not a valid integer, port 8080 is used.
 */
public class TimeServer {

    /**
     * Resolves the listening port and starts an {@code AsyncTimeServerHandler}
     * on a thread named "AIOServer".
     *
     * @param args optional; {@code args[0]} is the port to listen on
     * @throws IOException declared for callers; nothing in this method body throws it directly
     */
    public static void main(String[] args) throws IOException {
        int listenPort = resolvePort(args, 8080);
        Thread serverThread = new Thread(new AsyncTimeServerHandler(listenPort), "AIOServer");
        serverThread.start();
    }

    /** Returns the port parsed from the first argument, or {@code fallback} if it is absent or malformed. */
    private static int resolvePort(String[] args, int fallback) {
        if (args == null || args.length == 0) {
            return fallback;
        }
        try {
            return Integer.parseInt(args[0]);
        } catch (NumberFormatException ignored) {
            // Deliberate best effort: a malformed port argument silently falls back to the default.
            return fallback;
        }
    }
}
package com.hjp.netty.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

/**
 * Runnable that owns the asynchronous server socket of the time server.
 *
 * <p>The constructor opens the channel and binds it to the given port;
 * {@link #run()} issues the first asynchronous accept and then parks the
 * calling thread on {@link #latch} until an accept failure releases it.
 *
 * Created by JiaPeng on 2017/7/24.
 */
public class AsyncTimeServerHandler implements Runnable {

    private int port;

    /** Released by the accept handler's failure callback; created in {@link #run()}. */
    CountDownLatch latch;

    /** The listening channel, or {@code null} when opening/binding failed in the constructor. */
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    /**
     * Opens the server channel and binds it to {@code port}.
     *
     * <p>An {@link IOException} during open/bind is reported on stderr, the
     * half-initialised channel is closed, and the channel field is left
     * {@code null} so that {@link #run()} can bail out cleanly.
     *
     * @param port TCP port to listen on
     */
    public AsyncTimeServerHandler(int port) {
        this.port = port;
        AsynchronousServerSocketChannel channel = null;
        try {
            channel = AsynchronousServerSocketChannel.open();
            // Bind the listening port.
            channel.bind(new InetSocketAddress(port));
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not keep an opened-but-unbound channel around: accept() on it would throw.
            closeQuietly(channel);
            channel = null;
        }
        asynchronousServerSocketChannel = channel;
    }

    /**
     * Starts accepting connections and blocks until the latch is released.
     *
     * <p>The latch only keeps this thread (and therefore the demo process)
     * alive while accepts are served on callback threads; a real application
     * would not need a dedicated thread for this.
     */
    @Override
    public void run() {
        latch = new CountDownLatch(1);
        if (asynchronousServerSocketChannel == null) {
            System.err.println("The time server was not started: could not listen on port " + port);
            return;
        }
        doAccept();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // The server is done (accept failed or thread interrupted): release the listening socket.
            closeQuietly(asynchronousServerSocketChannel);
        }
    }

    /** Issues one asynchronous accept, passing this handler as the attachment. */
    public void doAccept() {
        asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
    }

    /** Closes the channel if present, reporting (not propagating) a close failure. */
    private static void closeQuietly(AsynchronousServerSocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.hjp.netty.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * Completion handler for asynchronous accepts on the time server's listening channel.
 * The attachment is the {@code AsyncTimeServerHandler} that issued the accept.
 */
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {

    /**
     * Called when a client connection has been accepted.
     *
     * @param result     the newly accepted client channel
     * @param attachment the server handler owning the listening channel
     */
    @Override
    public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
        // Each accept() call delivers a single connection to this callback. To keep
        // serving further clients the accept has to be issued again right away, which
        // forms a loop: every successful connection asynchronously arms the next accept.
        attachment.asynchronousServerSocketChannel.accept(attachment, this);
        beginRead(result);
    }

    /** Starts one asynchronous read on the client channel into a fresh 1024-byte buffer. */
    private void beginRead(AsynchronousSocketChannel client) {
        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
        ReadCompletionHandler onRead = new ReadCompletionHandler(client);
        // The buffer doubles as the attachment so the read callback receives the data.
        client.read(requestBuffer, requestBuffer, onRead);
    }

    /**
     * Called when an accept fails: prints the cause and releases the server
     * handler's latch so that the thread waiting on it can continue.
     *
     * @param exc        the failure cause
     * @param attachment the server handler whose latch is counted down
     */
    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        attachment.latch.countDown();
    }
}
package com.hjp.netty.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * Completion handler for reads on one accepted client channel.
 *
 * <p>Every completed read is treated as one request: the bytes are decoded as
 * UTF-8, and the current time is sent back for "QUERY TIME ORDER"
 * (case-insensitive) or "BAD ORDER" for anything else. Once the response has
 * been fully written, another read is armed so that further requests are
 * served and so that the channel gets closed when the peer disconnects.
 */
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

    /** Size of the buffer allocated for each follow-up read. */
    private static final int READ_BUFFER_SIZE = 1024;

    private AsynchronousSocketChannel socketChannel;

    /**
     * @param socketChannel the client channel this handler reads from and replies on
     */
    public ReadCompletionHandler(AsynchronousSocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    /**
     * Handles a completed read.
     *
     * @param result     number of bytes read, or -1 when the peer has closed the connection
     * @param attachment the buffer the request bytes were read into
     */
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        if (result != null && result < 0) {
            // End of stream: nothing to answer, release the socket instead of writing to a dead peer.
            closeChannel();
            return;
        }
        attachment.flip();
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        String request = new String(body, StandardCharsets.UTF_8);
        System.out.println("The time server receive order : " + request);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(request) ? new Date().toString() : "BAD ORDER";
        doWrite(currentTime);
    }

    /**
     * Sends the response asynchronously, re-issuing the write until the whole
     * buffer has been flushed, then waits for the next request.
     */
    private void doWrite(String currentTime) {
        if (currentTime == null || currentTime.trim().isEmpty()) {
            awaitNextRequest();
            return;
        }
        // Encode with the same charset the request was decoded with.
        byte[] bytes = currentTime.getBytes(StandardCharsets.UTF_8);
        ByteBuffer writeBuffer = ByteBuffer.wrap(bytes);
        socketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: keep sending the rest.
                    socketChannel.write(attachment, attachment, this);
                } else {
                    awaitNextRequest();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeChannel();
            }
        });
    }

    /** Arms the next read with a fresh buffer; its EOF or failure is what eventually closes the channel. */
    private void awaitNextRequest() {
        ByteBuffer nextBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        socketChannel.read(nextBuffer, nextBuffer, this);
    }

    /**
     * Called when a read fails; closes the client channel.
     *
     * @param exc        the failure cause
     * @param attachment the buffer of the failed read
     */
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        closeChannel();
    }

    /** Closes the client channel, reporting (not propagating) a close failure. */
    private void closeChannel() {
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.hjp.netty.aio;

/**
 * Command-line entry point of the AIO time client.
 *
 * <p>Usage: {@code TimeClient [port]}. The client always targets 127.0.0.1;
 * when no port is given, or the value is not a valid integer, port 8080 is used.
 */
public class TimeClient {

    /**
     * Resolves the server port and starts an {@code AsyncTimeClientHandler}
     * on a thread named "AIOClient".
     *
     * @param args optional; {@code args[0]} is the server port
     */
    public static void main(String[] args) {
        int serverPort = 8080;
        boolean portSupplied = args != null && args.length > 0;
        if (portSupplied) {
            try {
                serverPort = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Deliberate best effort: a malformed port argument keeps the default.
            }
        }
        AsyncTimeClientHandler clientHandler = new AsyncTimeClientHandler("127.0.0.1", serverPort);
        Thread clientThread = new Thread(clientHandler, "AIOClient");
        clientThread.start();
    }
}
package com.hjp.netty.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * Asynchronous time client: connects to the server, sends "QUERY TIME ORDER",
 * prints the response and then closes the connection.
 *
 * <p>The instance is both the {@link Runnable} that drives the exchange and the
 * completion handler of the connect operation. {@link #run()} blocks on a latch
 * that every terminal path (response received, connection closed, any failure)
 * is guaranteed to release.
 */
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

    private AsynchronousSocketChannel socketChannel;
    private String host;
    private int port;
    private CountDownLatch latch;

    /**
     * Opens the client channel. A failure to open is reported on stderr and
     * leaves the channel {@code null}, in which case {@link #run()} does nothing.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public AsyncTimeClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            socketChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the asynchronous connect, waits for the exchange to finish, then closes the channel. */
    @Override
    public void run() {
        latch = new CountDownLatch(1);
        if (socketChannel == null) {
            // Opening the channel failed in the constructor (already reported); nothing to do.
            return;
        }
        socketChannel.connect(new InetSocketAddress(host, port), this, this);
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Connect succeeded: send the request, re-issuing the write until the whole
     * buffer is flushed, then read the response.
     *
     * @param result     unused (connect yields no value)
     * @param attachment this handler
     */
    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
        // Encode explicitly as UTF-8; the response below is decoded as UTF-8 as well.
        byte[] request = "QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8);
        ByteBuffer writeBuffer = ByteBuffer.wrap(request);
        socketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: keep sending the rest.
                    socketChannel.write(attachment, attachment, this);
                } else {
                    readResponse();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
                closeAndRelease();
            }
        });
    }

    /** Issues one asynchronous read and prints whatever it delivers as the server's answer. */
    private void readResponse() {
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        socketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                try {
                    if (result != null && result < 0) {
                        System.err.println("Connection closed by the server before a response was received");
                        return;
                    }
                    attachment.flip();
                    byte[] bytes = new byte[attachment.remaining()];
                    attachment.get(bytes);
                    String body = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("Now is " + body);
                } finally {
                    // Always wake up run(), whatever happened above.
                    latch.countDown();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
                closeAndRelease();
            }
        });
    }

    /**
     * Connect failed: report the cause, close the channel and release {@link #run()}.
     *
     * @param exc        the failure cause
     * @param attachment this handler
     */
    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
        exc.printStackTrace();
        closeAndRelease();
    }

    /**
     * Closes the channel and releases the latch. The count-down sits in a
     * {@code finally} so a failing close can never leave {@link #run()} blocked.
     */
    private void closeAndRelease() {
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            latch.countDown();
        }
    }
}