zoukankan      html  css  js  c++  java
  • Java中的BIO、NIO、AIO-3

    Java中的BIO、NIO、AIO-3

    这一篇是代码篇,敲代码有助于理解记忆这些抽象的东西:

    参考资料:

    目录

    Java BIO代码

    服务器

    1. package sock; 
    2.  
    3. import java.io.BufferedReader; 
    4. import java.io.IOException; 
    5. import java.io.InputStreamReader; 
    6. import java.io.PrintWriter; 
    7. import java.net.ServerSocket; 
    8. import java.net.Socket; 
    9. import java.util.ArrayList; 
    10. import java.util.LinkedList; 
    11. import java.util.List; 
    12.  
    13.  
    14.  
    15. /* 
    16. 存在多线程对数据结构并发操作不安全的问题 
    17. */ 
    18. public class socketServerT extends ServerSocket
    19. private static final int port = 2018
    20. private static boolean isPrint = false
    21. private static List<String> user_list = new ArrayList(); 
    22. private static List<ServerThread> threadlist = new ArrayList<>();//应该使用线程安全的集合 
    23. private static LinkedList<String> message = new LinkedList<>(); 
    24.  
    25.  
    26. socketServerT() throws IOException{ 
    27. super(port); 
    28. new PrintOutThread(); 
    29. System.out.println( " server is created"); 
    30. try
    31. while(true){ 
    32. Socket sock = this.accept(); 
    33. new ServerThread(sock); 

    34. }catch( Exception e){ 
    35. e.printStackTrace(); 


    36. class PrintOutThread extends Thread
    37. PrintOutThread(){ 
    38. System.out.println(getName() + "!!!!!"); 
    39. start(); 

    40. public void run()
    41. while(true){ 
    42. if(isPrint){ 
    43. String m = message.getFirst(); 
    44. for(ServerThread i : threadlist){ 
    45. sendMessage(i,m); 

    46. message.removeFirst(); 
    47. isPrint = message.size()>0 ? true:false




    48. class ServerThread extends Thread
    49. private BufferedReader rec; 
    50. private PrintWriter send; 
    51. private Socket client; 
    52. private String name; 
    53.  
    54. ServerThread(Socket sock) throws IOException{ 
    55. client = sock; 
    56. rec = new BufferedReader(new InputStreamReader(client.getInputStream())); 
    57. send = new PrintWriter(client.getOutputStream(),true); 
    58. //rec.readLine(); 
    59. System.out.println(getName() + "is created"); 
    60. send.println("connected to chat room, please input your name!!"); 
    61. start(); 

    62. public PrintWriter getSend()
    63. return send; 

    64. public void run()
    65. try
    66. int flag = 0
    67. String line = ""
    68. while(!line.contains("bye")){ 
    69. line = rec.readLine(); 
    70. if("showuser".equals(line)){ 
    71. send.println(listOneUsers()); 
    72. //line = rec.readLine(); 
    73. continue

    74. if(flag == 0){ 
    75. flag ++; 
    76. name = line; 
    77. user_list.add(name); 
    78. threadlist.add(this); 
    79. send.println(name + " begin to chat"); 
    80. pushMessage("client <" + name+"> enter chat room"); 
    81. }else
    82. pushMessage("client <" + name+"> say :" + line); 

    83. //line = rec.readLine(); 

    84.  
    85. }catch (Exception e){ 
    86. e.printStackTrace(); 
    87. }finally
    88. try
    89. client.close(); 
    90. rec.close(); 
    91. send.close(); 
    92. }catch (IOException e){ 
    93. e.printStackTrace(); 

    94. threadlist.remove(this); 
    95. user_list.remove(name); 
    96. pushMessage("client <" + name+"> exit"); 

    97.  


    98. public void pushMessage(String mess)
    99. message.add(mess); 
    100. isPrint = true

    101. public String listOneUsers()
    102. StringBuffer s = new StringBuffer(); 
    103. s.append("---online users--- "); 
    104. for( String i:user_list){ 
    105. s.append(i + " "); 

    106. s.append("---end--- "); 
    107. return s.toString(); 

    108. public void sendMessage(ServerThread s,String m)
    109. //System.out.println("test"); 
    110. PrintWriter p = s.getSend(); 
    111. p.println(m); 
    112. //p.flush(); 

    113. public static void main(String args[])
    114. try
    115. socketServerT s = new socketServerT(); 
    116. }catch(Exception e){ 
    117. e.printStackTrace(); 



    118.  

    客户端

    1. package sock; 
    2.  
    3. import java.io.BufferedReader; 
    4. import java.io.IOException; 
    5. import java.io.InputStreamReader; 
    6. import java.io.PrintWriter; 
    7. import java.net.Socket; 
    8.  
    9. public class socketClientT extends Socket
    10. private static final String server = "127.0.0.1"
    11. private static final int port = 2018
    12.  
    13. private Socket sock; 
    14. private PrintWriter send; 
    15. private BufferedReader rec; 
    16. socketClientT() throws IOException{ 
    17. super(server,port); 
    18. sock = this
    19. send = new PrintWriter(sock.getOutputStream(),true); 
    20. rec = new BufferedReader(new InputStreamReader(sock.getInputStream())); 
    21. Thread t = new recvThread(); 
    22. BufferedReader sysBuff = new BufferedReader(new InputStreamReader(System.in)); 
    23. String line = ""
    24. while(! line.contains("bye")){ 
    25. line = sysBuff.readLine(); 
    26. send.println(line); 

    27. send.close(); 
    28. rec.close(); 
    29. sysBuff.close(); 
    30. this.close(); 

    31. class recvThread extends Thread
    32. private BufferedReader buff; 
    33. recvThread(){ 
    34. try
    35. buff = new BufferedReader(new InputStreamReader(sock.getInputStream())); 
    36. start(); 
    37. } catch (Exception e){ 
    38. e.printStackTrace(); 


    39. public void run()
    40. String res = ""
    41. try
    42. while(true){ 
    43. res = buff.readLine(); 
    44. if(res.contains("bye")) 
    45. break
    46. System.out.println(res); 

    47. send.close(); 
    48. buff.close(); 
    49. sock.close(); 
    50. }catch (Exception e){ 
    51. e.printStackTrace(); 



    52.  
    53. public static void main(String args[])
    54. try
    55. socketClientT s = new socketClientT(); 
    56. }catch (Exception e){ 
    57. e.printStackTrace(); 



    58.  

    Java NIO

    JDK 1.4的java.util.*;包中引入了新的Java I/O库,其目的是提高IO操作的速度。

    简介

    NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的(其实在JDK 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为Non-block I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。而下文中的NIO,不是指整个新的I/O库,而是非阻塞I/O。
        NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
        新增的着两种通道都支持阻塞和非阻塞两种模式。
    阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。
        对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。
        下面会先对基础知识进行介绍。

    缓冲区Buffer

    Buffer是一个对象,包含一些要写入或者读出的数据。
    在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。
    缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。
    具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。

    通道Channel

    我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。
    底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。
    Channel主要分两大类:

    • SelectableChannel:用户网络读写
    • FileChannel:用于文件操作

    后面代码会涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子类。

    多路复用器

    Selector是Java  NIO 编程的基础。
    Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
    一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

    服务器端代码

    server:

    1. /** 
    2. * server 
    3. */ 
    4. public class server
    5.  
    6. private static int port = 8000
    7. private static serverHandle sHandle; 
    8.  
    9. public static void start()
    10. start(port); 

    11. private static synchronized void start(int port)
    12. if (sHandle != null) { 
    13. sHandle.setStarted(false); 

    14. sHandle = new serverHandle(port); 
    15. new Thread(sHandle,"server").start(); 
    16.  

    17. public static void main(String[] args)
    18. start(); 


    serverHandle

    1. import java.io.IOException; 
    2. import java.net.InetSocketAddress; 
    3. import java.net.ServerSocket; 
    4. import java.nio.ByteBuffer; 
    5. import java.nio.channels.SelectionKey; 
    6. import java.nio.channels.Selector; 
    7. import java.nio.channels.ServerSocketChannel; 
    8. import java.nio.channels.SocketChannel; 
    9. import java.util.Iterator; 
    10. import java.util.Set; 
    11.  
    12. //import jdk.internal.org.objectweb.asm.Handle; 
    13.  
    14. /** 
    15. * serverHandle 
    16. */ 
    17. public class serverHandle implements Runnable
    18.  
    19. private ServerSocketChannel serverChannel; 
    20. private Selector selector; 
    21. private volatile boolean started;//各个线程都能看到状态 
    22.  
    23. public serverHandle(int port)
    24. try
    25. //创建选择器 
    26. selector = Selector.open(); 
    27. //创建serverSocketChannel 
    28. serverChannel = ServerSocketChannel.open(); 
    29. //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式 
    30. serverChannel.configureBlocking(false); 
    31. //创建InetSocketAddress 
    32. InetSocketAddress socketAddress = new InetSocketAddress(port); 
    33. //得到ServerSocket 
    34. ServerSocket serverSocket = serverChannel.socket(); 
    35. //绑定ServetSocket到一个具体的端口,并设置backlog 
    36. serverSocket.bind(socketAddress, 1024); 
    37. //向selector注册ServerSocketChannel,设置为监听客户端的连接请求 
    38. serverChannel.register(selector, SelectionKey.OP_ACCEPT); 
    39. //标记服务器状态 
    40. started = true
    41. System.out.println("服务器已经启动,端口号:" + port); 
    42.  
    43. } catch (IOException e) { 
    44. //TODO: handle exception 
    45. e.printStackTrace(); 
    46. System.exit(1); 
    47. }  

    48.  
    49. public void setStarted(boolean flag)
    50. this.started = flag; 

    51. @Override 
    52. public void run()
    53. //循环遍历selector 
    54. while (started) { 
    55. try
    56. //无论是否有读写事件,selector每个1s唤醒一次 
    57.  
    58. try
    59. selector.select(1000); 
    60. } catch (Exception e) { 
    61. e.printStackTrace(); 

    62. //获得状态为ready的selectorkey 
    63. Set<SelectionKey> keys = selector.selectedKeys(); 
    64. Iterator<SelectionKey> iter = keys.iterator(); 
    65. SelectionKey key = null
    66. while (iter.hasNext()) { 
    67. key = iter.next(); 
    68. iter.remove(); 
    69. try
    70. handle(key); 
    71. } catch (Exception e) { 
    72.  
    73. if (key != null) { 
    74. key.cancel(); 
    75. if (key.channel() != null) { 
    76. key.channel().close(); 




    77. } catch (Throwable t) { 
    78.  
    79. t.printStackTrace(); 


    80. //关闭selector 
    81. if (selector != null) { 
    82. try
    83. selector.close(); 
    84. } catch (Exception e) { 
    85.  
    86. e.printStackTrace(); 



    87. private void handle(SelectionKey key) throws IOException
    88. //判断kei是否是有效的 
    89. if (key.isValid()) { 
    90.  
    91. //处理新接入的请求消息, 
    92. if (key.isAcceptable()) { 
    93. //通过selectionkey得到ServerSocketChannel,注意转型 
    94. ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); 
    95. //通过serversocketchannel的accept方法创建SocketChannel实例 
    96. SocketChannel client = serverSocketChannel.accept(); 
    97. //设置client为非阻塞模式 
    98. client.configureBlocking(false); 
    99. //把client注册到selector,注册事件为读 
    100. client.register(selector, SelectionKey.OP_READ); 

    101.  
    102. //读消息 
    103. if (key.isReadable()) { 
    104. SocketChannel sc = (SocketChannel) key.channel(); 
    105. //创建byteBuffer,大小为1M 
    106. ByteBuffer buffer = ByteBuffer.allocate(1024); 
    107. //读取请求码流,返回读取到的字节数 
    108. int readBytes = sc.read(buffer); 
    109. //读取到字节,对字节进行编码 
    110. if (readBytes > 0) { 
    111. //将缓冲区buffer的position设为0,用于后续对缓冲区的读操作 
    112. buffer.flip(); 
    113. //根据缓冲区可读字节数创建字节数组 
    114. byte[] bytes = new byte[buffer.remaining()]; 
    115. //将缓冲区可读字节数组复制到新建的数组中 
    116. buffer.get(bytes); 
    117. String expresString = new String(bytes, "UTF-8"); 
    118. System.out.println("服务器收到的消息:" + expresString); 
    119.  
    120. //处理数据 
    121. String res = null
    122. res = new StringBuffer(expresString).reverse().toString(); 
    123. //写入返回消息 
    124. dowrite(sc,res); 
    125. } else if (readBytes < 0) { 
    126. //链路关闭释放资源 
    127. key.cancel(); 
    128. sc.close(); 




    129. private void dowrite(SocketChannel sc, String res) throws IOException
    130. //把字符串编码为字节数组 
    131. byte[] bytes = res.getBytes(); 
    132. //根据数组容量创建ByteBuffer 
    133. ByteBuffer wBuffer = ByteBuffer.allocate(bytes.length); 
    134. //把字节数组复制到buffer中 
    135. wBuffer.put(bytes); 
    136. //flip操作,更改position为0,方便后续的写操作从头开始 
    137. wBuffer.flip(); 
    138. //发送缓冲区的数据 
    139. sc.write(wBuffer); 


    客户端代码

    client

    1. import java.util.Scanner; 
    2.  
    3. /** 
    4. * clientChannel 
    5. */ 
    6. public class clientChannel
    7. private static String host = "127.0.0.1"
    8. private static int port = 8000
    9. private static clientHandle cHandle; 
    10.  
    11. public static void start()
    12. start(host,port); 

    13. public static synchronized void start(String host, int port)
    14. if (cHandle != null) { 
    15. cHandle.stop(); 

    16. cHandle = new clientHandle(host, port); 
    17. new Thread(cHandle, "client").start();; 

    18.  
    19. public static Boolean sendMsg(String msg) throws Exception
    20. if (msg.contains("q")) { 
    21. return false

    22. cHandle.sendMsg(msg); 
    23. return true

    24. public static void main(String[] args)
    25. try
    26. start(); 
    27. Scanner s = new Scanner(System.in); 
    28. String tmp; 
    29. while ((tmp = s.nextLine())!= null) { 
    30. sendMsg(tmp); 

    31. } catch (Exception e) { 
    32. //TODO: handle exception 
    33. e.printStackTrace(); 

    34. //start(); 


    clientHandle

    1. import java.net.InetSocketAddress; 
    2. import java.nio.ByteBuffer; 
    3. import java.nio.channels.SelectionKey; 
    4. import java.nio.channels.Selector; 
    5. import java.nio.channels.SocketChannel; 
    6. import java.util.Iterator; 
    7. import java.util.Set; 
    8. import java.io.IOException; 
    9. /** 
    10. * clientHandle 
    11. */ 
    12. public class clientHandle implements Runnable
    13.  
    14. private String host; 
    15. private int port; 
    16. private Selector selector; 
    17. private SocketChannel socketChannel; 
    18. private volatile boolean started; 
    19.  
    20. public clientHandle(String ip, int port)
    21. this.host = ip; 
    22. this.port = port; 
    23.  
    24. try
    25. //创建选择器 
    26. this.selector = Selector.open(); 
    27. //创建socketchannel 
    28. this.socketChannel = SocketChannel.open(); 
    29. //配置socketChannel为非阻塞模式 
    30. this.socketChannel.configureBlocking(false); 
    31. this.started = true
    32.  
    33. } catch (Exception e) { 
    34. //TODO: handle exception 
    35. e.printStackTrace(); 
    36. System.exit(1); 

    37.  

    38.  
    39. public void stop()
    40. this.started = false

    41.  
    42. public void doConnection() throws Exception
    43. InetSocketAddress address = new InetSocketAddress(this.host, this.port); 
    44. if (socketChannel.connect(address)) { 
    45. System.out.println("连接服务器成功!!!"); 
    46. } else
    47. System.out.println("未连接成功,下一轮继续!!!"); 
    48. //向selector注册socketChannel的连接操作 
    49. socketChannel.register(selector, SelectionKey.OP_CONNECT); 


    50.  
    51. public void handleInput(SelectionKey key) throws Exception
    52. if (key.isValid()) { 
    53. //获取SeclectionKey对应的socketChannel 
    54. SocketChannel sc = (SocketChannel) key.channel(); 
    55. //测试key对应的socketChannel通道是否已完成或未能完成其套接连接操作 
    56. if (key.isConnectable()) { 
    57. //完成连接过程,返回true表示channel已经建立了连接,false表示建立连接失败 
    58. //当使用SocketChannel的connect()函数进行连接的时候,当处于非阻塞模式的情况下,可能连接不是立刻完成的,需要使用 
    59. //finidhConnect()来检查连接是否建立 
    60. if (sc.finishConnect()) { 
    61. } else
    62. System.exit(1); 
    63. }  

    64. //读消息,判断key对应的channel是否可读 
    65. if (key.isReadable()) { 
    66. //创建一个缓冲区,用来存读取的数据, 
    67. ByteBuffer buffer = ByteBuffer.allocate(1024); 
    68. //读取数据,返回读取的字节数 
    69. int readSize = sc.read(buffer); 
    70. //读取到字节,对字节进行编码 
    71. if (readSize > 0) { 
    72. //设置缓冲区的limit和position,方面后面读取数据 
    73. buffer.flip(); 
    74. //根据缓冲区的可读字节数创建字节数组 
    75. byte[] bytes = new byte[buffer.remaining()]; 
    76. //把缓冲区的内容复制到字节数组中去 
    77. buffer.get(bytes); 
    78. String res = new String(bytes, "UTF-8"); 
    79. System.out.println("客户端收到的数据为:" + res); 
    80. } else if (readSize < 0) { 
    81. //这种情况说明链路已经关闭,释放资源 
    82. key.cancel(); 
    83. sc.close(); 




    84.  
    85. public void doWrite(SocketChannel sc, String request) throws Exception
    86. //将数据转换为字节数组 
    87. byte[] bytes = request.getBytes(); 
    88. //创建字节缓冲区 
    89. ByteBuffer buffer = ByteBuffer.allocate(bytes.length); 
    90. //将字节数组放入字节缓冲区 
    91. buffer.put(bytes); 
    92. //flip操作,调整limit和position 
    93. buffer.flip(); 
    94. //将数据写入到channel中 
    95. sc.write(buffer); 

    96. public void sendMsg(String msg) throws Exception
    97. //这里还没明白为什么要先注册读操作?需要注册读操作才能,知道读状态是否就绪,方便handelInput函数处理!! 
    98. //但是还有一个疑问,什么时候使用OP_WRITE 
    99. socketChannel.register(selector, SelectionKey.OP_READ); 
    100. doWrite(socketChannel, msg); 

    101. @Override  
    102. public void run() {  
    103. try{  
    104. doConnection();  
    105. }catch(Exception e){  
    106. e.printStackTrace();  
    107. System.exit(1);  
    108. }  
    109. //循环遍历selector  
    110. while(started){  
    111. try{  
    112. //无论是否有读写事件发生,selector每隔1s被唤醒一次  
    113. selector.select(1000);  
    114. //阻塞,只有当至少一个注册的事件发生的时候才会继续.  
    115. // selector.select();  
    116. Set<SelectionKey> keys = selector.selectedKeys();  
    117. Iterator<SelectionKey> it = keys.iterator();  
    118. SelectionKey key = null;  
    119. while(it.hasNext()){  
    120. key = it.next();  
    121. it.remove();  
    122. try{  
    123. handleInput(key);  
    124. }catch(Exception e){  
    125. if(key != null){  
    126. key.cancel();  
    127. if(key.channel() != null){  
    128. key.channel().close();  
    129. }  
    130. }  
    131. }  
    132. }  
    133. }catch(Exception e){  
    134. e.printStackTrace();  
    135. System.exit(1);  
    136. }  
    137. }  
    138. //selector关闭后会自动释放里面管理的资源  
    139. if(selector != null)  
    140. try{  
    141. selector.close();  
    142. }catch (Exception e) {  
    143. e.printStackTrace();  
    144. }  


    ##测试及解析
    测试代码:

    1. import java.util.Scanner; 
    2.  
    3. /** 
    4. * Test 
    5. */ 
    6. public class Test
    7.  
    8. public static void main(String[] args)
    9. server.start(); 
    10. try
    11. Thread.sleep(3000); 
    12. } catch (Exception e) { 
    13. //TODO: handle exception 
    14. e.printStackTrace(); 

    15. clientChannel.start(); 
    16. Scanner s = new Scanner(System.in); 
    17. String tmp; 
    18. try
    19. while ((tmp = s.nextLine()) != null) { 
    20. clientChannel.sendMsg(tmp); 

    21. } catch (Exception e) { 
    22. //TODO: handle exception 
    23. e.printStackTrace(); 

    24.  

    25.  

    可以看到,创建NIO服务端的主要步骤如下:

    1. 打开ServerSocketChannel,监听客户端连接
    2. 绑定监听端口,设置连接为非阻塞模式
    3. 创建Reactor线程,创建多路复用器并启动线程
    4. 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
    5. Selector轮询准备就绪的key
    6. Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
    7. 设置客户端链路为非阻塞模式
    8. 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
    9. 异步读取客户端消息到缓冲区
    10. 对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
    11. 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端

    Java AIO

    Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:

    ** java.nio.channels.AsynchronousChannel**
           标记一个channel支持异步IO操作。

    ** java.nio.channels.AsynchronousServerSocketChannel**
           ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。

    ** java.nio.channels.AsynchronousSocketChannel**
           面向流的异步socket channel,表示一个连接。

    ** java.nio.channels.AsynchronousChannelGroup**
           异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandlerAsynchronousServerSocketChannel创建的时候可以传入一个AsynchronousChannelGroup,那么通过AsynchronousServerSocketChannel创建的AsynchronousSocketChannel将同属于一个组,共享资。

    ** java.nio.channels.CompletionHandler**
           异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future模式或者注册CompletionHandler,推荐用CompletionHandler的方式,这些handler的调用是由AsynchronousChannelGroup的线程池派发的。显然,线程池的大小是性能的关键因素AsynchronousChannelGroup允许绑定不同的线程池,通过三个静态方法来创建:

     public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory) throws IOException
    
     public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize)
    
     public static AsynchronousChannelGroup withThreadPool(ExecutorService executor) throws IOException
    

    需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。

    在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor模式,Reactor负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责CompletionHandler的派发,查看一个典型的IO写操作的流程来看两者的区别:

    Reactor:  send(msg) -> 消息队列是否为空,如果为空  -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。

    Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API中,AsynchronousChannelGroup就扮演了Proactor的角色。

    CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:

    public interface CompletionHandler<V,A> {
    
         void completed(V result, A attachment);
    
        void failed(Throwable exc, A attachment);
    
        void cancelled(A attachment);
    }
    

    其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。

    server端代码

    server

    1. //package aio; 
    2.  
    3. public class Server
    4. public static int clientCount = 0
    5. public static int port = 8000
    6. public static String hoString = "127.0.0.1"
    7.  
    8. public static void start()
    9. start(Server.port); 

    10. public static void start(int port)
    11. AsyncServerHandler serverHandler = new AsyncServerHandler(port); 
    12. Thread t1 = new Thread(serverHandler); 
    13. t1.start(); 

    14. public static void main(String[] args)
    15. start(); 


    16.  

    AsyncServerHandler

    1. //package aio; 
    2.  
    3. import java.io.IOException; 
    4. import java.net.InetSocketAddress; 
    5. import java.nio.channels.AsynchronousServerSocketChannel; 
    6. import java.nio.channels.CompletionHandler; 
    7. import java.util.concurrent.CountDownLatch; 
    8.  
    9. public class AsyncServerHandler implements Runnable
    10. private AsynchronousServerSocketChannel serverSocketChannel; 
    11. private CountDownLatch latch; 
    12. public AsyncServerHandler(int port)
    13. // TODO Auto-generated constructor stub 
    14. InetSocketAddress address = new InetSocketAddress(port); 
    15. try
    16. serverSocketChannel = AsynchronousServerSocketChannel.open(); 
    17. serverSocketChannel.bind(address); 
    18. } catch (IOException e) { 
    19. // TODO Auto-generated catch block 
    20. e.printStackTrace(); 

    21. System.out.println("服务器已经启动,端口号:" + port); 

    22.  
    23. @Override 
    24. public void run()
    25. // TODO Auto-generated method stub 
    26. //CountDownLatch初始化  
    27. //它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞  
    28. //此处,让现场在此阻塞,防止服务端执行完成后退出  
    29. //也可以使用while(true)+sleep  
    30. //生成环境就不需要担心这个问题,以为服务端是不会退出的  
    31. this.latch = new CountDownLatch(1); 
    32. serverSocketChannel.accept(this, new AcceptHandler(this.latch)); 
    33.  
    34. try
    35. latch.await(); 
    36. } catch (InterruptedException e) { 
    37. // TODO Auto-generated catch block 
    38. e.printStackTrace(); 


    39.  
    40. public AsynchronousServerSocketChannel getServerSocketChannel()
    41. return serverSocketChannel; 

    42.  
    43. public void setServerSocketChannel(AsynchronousServerSocketChannel serverSocketChannel)
    44. this.serverSocketChannel = serverSocketChannel; 

    45.  
    46. public CountDownLatch getLatch()
    47. return latch; 

    48.  
    49. public void setLatch(CountDownLatch latch)
    50. this.latch = latch; 

    51.  

    52.  

    AcceptHandler

    1. //package aio; 
    2.  
    3. import java.nio.ByteBuffer; 
    4. import java.nio.channels.AsynchronousServerSocketChannel; 
    5. import java.nio.channels.AsynchronousSocketChannel; 
    6. import java.nio.channels.CompletionHandler; 
    7. import java.util.concurrent.CountDownLatch; 
    8.  
    9. public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncServerHandler>
    10.  
    11. private CountDownLatch latch; 
    12. public AcceptHandler(CountDownLatch latch)
    13. // TODO Auto-generated constructor stub 
    14. this.latch = latch; 

    15. @Override 
    16. public void completed(AsynchronousSocketChannel socketChannel, AsyncServerHandler serverHandler)
    17. // TODO Auto-generated method stub 
    18. //进入这个函数说明说明事件处理成功,已经成功的拿到socketChanne 
    19. Server.clientCount ++; 
    20. System.out.println("当前连接的客户数:" + Server.clientCount); 
    21. //继续接受其他客户机的连接, 
    22. AsynchronousServerSocketChannel channel = serverHandler.getServerSocketChannel(); 
    23. channel.accept(serverHandler, this); 
    24. //创建新的buffer,为读取数据做准备 
    25. ByteBuffer buffer = ByteBuffer.allocate(1024); 
    26. socketChannel.read(buffer, buffer, new serverReadHandler(socketChannel)); 
    27.  

    28.  
    29. @Override 
    30. public void failed(Throwable exc, AsyncServerHandler attachment)
    31. // TODO Auto-generated method stub 
    32. exc.printStackTrace(); 
    33. this.latch.countDown(); 

    34.  
    35.  

    36.  

    serverReadHandler

    1. //package aio; 
    2.  
    3. import java.io.IOException; 
    4. import java.io.UnsupportedEncodingException; 
    5. import java.nio.ByteBuffer; 
    6. import java.nio.channels.AsynchronousSocketChannel; 
    7. import java.nio.channels.CompletionHandler; 
    8.  
    9. public class serverReadHandler implements CompletionHandler<Integer,ByteBuffer>
    10. //用于读取半包消息和应答消息 
    11. private AsynchronousSocketChannel serverChannel; 
    12. public serverReadHandler(AsynchronousSocketChannel channel)
    13. // TODO Auto-generated constructor stub 
    14. this.serverChannel = channel; 

    15.  
    16.  
    17. @Override 
    18. public void completed(Integer result, ByteBuffer buffer)
    19. // TODO Auto-generated method stub 
    20. //操作系统读取IO就绪之后,进入这个函数 
    21. //调整limit和position关系,方便读取 
    22. //buffer.flip(); 
    23. if (buffer.hasRemaining()) { 
    24. byte[] bytes = new byte[buffer.remaining()]; 
    25. buffer.get(bytes); 
    26. String msg = null
    27. try
    28. msg = new String(bytes, "UTF-8"); 
    29. } catch (UnsupportedEncodingException e) { 
    30. // TODO Auto-generated catch block 
    31. e.printStackTrace(); 

    32. System.out.println("服务器收到消息:" + msg); 
    33.  
    34. String calResult = null
    35. StringBuffer stringBuffer = new StringBuffer(msg); 
    36. calResult = stringBuffer.reverse().toString(); 
    37. //向客户端发送结果 
    38. byte[] resultBytes = calResult.getBytes(); 
    39. ByteBuffer rBuffer = ByteBuffer.allocate(resultBytes.length); 
    40. rBuffer.put(resultBytes); 
    41. this.serverChannel.write(rBuffer, rBuffer, new ServerWriteHandler(this.serverChannel)); 
    42. }else
    43. System.out.println("服务器没有读取到数据"); 


    44.  
    45. @Override 
    46. public void failed(Throwable exc, ByteBuffer buffer)
    47. // TODO Auto-generated method stub 
    48. try
    49. this.serverChannel.close(); 
    50. System.out.println("服务器socket关闭~~~"); 
    51. } catch (IOException e) { 
    52. // TODO Auto-generated catch block 
    53. e.printStackTrace(); 


    54.  

    55.  

    serverWriteHandler

    1. //package aio; 
    2.  
    3. import java.io.IOException; 
    4. import java.nio.ByteBuffer; 
    5. import java.nio.channels.AsynchronousSocketChannel; 
    6. import java.nio.channels.CompletionHandler; 
    7.  
    8. public class ServerWriteHandler implements CompletionHandler<Integer,ByteBuffer>
    9. private AsynchronousSocketChannel serverChannel; 
    10. public ServerWriteHandler(AsynchronousSocketChannel channel)
    11. // TODO Auto-generated constructor stub  
    12. this.serverChannel = channel; 

    13. @Override 
    14. public void completed(Integer result, ByteBuffer buffer)
    15. // TODO Auto-generated method stub 
    16. //调整limit和position的位置,方便下面输出使用 
    17. //buffer.flip(); 
    18. if (buffer.hasRemaining()) { 
    19. System.out.println("服务器输出数据~~~"); 
    20. buffer.clear(); 
    21. //向客户端写入数据 
    22. this.serverChannel.write(buffer, buffer, this); 
    23. } else
    24. //读取数据 
    25. ByteBuffer readBuffer = ByteBuffer.allocate(1024); 
    26. this.serverChannel.read(readBuffer, readBuffer, new serverReadHandler(this.serverChannel)); 


    27. @Override 
    28. public void failed(Throwable exc, ByteBuffer buffer)
    29. // TODO Auto-generated method stub 
    30. //出现异常关闭socketchannel 
    31. try
    32. this.serverChannel.close(); 
    33. } catch (IOException e) { 
    34. // TODO Auto-generated catch block 
    35. e.printStackTrace(); 


    36.  

    37.  

    客户端

    client

    1. //package aio; 
    2.  
    3. import java.util.Scanner; 
    4.  
    5. public class client
    6. private static String DEFAULT_HOST = "127.0.0.1";  
    7. private static int DEFAULT_PORT = 8000;  
    8. private static AsyncClientHandler clientHandle;  
    9. public static void start(){  
    10. start(DEFAULT_HOST,DEFAULT_PORT);  
    11. }  
    12. public static synchronized void start(String ip,int port){  
    13. if(clientHandle!=null)  
    14. return;  
    15. clientHandle = new AsyncClientHandler(ip,port);  
    16. new Thread(clientHandle,"Client").start();  
    17. }  
    18. //向服务器发送消息  
    19. public static boolean sendMsg(String msg) throws Exception{  
    20. if(msg.equals("q")) return false;  
    21. clientHandle.sendMsg(msg);  
    22. return true;  
    23. }  
    24. //@SuppressWarnings("resource")  
    25. public static void main(String[] args) throws Exception{  
    26. //System.out.println("请输入请求消息:");  
    27. start();  
    28. System.out.println("请输入请求消息:");  
    29. Scanner scanner = new Scanner(System.in);  
    30. String tmp = null
    31. for (int i = 0; i < 10; i++) { 
    32. tmp = scanner.nextLine(); 
    33. clientHandle.sendMsg(tmp); 

    34.  
    35. }  

    36.  

    AsyncClientHandler

    1. //package aio; 
    2.  
    3. import java.io.IOException; 
    4. import java.net.InetSocketAddress; 
    5. import java.nio.ByteBuffer; 
    6. import java.nio.channels.AsynchronousSocketChannel; 
    7. import java.nio.channels.CompletionHandler; 
    8. import java.util.concurrent.CountDownLatch; 
    9.  
    10. public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable
    11. private AsynchronousSocketChannel clientChannel; 
    12. private String host; 
    13. private int port; 
    14. private CountDownLatch latch; 
    15.  
    16. public AsyncClientHandler(String host, int port)
    17. // TODO Auto-generated constructor stub 
    18. this.host = host; 
    19. this.port = port; 
    20. try
    21. //创建异步客户端通道 
    22. clientChannel = AsynchronousSocketChannel.open(); 
    23. } catch (Exception e) { 
    24. // TODO: handle exception 
    25. e.printStackTrace(); 


    26.  
    27.  
    28. @Override 
    29. public void run()
    30. // TODO Auto-generated method stub 
    31. latch = new CountDownLatch(1); 
    32. //创建InetScoketAddress 
    33. InetSocketAddress address = new InetSocketAddress(this.host, this.port); 
    34. //发起异步连接操作,回调参数是这个类本身,如果连接成功会回调completed方法 
    35. clientChannel.connect(address, this, this); 
    36. try
    37. latch.await(); 
    38. } catch (InterruptedException e) { 
    39. // TODO Auto-generated catch block 
    40. e.printStackTrace(); 

    41. try
    42. clientChannel.close(); 
    43. } catch (IOException e) { 
    44. // TODO Auto-generated catch block 
    45. e.printStackTrace(); 


    46.  
    47. @Override 
    48. public void completed(Void result, AsyncClientHandler attachment)
    49. // TODO Auto-generated method stub 
    50. //连接服务器成功,就会调用这个函数 
    51. System.out.println("连接服务器成功!!!!"); 

    52.  
    53. @Override 
    54. public void failed(Throwable exc, AsyncClientHandler attachment)
    55. // TODO Auto-generated method stub 
    56. //连接服务器失败,会调用这个函数 
    57. System.out.println("连接服务器失败!!!"); 
    58. exc.printStackTrace(); 
    59.  
    60. try
    61. clientChannel.close(); 
    62. latch.countDown(); 
    63. } catch (IOException e) { 
    64. // TODO Auto-generated catch block 
    65. e.printStackTrace(); 


    66.  
    67. //向服务器发送消息 
    68. public void sendMsg(String msg)
    69. System.out.println(msg); 
    70. byte[] bytes = msg.getBytes(); 
    71. ByteBuffer buffer = ByteBuffer.allocate(bytes.length); 
    72. buffer.put(bytes); 
    73. //进行flip操作 
    74. buffer.flip(); 
    75. //异步写 
    76. clientChannel.write(buffer, buffer, new WriteHandler(clientChannel,latch)); 
    77.  
    78.  

    79.  
    80.  

    81.  
    82.  

    writeHandler

    1. //package aio; 
    2.  
    3. import java.io.IOException; 
    4. import java.nio.ByteBuffer; 
    5. import java.nio.channels.AsynchronousSocketChannel; 
    6. import java.nio.channels.CompletionHandler; 
    7. import java.util.concurrent.CountDownLatch; 
    8.  
    9. public class WriteHandler implements CompletionHandler<Integer, ByteBuffer>
    10.  
    11. private AsynchronousSocketChannel clientChannel; 
    12. private CountDownLatch latch; 
    13. public WriteHandler(AsynchronousSocketChannel channel, CountDownLatch latch)
    14. // TODO Auto-generated constructor stub 
    15. this.clientChannel = channel; 
    16. this.latch = latch; 

    17. @Override 
    18. public void completed(Integer result, ByteBuffer buffer)
    19. // TODO Auto-generated method stub 
    20. System.out.println("发送数据成功!~~~"); 
    21. //进行flip操作 
    22. //buffer.flip(); 
    23. //判断buffer中是否有需要发送的数据,如果有数据就进行发送,如果没有就进行读取操作 
    24. if (buffer.hasRemaining()) { 
    25. System.out.println("进入写数据!!!"); 
    26. clientChannel.write(buffer, buffer, this); 
    27. System.out.println("发送数据成功~~~"); 

    28. //读取数据 
    29. System.out.println("进入读取数据"); 
    30. ByteBuffer readBuffer = ByteBuffer.allocate(1024); 
    31. clientChannel.read(readBuffer, readBuffer, new ReadHandle(clientChannel,latch)); 
    32.  

    33.  
    34. @Override 
    35. public void failed(Throwable exc, ByteBuffer buffer)
    36. // TODO Auto-generated method stub 
    37. System.err.println("发送数据失败~~~"); 
    38. try
    39. clientChannel.close(); 
    40. latch.countDown(); 
    41. } catch (IOException e) { 
    42. // TODO Auto-generated catch block 
    43. e.printStackTrace(); 

    44.  

    45.  

    46.  

    ReadHandle

    1. //package aio; 
    2.  
    3. import java.io.IOException; 
    4. import java.io.UnsupportedEncodingException; 
    5. import java.nio.ByteBuffer; 
    6. import java.nio.channels.AsynchronousSocketChannel; 
    7. import java.nio.channels.CompletionHandler; 
    8. import java.util.concurrent.CountDownLatch; 
    9.  
    10. public class ReadHandle implements CompletionHandler<Integer,ByteBuffer>
    11. private AsynchronousSocketChannel clientChannel; 
    12. private CountDownLatch latch; 
    13. public ReadHandle(AsynchronousSocketChannel channel, CountDownLatch latch)
    14. // TODO Auto-generated constructor stub 
    15. this.latch = latch; 
    16. this.clientChannel = channel; 

    17. @Override 
    18. public void completed(Integer result, ByteBuffer buffer)
    19. // TODO Auto-generated method stub 
    20. //调整buffer的limit和position方便获取收到的数据 
    21. //buffer.flip(); 
    22. System.out.println("读取数据成功!!!"); 
    23. byte[] bytes = new byte[buffer.remaining()]; 
    24. buffer.get(bytes); 
    25. String res; 
    26. try
    27. res = new String(bytes, "UTF-8"); 
    28. System.out.println("收到的数据: " + res); 
    29. } catch (UnsupportedEncodingException e) { 
    30. // TODO Auto-generated catch block 
    31. e.printStackTrace(); 

    32.  
    33.  
    34.  
    35.  

    36. @Override 
    37. public void failed(Throwable exc, ByteBuffer attachment)
    38. // TODO Auto-generated method stub 
    39. System.err.println("读取数据失败~~~"); 
    40.  
    41. try
    42. clientChannel.close(); 
    43. this.latch.countDown(); 
    44. } catch (IOException e) { 
    45. // TODO Auto-generated catch block 
    46. e.printStackTrace(); 


    47.  
    48.  

    49.  
  • 相关阅读:
    ibatis $与#的区别
    (转载)Hibernate与Jpa的关系
    tomcat web工程 jar包冲突解决方法
    jquery 获取checkbox 选中值并拼接字符集
    mysql BLOB字段转String的方法
    Ajax工作原理
    Spring mvc 具体RequestMapping 参数含义
    覆盖bootstrap的样式
    开园啦,致曾经现在以后的自己~
    SimpleDateFormat 常规用法
  • 原文地址:https://www.cnblogs.com/chailinbo/p/9226651.html
Copyright © 2011-2022 走看看