zoukankan      html  css  js  c++  java
  • Netty权威指南(笔记一)

    转载:http://blog.csdn.net/clarkkentyang/article/details/52529785

    第一章(略)

    第二章 NIO入门

    2.1传统的BIO编程(同步阻塞I/O服务端通信模型【一客户一线程】)

    网络编程的基本模型:Client/Server模型,也就是2个进程之间进行相互通信,其中服务端提供位置信息(绑定的IP地址和监听端口),客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket)进行通信。

    传统同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功之后,双方通过输入和输出流进行同步阻塞式通信。

     

    缺点:缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,由于线程是Java虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。

     

    服务端代码:

     

    [java] view plain copy
     
    1. <span style="white-space:pre">    </span>public static void main(String[] args) throws IOException {  
    2.         int port = 8080;  
    3.         if (args != null && args.length > 0) {  
    4.   
    5.             try {  
    6.                 port = Integer.valueOf(args[0]);  
    7.             } catch (NumberFormatException e) {  
    8.                 // 采用默认值  
    9.             }  
    10.   
    11.         }  
    12.         ServerSocket server = null;  
    13.         try {  
    14.             server = new ServerSocket(port);  
    15.             System.out.println("The time server is start in port : " + port);  
    16.             Socket socket = null;  
    17.             while (true) {  
    18.                 socket = server.accept();  
    19.                 new Thread(new TimeServerHandler(socket)).start();  
    20.             }  
    21.         } finally {  
    22.             if (server != null) {  
    23.                 System.out.println("The time server close");  
    24.                 server.close();  
    25.                 server = null;  
    26.             }  
    27.         }  
    28.     }  

     

     

    TimeServerHandler代码:

     

    [java] view plain copy
     
    1. public class TimeServerHandler implements Runnable {  
    2.   
    3.     private Socket socket;  
    4.   
    5.     public TimeServerHandler(Socket socket) {  
    6.     this.socket = socket;  
    7.     }  
    8.   
    9.     /* 
    10.      * (non-Javadoc) 
    11.      *  
    12.      * @see java.lang.Runnable#run() 
    13.      */  
    14.     @Override  
    15.     public void run() {  
    16.     BufferedReader in = null;  
    17.     PrintWriter out = null;  
    18.     try {  
    19.         in = new BufferedReader(new InputStreamReader(  
    20.             this.socket.getInputStream()));  
    21.         out = new PrintWriter(this.socket.getOutputStream(), true);  
    22.         String currentTime = null;  
    23.         String body = null;  
    24.         while (true) {  
    25.         body = in.readLine();  
    26.         if (body == null)  
    27.             break;  
    28.         System.out.println("The time server receive order : " + body);  
    29.         currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(  
    30.             System.currentTimeMillis()).toString() : "BAD ORDER";  
    31.         out.println(currentTime);  
    32.         }  
    33.   
    34.     } catch (Exception e) {  
    35.         if (in != null) {  
    36.         try {  
    37.             in.close();  
    38.         } catch (IOException e1) {  
    39.             e1.printStackTrace();  
    40.         }  
    41.         }  
    42.         if (out != null) {  
    43.         out.close();  
    44.         out = null;  
    45.         }  
    46.         if (this.socket != null) {  
    47.         try {  
    48.             this.socket.close();  
    49.         } catch (IOException e1) {  
    50.             e1.printStackTrace();  
    51.         }  
    52.         this.socket = null;  
    53.         }  
    54.     }  
    55.     }  
    56. }  



     


    客户端代码:

     

    [java] view plain copy
     
    1. <span style="white-space:pre">    </span>public static void main(String[] args) {  
    2.   
    3.     int port = 8080;  
    4.     if (args != null && args.length > 0) {  
    5.   
    6.         try {  
    7.         port = Integer.valueOf(args[0]);  
    8.         } catch (NumberFormatException e) {  
    9.         // 采用默认值  
    10.         }  
    11.   
    12.     }  
    13.     Socket socket = null;  
    14.     BufferedReader in = null;  
    15.     PrintWriter out = null;  
    16.     try {  
    17.         socket = new Socket("127.0.0.1", port);  
    18.         in = new BufferedReader(new InputStreamReader(  
    19.             socket.getInputStream()));  
    20.         out = new PrintWriter(socket.getOutputStream(), true);  
    21.         out.println("QUERY TIME ORDER");  
    22.         System.out.println("Send order 2 server succeed.");  
    23.         String resp = in.readLine();  
    24.         System.out.println("Now is : " + resp);  
    25.     } catch (Exception e) {  
    26.         e.printStackTrace();  
    27.     } finally {  
    28.         if (out != null) {  
    29.         out.close();  
    30.         out = null;  
    31.         }  
    32.   
    33.         if (in != null) {  
    34.         try {  
    35.             in.close();  
    36.         } catch (IOException e) {  
    37.             e.printStackTrace();  
    38.         }  
    39.         in = null;  
    40.         }  
    41.   
    42.         if (socket != null) {  
    43.         try {  
    44.             socket.close();  
    45.         } catch (IOException e) {  
    46.             e.printStackTrace();  
    47.         }  
    48.         socket = null;  
    49.         }  
    50.     }  
    51.     }  


    2.2伪异步I/O编程

     

    在BIO的基础上进行优化,后端通过一个线程池来处理多个客户端的请求接入,通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。

     

    服务端代码:

     

    [java] view plain copy
     
    1.    public static void main(String[] args) throws IOException {  
    2. int port = 8080;  
    3. if (args != null && args.length > 0) {  
    4.   
    5.     try {  
    6.     port = Integer.valueOf(args[0]);  
    7.     } catch (NumberFormatException e) {  
    8.     // 采用默认值  
    9.     }  
    10.   
    11. }  
    12. ServerSocket server = null;  
    13. try {  
    14.     server = new ServerSocket(port);  
    15.     System.out.println("The time server is start in port : " + port);  
    16.     Socket socket = null;  
    17.     TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(  
    18.         50, 10000);// 创建IO任务线程池  
    19.     while (true) {  
    20.     socket = server.accept();  
    21.     singleExecutor.execute(new TimeServerHandler(socket));  
    22.     }  
    23. finally {  
    24.     if (server != null) {  
    25.     System.out.println("The time server close");  
    26.     server.close();  
    27.     server = null;  
    28.     }  
    29. }  
    30.    }  


    连接池代码:

     

    [java] view plain copy
     
    1. public class TimeServerHandlerExecutePool {  
    2.   
    3.     private ExecutorService executor;  
    4.   
    5.     public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {  
    6.     executor = new ThreadPoolExecutor(Runtime.getRuntime()  
    7.         .availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS,  
    8.         new ArrayBlockingQueue<java.lang.Runnable>(queueSize));  
    9.     }  
    10.   
    11.     public void execute(java.lang.Runnable task) {  
    12.     executor.execute(task);  
    13.     }  
    14. }  


    优点:避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。

     

    缺点:但是由于它底层的通信依然采用同步阻塞模型,因此无法从跟本上解决问题。

     

    2.3 NIO编程

    NIO为New I/O的简称,也是非阻塞I/O。

    INO提供了SocketChannel和ServerSocketChannel两种不同的套接字通道实现。这两种新增的通道都支持阻塞和非阻塞两种模式。开发人员可以根据自己需求选择合适的模式。一般来说,低负载,低并发的应用程序可以选择同步阻塞I/O以降低编程复杂度,但是对于高负载,高并发的网络应用,需要使用NIO的非阻塞模式进行开发。

     

    2.3.1 NIO类库解析

     

    1.缓冲区Buffer

    最常用的缓冲区为ByteBuffer,提供了一组功能用于操作数组。除此之外,还有CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer。

     

    2.通道Channel

    是一个通道,主要通过它读取和写入数据。

    Channel主要分为网络读写SelectableChannel和文件操作FileChannel

    Netty主要涉及ServerSocketChannel和SocketChannel都是SelectableChannel的子类

     

    3.多路复用器Selector

    不断的扫描新的TCP连接接入、读和写事件的Channel,如果有,Channel就会处于就绪状态,被Selector轮询出来,然后通过selectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

     

    2.3.2 NIO源码

    1.服务端代码

     

    [java] view plain copy
     
    1. public static void main(String[] args) throws IOException {  
    2. int port = 8080;  
    3. if (args != null && args.length > 0) {  
    4.     try {  
    5.     port = Integer.valueOf(args[0]);  
    6.     } catch (NumberFormatException e) {  
    7.     // 采用默认值  
    8.     }  
    9. }  
    10. MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);  
    11. new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();  

     

     

    2.MultiplexerTimeServer类

    [java] view plain copy
     
    1. public class MultiplexerTimeServer implements Runnable {  
    2.   
    3.     private Selector selector;  
    4.   
    5.     private ServerSocketChannel servChannel;  
    6.   
    7.     private volatile boolean stop;  
    8.   
    9.     /** 
    10.      * 初始化多路复用器、绑定监听端口 
    11.      *  
    12.      * @param port 
    13.      */  
    14.     public MultiplexerTimeServer(int port) {  
    15.     try {  
    16.         selector = Selector.open();  
    17.         servChannel = ServerSocketChannel.open();  
    18.         servChannel.configureBlocking(false);  
    19.         servChannel.socket().bind(new InetSocketAddress(port), 1024);  
    20.         servChannel.register(selector, SelectionKey.OP_ACCEPT);  
    21.         System.out.println("The time server is start in port : " + port);  
    22.     } catch (IOException e) {  
    23.         e.printStackTrace();  
    24.         System.exit(1);  
    25.     }  
    26.     }  
    27.   
    28.     public void stop() {  
    29.     this.stop = true;  
    30.     }  
    31.   
    32.     /* 
    33.      * (non-Javadoc) 
    34.      *  
    35.      * @see java.lang.Runnable#run() 
    36.      */  
    37.     @Override  
    38.     public void run() {  
    39.     while (!stop) {  
    40.         try {  
    41.         selector.select(1000);  
    42.         Set<SelectionKey> selectedKeys = selector.selectedKeys();  
    43.         Iterator<SelectionKey> it = selectedKeys.iterator();  
    44.         SelectionKey key = null;  
    45.         while (it.hasNext()) {  
    46.             key = it.next();  
    47.             it.remove();  
    48.             try {  
    49.             handleInput(key);  
    50.             } catch (Exception e) {  
    51.             if (key != null) {  
    52.                 key.cancel();  
    53.                 if (key.channel() != null)  
    54.                 key.channel().close();  
    55.             }  
    56.             }  
    57.         }  
    58.         } catch (Throwable t) {  
    59.         t.printStackTrace();  
    60.         }  
    61.     }  
    62.   
    63.     // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源  
    64.     if (selector != null)  
    65.         try {  
    66.         selector.close();  
    67.         } catch (IOException e) {  
    68.         e.printStackTrace();  
    69.         }  
    70.     }  
    71.   
    72.     private void handleInput(SelectionKey key) throws IOException {  
    73.   
    74.     if (key.isValid()) {  
    75.         // 处理新接入的请求消息  
    76.         if (key.isAcceptable()) {  
    77.         // Accept the new connection  
    78.         ServerSocketChannel ssc = (ServerSocketChannel) key.channel();  
    79.         SocketChannel sc = ssc.accept();  
    80.         sc.configureBlocking(false);  
    81.         // Add the new connection to the selector  
    82.         sc.register(selector, SelectionKey.OP_READ);  
    83.         }  
    84.         if (key.isReadable()) {  
    85.         // Read the data  
    86.         SocketChannel sc = (SocketChannel) key.channel();  
    87.         ByteBuffer readBuffer = ByteBuffer.allocate(1024);  
    88.         int readBytes = sc.read(readBuffer);  
    89.         if (readBytes > 0) {  
    90.             readBuffer.flip();  
    91.             byte[] bytes = new byte[readBuffer.remaining()];  
    92.             readBuffer.get(bytes);  
    93.             String body = new String(bytes, "UTF-8");  
    94.             System.out.println("The time server receive order : "  
    95.                 + body);  
    96.             String currentTime = "QUERY TIME ORDER"  
    97.                 .equalsIgnoreCase(body) ? new java.util.Date(  
    98.                 System.currentTimeMillis()).toString()  
    99.                 : "BAD ORDER";  
    100.             doWrite(sc, currentTime);  
    101.         } else if (readBytes < 0) {  
    102.             // 对端链路关闭  
    103.             key.cancel();  
    104.             sc.close();  
    105.         } else  
    106.             ; // 读到0字节,忽略  
    107.         }  
    108.     }  
    109.     }  
    110.   
    111.     private void doWrite(SocketChannel channel, String response)  
    112.         throws IOException {  
    113.     if (response != null && response.trim().length() > 0) {  
    114.         byte[] bytes = response.getBytes();  
    115.         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);  
    116.         writeBuffer.put(bytes);  
    117.         writeBuffer.flip();  
    118.         channel.write(writeBuffer);  
    119.     }  
    120.     }  
    121. }  

     

    3.客户端代码

     

    [java] view plain copy
     
    1. public static void main(String[] args) {  
    2.   
    3.     int port = 8080;  
    4.     if (args != null && args.length > 0) {  
    5.         try {  
    6.         port = Integer.valueOf(args[0]);  
    7.         } catch (NumberFormatException e) {  
    8.         // 采用默认值  
    9.         }  
    10.     }  
    11.     new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")  
    12.         .start();  
    13.     }  


    4.TimeClientHandle类

     

     

    [java] view plain copy
     
    1. public class TimeClientHandle implements Runnable {  
    2.   
    3.     private String host;  
    4.     private int port;  
    5.   
    6.     private Selector selector;  
    7.     private SocketChannel socketChannel;  
    8.   
    9.     private volatile boolean stop;  
    10.   
    11.     public TimeClientHandle(String host, int port) {  
    12.     this.host = host == null ? "127.0.0.1" : host;  
    13.     this.port = port;  
    14.     try {  
    15.         selector = Selector.open();  
    16.         socketChannel = SocketChannel.open();  
    17.         socketChannel.configureBlocking(false);  
    18.     } catch (IOException e) {  
    19.         e.printStackTrace();  
    20.         System.exit(1);  
    21.     }  
    22.     }  
    23.   
    24.     /* 
    25.      * (non-Javadoc) 
    26.      *  
    27.      * @see java.lang.Runnable#run() 
    28.      */  
    29.     @Override  
    30.     public void run() {  
    31.     try {  
    32.         doConnect();  
    33.     } catch (IOException e) {  
    34.         e.printStackTrace();  
    35.         System.exit(1);  
    36.     }  
    37.     while (!stop) {  
    38.         try {  
    39.         selector.select(1000);  
    40.         Set<SelectionKey> selectedKeys = selector.selectedKeys();  
    41.         Iterator<SelectionKey> it = selectedKeys.iterator();  
    42.         SelectionKey key = null;  
    43.         while (it.hasNext()) {  
    44.             key = it.next();  
    45.             it.remove();  
    46.             try {  
    47.             handleInput(key);  
    48.             } catch (Exception e) {  
    49.             if (key != null) {  
    50.                 key.cancel();  
    51.                 if (key.channel() != null)  
    52.                 key.channel().close();  
    53.             }  
    54.             }  
    55.         }  
    56.         } catch (Exception e) {  
    57.         e.printStackTrace();  
    58.         System.exit(1);  
    59.         }  
    60.     }  
    61.   
    62.     // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源  
    63.     if (selector != null)  
    64.         try {  
    65.         selector.close();  
    66.         } catch (IOException e) {  
    67.         e.printStackTrace();  
    68.         }  
    69.   
    70.     }  
    71.   
    72.     private void handleInput(SelectionKey key) throws IOException {  
    73.   
    74.     if (key.isValid()) {  
    75.         // 判断是否连接成功  
    76.         SocketChannel sc = (SocketChannel) key.channel();  
    77.         if (key.isConnectable()) {  
    78.         if (sc.finishConnect()) {  
    79.             sc.register(selector, SelectionKey.OP_READ);  
    80.             doWrite(sc);  
    81.         } else  
    82.             System.exit(1);// 连接失败,进程退出  
    83.         }  
    84.         if (key.isReadable()) {  
    85.         ByteBuffer readBuffer = ByteBuffer.allocate(1024);  
    86.         int readBytes = sc.read(readBuffer);  
    87.         if (readBytes > 0) {  
    88.             readBuffer.flip();  
    89.             byte[] bytes = new byte[readBuffer.remaining()];  
    90.             readBuffer.get(bytes);  
    91.             String body = new String(bytes, "UTF-8");  
    92.             System.out.println("Now is : " + body);  
    93.             this.stop = true;  
    94.         } else if (readBytes < 0) {  
    95.             // 对端链路关闭  
    96.             key.cancel();  
    97.             sc.close();  
    98.         } else  
    99.             ; // 读到0字节,忽略  
    100.         }  
    101.     }  
    102.   
    103.     }  
    104.   
    105.     private void doConnect() throws IOException {  
    106.     // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答  
    107.     if (socketChannel.connect(new InetSocketAddress(host, port))) {  
    108.         socketChannel.register(selector, SelectionKey.OP_READ);  
    109.         doWrite(socketChannel);  
    110.     } else  
    111.         socketChannel.register(selector, SelectionKey.OP_CONNECT);  
    112.     }  
    113.   
    114.     private void doWrite(SocketChannel sc) throws IOException {  
    115.     byte[] req = "QUERY TIME ORDER".getBytes();  
    116.     ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);  
    117.     writeBuffer.put(req);  
    118.     writeBuffer.flip();  
    119.     sc.write(writeBuffer);  
    120.     if (!writeBuffer.hasRemaining())  
    121.         System.out.println("Send order 2 server succeed.");  
    122.     }  
    123.   
    124. }  



     

    2.4 AIO编程

    JDK7的产物,即NIO2.0。

     

    1.服务端代码

     

    [java] view plain copy
     
    1. public class TimeServer {  
    2.   
    3.     /** 
    4.      * @param args 
    5.      * @throws IOException 
    6.      */  
    7.     public static void main(String[] args) throws IOException {  
    8.     int port = 8080;  
    9.     if (args != null && args.length > 0) {  
    10.         try {  
    11.         port = Integer.valueOf(args[0]);  
    12.         } catch (NumberFormatException e) {  
    13.         // 采用默认值  
    14.         }  
    15.     }  
    16.     AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);  
    17.     new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();  
    18.     }  
    19. }  


    2.AsyncTimeServerHandler类

     

     

    [java] view plain copy
     
    1. public class AsyncTimeServerHandler implements Runnable {  
    2.   
    3.     private int port;  
    4.   
    5.     CountDownLatch latch;  
    6.     AsynchronousServerSocketChannel asynchronousServerSocketChannel;  
    7.   
    8.     public AsyncTimeServerHandler(int port) {  
    9.     this.port = port;  
    10.     try {  
    11.         asynchronousServerSocketChannel = AsynchronousServerSocketChannel  
    12.             .open();  
    13.         asynchronousServerSocketChannel.bind(new InetSocketAddress(port));  
    14.         System.out.println("The time server is start in port : " + port);  
    15.     } catch (IOException e) {  
    16.         e.printStackTrace();  
    17.     }  
    18.     }  
    19.   
    20.     /* 
    21.      * (non-Javadoc) 
    22.      *  
    23.      * @see java.lang.Runnable#run() 
    24.      */  
    25.     @Override  
    26.     public void run() {  
    27.   
    28.     latch = new CountDownLatch(1);  
    29.     doAccept();  
    30.     try {  
    31.         latch.await();  
    32.     } catch (InterruptedException e) {  
    33.         e.printStackTrace();  
    34.     }  
    35.     }  
    36.   
    37.     public void doAccept() {  
    38.     asynchronousServerSocketChannel.accept(this,  
    39.         new AcceptCompletionHandler());  
    40.     }  
    41.   
    42. }  


    3.AcceptCompletionHandler类

     

     

    [java] view plain copy
     
    1. public class AcceptCompletionHandler implements  
    2.     CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {  
    3.   
    4.     @Override  
    5.     public void completed(AsynchronousSocketChannel result,  
    6.         AsyncTimeServerHandler attachment) {  
    7.     attachment.asynchronousServerSocketChannel.accept(attachment, this);  
    8.     ByteBuffer buffer = ByteBuffer.allocate(1024);  
    9.     result.read(buffer, buffer, new ReadCompletionHandler(result));  
    10.     }  
    11.   
    12.     @Override  
    13.     public void failed(Throwable exc, AsyncTimeServerHandler attachment) {  
    14.     exc.printStackTrace();  
    15.     attachment.latch.countDown();  
    16.     }  
    17.   
    18. }  


    4.ReadCompletionHandler类

     

     

    [java] view plain copy
     
    1. public class ReadCompletionHandler implements  
    2.     CompletionHandler<Integer, ByteBuffer> {  
    3.   
    4.     private AsynchronousSocketChannel channel;  
    5.   
    6.     public ReadCompletionHandler(AsynchronousSocketChannel channel) {  
    7.     if (this.channel == null)  
    8.         this.channel = channel;  
    9.     }  
    10.   
    11.     @Override  
    12.     public void completed(Integer result, ByteBuffer attachment) {  
    13.     attachment.flip();  
    14.     byte[] body = new byte[attachment.remaining()];  
    15.     attachment.get(body);  
    16.     try {  
    17.         String req = new String(body, "UTF-8");  
    18.         System.out.println("The time server receive order : " + req);  
    19.         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(  
    20.             System.currentTimeMillis()).toString() : "BAD ORDER";  
    21.         doWrite(currentTime);  
    22.     } catch (UnsupportedEncodingException e) {  
    23.         e.printStackTrace();  
    24.     }  
    25.     }  
    26.   
    27.     private void doWrite(String currentTime) {  
    28.     if (currentTime != null && currentTime.trim().length() > 0) {  
    29.         byte[] bytes = (currentTime).getBytes();  
    30.         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);  
    31.         writeBuffer.put(bytes);  
    32.         writeBuffer.flip();  
    33.         channel.write(writeBuffer, writeBuffer,  
    34.             new CompletionHandler<Integer, ByteBuffer>() {  
    35.             @Override  
    36.             public void completed(Integer result, ByteBuffer buffer) {  
    37.                 // 如果没有发送完成,继续发送  
    38.                 if (buffer.hasRemaining())  
    39.                 channel.write(buffer, buffer, this);  
    40.             }  
    41.   
    42.             @Override  
    43.             public void failed(Throwable exc, ByteBuffer attachment) {  
    44.                 try {  
    45.                 channel.close();  
    46.                 } catch (IOException e) {  
    47.                 // ingnore on close  
    48.                 }  
    49.             }  
    50.             });  
    51.     }  
    52.     }  
    53.   
    54.     @Override  
    55.     public void failed(Throwable exc, ByteBuffer attachment) {  
    56.     try {  
    57.         this.channel.close();  
    58.     } catch (IOException e) {  
    59.         e.printStackTrace();  
    60.     }  
    61.     }  
    62.   
    63. }  


    5.客户端代码

     

     

    [java] view plain copy
     
    1. public class TimeClient {  
    2.   
    3.     /** 
    4.      * @param args 
    5.      */  
    6.     public static void main(String[] args) {  
    7.     int port = 8080;  
    8.     if (args != null && args.length > 0) {  
    9.         try {  
    10.         port = Integer.valueOf(args[0]);  
    11.         } catch (NumberFormatException e) {  
    12.         // 采用默认值  
    13.         }  
    14.   
    15.     }  
    16.     new Thread(new AsyncTimeClientHandler("127.0.0.1", port),  
    17.         "AIO-AsyncTimeClientHandler-001").start();  
    18.   
    19.     }  
    20. }  


    6.AsyncTimeClientHandler

     

     

    [java] view plain copy
     
    1. public class AsyncTimeClientHandler implements  
    2.     CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {  
    3.   
    4.     private AsynchronousSocketChannel client;  
    5.     private String host;  
    6.     private int port;  
    7.     private CountDownLatch latch;  
    8.   
    9.     public AsyncTimeClientHandler(String host, int port) {  
    10.     this.host = host;  
    11.     this.port = port;  
    12.     try {  
    13.         client = AsynchronousSocketChannel.open();  
    14.     } catch (IOException e) {  
    15.         e.printStackTrace();  
    16.     }  
    17.     }  
    18.   
    19.     @Override  
    20.     public void run() {  
    21.   
    22.     latch = new CountDownLatch(1);  
    23.     client.connect(new InetSocketAddress(host, port), this, this);  
    24.     try {  
    25.         latch.await();  
    26.     } catch (InterruptedException e1) {  
    27.         e1.printStackTrace();  
    28.     }  
    29.     try {  
    30.         client.close();  
    31.     } catch (IOException e) {  
    32.         e.printStackTrace();  
    33.     }  
    34.     }  
    35.   
    36.     @Override  
    37.     public void completed(Void result, AsyncTimeClientHandler attachment) {  
    38.     byte[] req = "QUERY TIME ORDER".getBytes();  
    39.     ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);  
    40.     writeBuffer.put(req);  
    41.     writeBuffer.flip();  
    42.     client.write(writeBuffer, writeBuffer,  
    43.         new CompletionHandler<Integer, ByteBuffer>() {  
    44.             @Override  
    45.             public void completed(Integer result, ByteBuffer buffer) {  
    46.             if (buffer.hasRemaining()) {  
    47.                 client.write(buffer, buffer, this);  
    48.             } else {  
    49.                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);  
    50.                 client.read(  
    51.                     readBuffer,  
    52.                     readBuffer,  
    53.                     new CompletionHandler<Integer, ByteBuffer>() {  
    54.                     @Override  
    55.                     public void completed(Integer result,  
    56.                         ByteBuffer buffer) {  
    57.                         buffer.flip();  
    58.                         byte[] bytes = new byte[buffer  
    59.                             .remaining()];  
    60.                         buffer.get(bytes);  
    61.                         String body;  
    62.                         try {  
    63.                         body = new String(bytes,  
    64.                             "UTF-8");  
    65.                         System.out.println("Now is : "  
    66.                             + body);  
    67.                         latch.countDown();  
    68.                         } catch (UnsupportedEncodingException e) {  
    69.                         e.printStackTrace();  
    70.                         }  
    71.                     }  
    72.   
    73.                     @Override  
    74.                     public void failed(Throwable exc,  
    75.                         ByteBuffer attachment) {  
    76.                         try {  
    77.                         client.close();  
    78.                         latch.countDown();  
    79.                         } catch (IOException e) {  
    80.                         // ingnore on close  
    81.                         }  
    82.                     }  
    83.                     });  
    84.             }  
    85.             }  
    86.   
    87.             @Override  
    88.             public void failed(Throwable exc, ByteBuffer attachment) {  
    89.             try {  
    90.                 client.close();  
    91.                 latch.countDown();  
    92.             } catch (IOException e) {  
    93.                 // ingnore on close  
    94.             }  
    95.             }  
    96.         });  
    97.     }  
    98.   
    99.     @Override  
    100.     public void failed(Throwable exc, AsyncTimeClientHandler attachment) {  
    101.     exc.printStackTrace();  
    102.     try {  
    103.         client.close();  
    104.         latch.countDown();  
    105.     } catch (IOException e) {  
    106.         e.printStackTrace();  
    107.     }  
    108.     }  
    109.   
    110. }  


    2.5  4中I/O对比

     

     

  • 相关阅读:
    python学习之ajax和可视化管理工具
    操作系统-保护模式中的特权级下
    redis 分布式锁的 5个坑,真是又大又深
    数据库之数据表控制语句
    【NoSQL】Consul中服务注册的两种方式
    netstat命令使用方法以及详解
    Dockerfile与Dockerfile实战
    Spring boot+redis实现消息发布与订阅
    怎么寻回位置不可用移动硬盘的数据
    python字符前面u,r,f等含义
  • 原文地址:https://www.cnblogs.com/douyamv/p/6585455.html
Copyright © 2011-2022 走看看