zoukankan      html  css  js  c++  java
  • netty权威指南学习笔记一——NIO入门(4)AIO

      NIO2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供以下两种方式获取操作结果。

      1、通过java.util.concurrent.Future 类来表示异步操作的结果;

      2、在执行异步操作的时候传入一个java.nio.channels.CompletionHandler接口的实现类作为操作完成的回调。

      NIO2.0的异步套接字通道是真正的异步非阻塞I/O,它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO编程模型。

      改造后的代码

    server端代码:

      

     1 package com.example.biodemo;
     2 
     3 
     4 import java.io.*;
     5 import java.net.ServerSocket;
     6 import java.net.Socket;
     7 
     8 public class TimeServer {
     9     public static void main(String[] args) throws IOException {
    10         int port = 8092;
    11         if (args != null && args.length > 0) {
    12             try {
    13                 port = Integer.valueOf(args[0]);
    14             } catch (NumberFormatException e) {
    15                 port = 8092;
    16             }
    17         }
    18         AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
    19         new Thread(timeServer,"AsychronousTimeServerHandler").start();
    20 
    21 
    22 
    23         // ===================改造代码为AIO将以下内容注释掉=================================
    24 
    25 //        创建多路复用线程类并初始化多路复用器,绑定端口等以及轮询注册功能
    26  /*       MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
    27 //        启动多路复用类线程负责轮询多路复用器Selector,IO数据处理等操作
    28         new Thread(timeServer, "NIO-MultiplexerTimeSever-001").start();*/
    29 
    30 // ===================以下内容注释掉=================================
    31 
    32        /* ServerSocket server = null;
    33         try {
    34             server = new ServerSocket(port);
    35             System.out.println("the timeServer is start in port :" + port);
    36             Socket socket = null;
    37 //           引入线程池start
    38             TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50,10000);
    39             while (true) {
    40                 socket = server.accept();
    41 //           替换BIO中new Thread(new TimeServerHandler(socket)).start();为下一行代码
    42                 singleExecutor.execute(new TimeServerHandler(socket));
    43 //           引入线程池end
    44 
    45             }
    46         } finally {
    47             if (server != null) {
    48                 System.out.println("the time server close");
    49                 server.close();
    50                 server = null;
    51             }
    52         }*/
    53 
    54     }
    55 }

    异步时间服务处理器

     1 package com.example.biodemo;
     2 
     3 import java.io.IOException;
     4 import java.net.InetSocketAddress;
     5 import java.nio.channels.AsynchronousServerSocketChannel;
     6 import java.util.concurrent.CountDownLatch;
     7 
     8 public class AsyncTimeServerHandler implements Runnable{
     9      private int port;
    10 //     添加它作用是在完成一组正在执行的操作之前,允许当前线程一直阻塞
    11      CountDownLatch latch;
    12      AsynchronousServerSocketChannel asynchronousServerSocketChannel;
    13     public AsyncTimeServerHandler(int port) {
    14         this.port=port;
    15         try {
    16 //            创建异步服务套接字通道
    17             asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
    18 //            绑定监听端口
    19             asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
    20             System.out.println("The time server is start in port"+port);
    21         } catch (IOException e) {
    22             e.printStackTrace();
    23         }
    24     }
    25 
    26     @Override
    27     public void run() {
    28     latch =new CountDownLatch(1);
    29 //    接收客户端的连接
    30     doAccept();
    31         try {
    32             latch.await();
    33         } catch (InterruptedException e) {
    34             e.printStackTrace();
    35         }
    36 
    37     }
    38 
    39     private void doAccept() {
    40 //        AcceptCompletionHandler用于接收accept操作成功的通知消息
    41 //        accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler):接受连接,并为连接绑定一个CompletionHandler处理Socket连接
    42         asynchronousServerSocketChannel.accept(this,new AcceptCompletionHandler());
    43     }
    44 }

    AcceptCompletionHandler

     1 package com.example.biodemo;
     2 
     3 import java.nio.ByteBuffer;
     4 import java.nio.channels.AsynchronousServerSocketChannel;
     5 import java.nio.channels.AsynchronousSocketChannel;
     6 import java.nio.channels.CompletionHandler;
     7 
     8 public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler>{
     9 
    10     @Override
    11     public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
    12         //再次让asynchronousServerSocketChannel对象调用accept方法是
    13         //调用AsynchronousServerSocketChannel的accept方法后,如果有新的客户端接入,
    14         // 系统将回调我们传入的CompletionHandler实例的completed方法,表示新客户端连接成功。
    15         // 因为AsynchronousServerSocketChannel可以接受成千上万个客户端,所以需要继续调用它的accept方法,
    16         // 接受其他客户端连接,最终形成一个环;每当一个客户端连接成功后,再异步接受新的客户端连接
    17         attachment.asynchronousServerSocketChannel.accept(attachment,this);
    18 //        据上,链路建立成功,服务端需要接受客户端新请求消息,
    19         ByteBuffer buffer = ByteBuffer.allocate(1024);
    20 //        调用read方法进行异步读操作
    21         result.read(buffer,buffer,new ReadCompletionHandler(result));
    22     }
    23 
    24     @Override
    25     public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
    26         exc.printStackTrace();
    27         attachment.latch.countDown();
    28     }
    29 
    30 
    31 }

    ReadCompletionHandler

     1 package com.example.biodemo;
     2 
     3 import java.io.IOException;
     4 import java.io.UnsupportedEncodingException;
     5 import java.nio.ByteBuffer;
     6 import java.nio.channels.AsynchronousSocketChannel;
     7 import java.nio.channels.CompletionHandler;
     8 import java.util.Date;
     9 
    /**
     * Callback for asynchronous reads on one client connection of the time server.
     *
     * <p>A request equal (ignoring case) to {@code QUERY TIME ORDER} is answered with the current
     * date; anything else is answered with {@code BAD ORDER}. Once a response has been written
     * completely, the next read is issued so that further requests, or the peer closing the
     * connection, are noticed. The channel is closed on end-of-stream and on any I/O failure.
     */
    public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

        /** Charset used for both decoding requests and encoding responses. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

        /** Size of the buffer allocated for each follow-up read. */
        private static final int READ_BUFFER_SIZE = 1024;

        private final AsynchronousSocketChannel channel;

        /**
         * @param channel the client connection this handler reads from and writes to
         */
        public ReadCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        /**
         * Decodes the request held in {@code attachment} and writes the response.
         *
         * @param result number of bytes read, or -1 when the peer has closed the connection
         * @param attachment the buffer the read filled (still in write mode)
         */
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (result < 0) {
                // End of stream: there is nobody left to answer, release the connection.
                closeChannel();
                return;
            }
            attachment.flip();
            byte[] body = new byte[attachment.remaining()];
            attachment.get(body);
            String req = new String(body, UTF_8);
            System.out.println("The time server recerve order : " + req);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req)
                    ? new Date().toString()
                    : "BAD ORDER";
            doWriter(currentTime);
        }

        /**
         * Writes {@code currentTime} to the client; blank or null text is not sent.
         *
         * @param currentTime the response text
         */
        private void doWriter(String currentTime) {
            if (currentTime == null || currentTime.trim().isEmpty()) {
                return;
            }
            ByteBuffer writeBuffer = ByteBuffer.wrap(currentTime.getBytes(UTF_8));
            channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    if (attachment.hasRemaining()) {
                        // A write may be partial: keep sending until the buffer is drained.
                        channel.write(attachment, attachment, this);
                        return;
                    }
                    // Response sent. Wait for the next request; this is also what lets the
                    // server observe the peer closing the connection and release the channel.
                    ByteBuffer next = ByteBuffer.allocate(READ_BUFFER_SIZE);
                    channel.read(next, next, ReadCompletionHandler.this);
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    closeChannel();
                }
            });
        }

        /**
         * Closes the connection when a read fails.
         *
         * @param exc the cause of the failure
         * @param attachment the buffer passed to the failed read
         */
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel();
        }

        /** Closes the client channel, printing (not propagating) any failure to do so. */
        private void closeChannel() {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    客户端的代码

     1 package com.example.biodemo;
     2 
     3 import java.io.*;
     4 import java.net.Socket;
     5 
     6 public class TimeClient {
     7     public static void main(String[] args) {
     8         int port = 8092;
     9         if (args != null && args.length > 0) {
    10             try {
    11                 port = Integer.valueOf(args[0]);
    12             } catch (NumberFormatException ne) {
    13                 port = 8092;
    14             }
    15         }
    16         new Thread(new AsyncTimeClientHandler("127.0.0.1",port)).start();
    17 
    18 
    19 
    20 //        new Thread(new TimeClientHandles("127.0.0.1",port),"TimeClient-001").start();
    21       /* 代码改造注释掉以下代码
    22        Socket socket = null;
    23         BufferedReader in = null;
    24         PrintWriter out = null;
    25         try {
    26             socket = new Socket("127.0.0.1", port);
    27             System.out.println(socket.getInputStream());
    28             in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    29             out = new PrintWriter(socket.getOutputStream(), true);
    30             out.println("QUERY TIME ORDER");
    31             System.out.println("send order 2 server succeed.");
    32             String resp = in.readLine();
    33             System.out.println("now is :" + resp);
    34         } catch (IOException e1) {
    35 
    36         } finally {
    37             if (out != null) {
    38                 out.close();
    39                 out = null;
    40             }
    41 
    42             if (in != null) {
    43                 try {
    44                     in.close();
    45                 } catch (IOException e2) {
    46                     e2.printStackTrace();
    47                 }
    48                 in = null;
    49                 if (socket != null) {
    50                     try {
    51                         socket.close();
    52                     } catch (IOException e3) {
    53                         e3.printStackTrace();
    54                     }
    55 
    56                 }
    57                 socket = null;
    58             }
    59         }*/
    60     }
    61 }

    AsyncTimeClientHandler

      1 package com.example.biodemo;
      2 
      3 import java.io.IOException;
      4 import java.io.UnsupportedEncodingException;
      5 import java.net.InetSocketAddress;
      6 import java.nio.ByteBuffer;
      7 import java.nio.channels.AsynchronousSocketChannel;
      8 import java.nio.channels.CompletionHandler;
      9 import java.util.concurrent.CountDownLatch;
     10 
     /**
      * Asynchronous (NIO 2.0) time client.
      *
      * <p>{@link #run()} connects to the server and blocks on a latch. The connect callback sends
      * {@code QUERY TIME ORDER}; once the request is fully written a read is issued, the response is
      * printed and the latch is released. Every failure path closes the channel and always releases
      * the latch, so {@code run()} cannot hang on an error.
      */
     public class AsyncTimeClientHandler
             implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

         /** Charset used for both encoding the request and decoding the response. */
         private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

         /** The client channel, or {@code null} when it could not be opened. */
         private final AsynchronousSocketChannel asynchronousSocketChannel;
         private final String host;
         private final int port;

         /** Released once the exchange has finished or failed; created eagerly so callbacks never see null. */
         private final CountDownLatch latch = new CountDownLatch(1);

         /**
          * Opens the client channel. A failure to open is printed and makes {@link #run()} a no-op.
          *
          * @param host server host; {@code null} means {@code 127.0.0.1}
          * @param port server port
          */
         public AsyncTimeClientHandler(String host, int port) {
             this.host = host == null ? "127.0.0.1" : host;
             this.port = port;
             AsynchronousSocketChannel channel = null;
             try {
                 channel = AsynchronousSocketChannel.open();
             } catch (IOException e) {
                 e.printStackTrace();
             }
             asynchronousSocketChannel = channel;
         }

         /** Connects asynchronously, waits until the exchange is over, then closes the channel. */
         @Override
         public void run() {
             if (asynchronousSocketChannel == null) {
                 return;
             }
             asynchronousSocketChannel.connect(new InetSocketAddress(host, port), this, this);
             try {
                 latch.await();
             } catch (InterruptedException e) {
                 // Restore the interrupt flag so the owner of this thread can observe it.
                 Thread.currentThread().interrupt();
             }
             closeChannel();
         }

         /**
          * Connect succeeded: send the time query.
          *
          * @param result unused (connect has no result)
          * @param attachment this handler, as passed to {@code connect}
          */
         @Override
         public void completed(Void result, AsyncTimeClientHandler attachment) {
             ByteBuffer writeBuffer = ByteBuffer.wrap("QUERY TIME ORDER".getBytes(UTF_8));
             asynchronousSocketChannel.write(writeBuffer, writeBuffer, new RequestWrittenHandler());
         }

         /**
          * Connect failed: report the cause, close the channel and release {@link #run()}.
          *
          * @param exc the cause of the failure
          * @param attachment this handler, as passed to {@code connect}
          */
         @Override
         public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
             exc.printStackTrace();
             closeAndRelease();
         }

         /** Continues a partial write; once the request is fully sent, starts reading the response. */
         private final class RequestWrittenHandler implements CompletionHandler<Integer, ByteBuffer> {
             @Override
             public void completed(Integer result, ByteBuffer attachment) {
                 if (attachment.hasRemaining()) {
                     asynchronousSocketChannel.write(attachment, attachment, this);
                     return;
                 }
                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                 asynchronousSocketChannel.read(readBuffer, readBuffer, new ResponseReadHandler());
             }

             @Override
             public void failed(Throwable exc, ByteBuffer attachment) {
                 closeAndRelease();
             }
         }

         /** Prints the server's response and releases the latch. */
         private final class ResponseReadHandler implements CompletionHandler<Integer, ByteBuffer> {
             @Override
             public void completed(Integer result, ByteBuffer attachment) {
                 try {
                     attachment.flip();
                     byte[] bytes = new byte[attachment.remaining()];
                     attachment.get(bytes);
                     String body = new String(bytes, UTF_8);
                     System.out.println(body);
                     System.out.println("Now is :" + body);
                 } finally {
                     // Release run() even if handling the response throws.
                     latch.countDown();
                 }
             }

             @Override
             public void failed(Throwable exc, ByteBuffer attachment) {
                 closeAndRelease();
             }
         }

         /** Closes the channel and releases the latch even when closing itself fails. */
         private void closeAndRelease() {
             try {
                 closeChannel();
             } finally {
                 latch.countDown();
             }
         }

         /** Closes the channel if it was opened, printing (not propagating) any failure. */
         private void closeChannel() {
             if (asynchronousSocketChannel == null) {
                 return;
             }
             try {
                 asynchronousSocketChannel.close();
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
     }

      学到这里我们对NIO编程已经有了一个感性认识。

  • 相关阅读:
    操作系统基础知识
    os库基本介绍
    原型模式
    ASP .NetCore 部署500错误 查看异常详情
    css设置文本自动换行
    SqlServer数据库链接字符串
    Json列表数据查找更新
    VB中将类标记为可序列化
    VB 性能优化点
    参加公司工作总结会要准备的内容 IT 技术部
  • 原文地址:https://www.cnblogs.com/xiaoyao-001/p/9279522.html
Copyright © 2011-2022 走看看