zoukankan      html  css  js  c++  java
  • 转:Java NIO(2)

    java.nio包是Java在1.4之后增加的,用来提高I/O操作的效率。在nio包中主要包括以下几个类或接口:

    * Buffer:缓冲区,用来临时存放输入或输出数据。

    * Charset:用来把Unicode字符编码和其它字符编码互转。

    * Channel:数据传输通道,用来把Buffer中的数据写入到数据源,或者把数据源中的数据读入到Buffer。

    * Selector:用来支持异步I/O操作,也叫非阻塞I/O操作。


    nio包中主要通过下面两个方面来提高I/O操作效率:

    * 通过Buffer和Channel来提高I/O操作的速度。

    * 通过Selector来支持非阻塞I/O操作。


    下面来看一下程序中是怎么通过这些类库实现Socket功能。


    首先介绍一下几个辅助类

    辅助类SerializableUtil,这个类用来把java对象序列化成字节数组,或者把字节数组反序列化成java对象。

    1. package com.googlecode.garbagecan.test.socket;  
    2.   
    3. import java.io.ByteArrayInputStream;  
    4. import java.io.ByteArrayOutputStream;  
    5. import java.io.IOException;  
    6. import java.io.ObjectInputStream;  
    7. import java.io.ObjectOutputStream;  
    8.   
    9. public class SerializableUtil {  
    10.       
    11.     public static byte[] toBytes(Object object) {  
    12.         ByteArrayOutputStream baos = new ByteArrayOutputStream();  
    13.         ObjectOutputStream oos = null;  
    14.         try {  
    15.             oos = new ObjectOutputStream(baos);  
    16.             oos.writeObject(object);  
    17.             byte[] bytes = baos.toByteArray();  
    18.             return bytes;  
    19.         } catch(IOException ex) {  
    20.             throw new RuntimeException(ex.getMessage(), ex);  
    21.         } finally {  
    22.             try {  
    23.                 oos.close();  
    24.             } catch (Exception e) {}  
    25.         }  
    26.     }  
    27.       
    28.     public static Object toObject(byte[] bytes) {  
    29.         ByteArrayInputStream bais = new ByteArrayInputStream(bytes);  
    30.         ObjectInputStream ois = null;  
    31.         try {  
    32.             ois = new ObjectInputStream(bais);  
    33.             Object object = ois.readObject();  
    34.             return object;  
    35.         } catch(IOException ex) {  
    36.             throw new RuntimeException(ex.getMessage(), ex);  
    37.         } catch(ClassNotFoundException ex) {  
    38.             throw new RuntimeException(ex.getMessage(), ex);  
    39.         } finally {  
    40.             try {  
    41.                 ois.close();  
    42.             } catch (Exception e) {}  
    43.         }  
    44.     }  
    45. }  

    辅助类MyRequestObject和MyResponseObject,这两个类是普通的java对象,实现了Serializable接口。MyRequestObject类是Client发出的请求,MyResponseObject是Server端作出的响应。

    1. package com.googlecode.garbagecan.test.socket.nio;  
    2.   
    3. import java.io.Serializable;  
    4.   
    5. public class MyRequestObject implements Serializable {  
    6.   
    7.     private static final long serialVersionUID = 1L;  
    8.   
    9.     private String name;  
    10.       
    11.     private String value;  
    12.   
    13.     private byte[] bytes;  
    14.       
    15.     public MyRequestObject(String name, String value) {  
    16.         this.name = name;  
    17.         this.value = value;  
    18.         this.bytes = new byte[1024];  
    19.     }  
    20.       
    21.     public String getName() {  
    22.         return name;  
    23.     }  
    24.   
    25.     public void setName(String name) {  
    26.         this.name = name;  
    27.     }  
    28.   
    29.     public String getValue() {  
    30.         return value;  
    31.     }  
    32.   
    33.     public void setValue(String value) {  
    34.         this.value = value;  
    35.     }  
    36.       
    37.     @Override  
    38.     public String toString() {  
    39.         StringBuffer sb = new StringBuffer();  
    40.         sb.append("Request [name: " + name  + ", value: " + value + ", bytes: " + bytes.length+ "]");  
    41.         return sb.toString();  
    42.     }  
    43. }  
    44.   
    45. package com.googlecode.garbagecan.test.socket.nio;  
    46.   
    47. import java.io.Serializable;  
    48.   
    49. public class MyResponseObject implements Serializable {  
    50.   
    51.     private static final long serialVersionUID = 1L;  
    52.   
    53.     private String name;  
    54.       
    55.     private String value;  
    56.   
    57.     private byte[] bytes;  
    58.       
    59.     public MyResponseObject(String name, String value) {  
    60.         this.name = name;  
    61.         this.value = value;  
    62.         this.bytes = new byte[1024];  
    63.     }  
    64.       
    65.     public String getName() {  
    66.         return name;  
    67.     }  
    68.   
    69.     public void setName(String name) {  
    70.         this.name = name;  
    71.     }  
    72.   
    73.     public String getValue() {  
    74.         return value;  
    75.     }  
    76.   
    77.     public void setValue(String value) {  
    78.         this.value = value;  
    79.     }  
    80.       
    81.     @Override  
    82.     public String toString() {  
    83.         StringBuffer sb = new StringBuffer();  
    84.         sb.append("Response [name: " + name  + ", value: " + value + ", bytes: " + bytes.length+ "]");  
    85.         return sb.toString();  
    86.     }  
    87. }  


    下面主要看一下Server端的代码,其中有一些英文注释对理解代码很有帮助,注释主要是来源jdk的文档和例子,这里就没有再翻译

    1. package com.googlecode.garbagecan.test.socket.nio;  
    2.   
    3. import java.io.ByteArrayOutputStream;  
    4. import java.io.IOException;  
    5. import java.net.InetSocketAddress;  
    6. import java.nio.ByteBuffer;  
    7. import java.nio.channels.ClosedChannelException;  
    8. import java.nio.channels.SelectionKey;  
    9. import java.nio.channels.Selector;  
    10. import java.nio.channels.ServerSocketChannel;  
    11. import java.nio.channels.SocketChannel;  
    12. import java.util.Iterator;  
    13. import java.util.logging.Level;  
    14. import java.util.logging.Logger;  
    15.   
    16. import com.googlecode.garbagecan.test.socket.SerializableUtil;  
    17.   
    18. public class MyServer3 {  
    19.   
    20.     private final static Logger logger = Logger.getLogger(MyServer3.class.getName());  
    21.       
    22.     public static void main(String[] args) {  
    23.         Selector selector = null;  
    24.         ServerSocketChannel serverSocketChannel = null;  
    25.           
    26.         try {  
    27.             // Selector for incoming time requests  
    28.             selector = Selector.open();  
    29.   
    30.             // Create a new server socket and set to non blocking mode  
    31.             serverSocketChannel = ServerSocketChannel.open();  
    32.             serverSocketChannel.configureBlocking(false);  
    33.               
    34.             // Bind the server socket to the local host and port  
    35.             serverSocketChannel.socket().setReuseAddress(true);  
    36.             serverSocketChannel.socket().bind(new InetSocketAddress(10000));  
    37.               
    38.             // Register accepts on the server socket with the selector. This  
    39.             // step tells the selector that the socket wants to be put on the  
    40.             // ready list when accept operations occur, so allowing multiplexed  
    41.             // non-blocking I/O to take place.  
    42.             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  
    43.       
    44.             // Here's where everything happens. The select method will  
    45.             // return when any operations registered above have occurred, the  
    46.             // thread has been interrupted, etc.  
    47.             while (selector.select() > 0) {  
    48.                 // Someone is ready for I/O, get the ready keys  
    49.                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();  
    50.       
    51.                 // Walk through the ready keys collection and process date requests.  
    52.                 while (it.hasNext()) {  
    53.                     SelectionKey readyKey = it.next();  
    54.                     it.remove();  
    55.                       
    56.                     // The key indexes into the selector so you  
    57.                     // can retrieve the socket that's ready for I/O  
    58.                     execute((ServerSocketChannel) readyKey.channel());  
    59.                 }  
    60.             }  
    61.         } catch (ClosedChannelException ex) {  
    62.             logger.log(Level.SEVERE, null, ex);  
    63.         } catch (IOException ex) {  
    64.             logger.log(Level.SEVERE, null, ex);  
    65.         } finally {  
    66.             try {  
    67.                 selector.close();  
    68.             } catch(Exception ex) {}  
    69.             try {  
    70.                 serverSocketChannel.close();  
    71.             } catch(Exception ex) {}  
    72.         }  
    73.     }  
    74.   
    75.     private static void execute(ServerSocketChannel serverSocketChannel) throws IOException {  
    76.         SocketChannel socketChannel = null;  
    77.         try {  
    78.             socketChannel = serverSocketChannel.accept();  
    79.             MyRequestObject myRequestObject = receiveData(socketChannel);  
    80.             logger.log(Level.INFO, myRequestObject.toString());  
    81.               
    82.             MyResponseObject myResponseObject = new MyResponseObject(  
    83.                     "response for " + myRequestObject.getName(),   
    84.                     "response for " + myRequestObject.getValue());  
    85.             sendData(socketChannel, myResponseObject);  
    86.             logger.log(Level.INFO, myResponseObject.toString());  
    87.         } finally {  
    88.             try {  
    89.                 socketChannel.close();  
    90.             } catch(Exception ex) {}  
    91.         }  
    92.     }  
    93.       
    94.     private static MyRequestObject receiveData(SocketChannel socketChannel) throws IOException {  
    95.         MyRequestObject myRequestObject = null;  
    96.         ByteArrayOutputStream baos = new ByteArrayOutputStream();  
    97.         ByteBuffer buffer = ByteBuffer.allocate(1024);  
    98.           
    99.         try {  
    100.             byte[] bytes;  
    101.             int size = 0;  
    102.             while ((size = socketChannel.read(buffer)) >= 0) {  
    103.                 buffer.flip();  
    104.                 bytes = new byte[size];  
    105.                 buffer.get(bytes);  
    106.                 baos.write(bytes);  
    107.                 buffer.clear();  
    108.             }  
    109.             bytes = baos.toByteArray();  
    110.             Object obj = SerializableUtil.toObject(bytes);  
    111.             myRequestObject = (MyRequestObject)obj;  
    112.         } finally {  
    113.             try {  
    114.                 baos.close();  
    115.             } catch(Exception ex) {}  
    116.         }  
    117.         return myRequestObject;  
    118.     }  
    119.   
    120.     private static void sendData(SocketChannel socketChannel, MyResponseObject myResponseObject) throws IOException {  
    121.         byte[] bytes = SerializableUtil.toBytes(myResponseObject);  
    122.         ByteBuffer buffer = ByteBuffer.wrap(bytes);  
    123.         socketChannel.write(buffer);  
    124.     }  
    125. }  

    下面是Client的代码,代码比较简单就是启动了100个线程来访问Server

    1. package com.googlecode.garbagecan.test.socket.nio;  
    2.   
    3. import java.io.ByteArrayOutputStream;  
    4. import java.io.IOException;  
    5. import java.net.InetSocketAddress;  
    6. import java.net.SocketAddress;  
    7. import java.nio.ByteBuffer;  
    8. import java.nio.channels.SocketChannel;  
    9. import java.util.logging.Level;  
    10. import java.util.logging.Logger;  
    11.   
    12. import com.googlecode.garbagecan.test.socket.SerializableUtil;  
    13.   
    14. public class MyClient3 {  
    15.   
    16.     private final static Logger logger = Logger.getLogger(MyClient3.class.getName());  
    17.       
    18.     public static void main(String[] args) throws Exception {  
    19.         for (int i = 0; i < 100; i++) {  
    20.             final int idx = i;  
    21.             new Thread(new MyRunnable(idx)).start();  
    22.         }  
    23.     }  
    24.       
    25.     private static final class MyRunnable implements Runnable {  
    26.           
    27.         private final int idx;  
    28.   
    29.         private MyRunnable(int idx) {  
    30.             this.idx = idx;  
    31.         }  
    32.   
    33.         public void run() {  
    34.             SocketChannel socketChannel = null;  
    35.             try {  
    36.                 socketChannel = SocketChannel.open();  
    37.                 SocketAddress socketAddress = new InetSocketAddress("localhost", 10000);  
    38.                 socketChannel.connect(socketAddress);  
    39.   
    40.                 MyRequestObject myRequestObject = new MyRequestObject("request_" + idx, "request_" + idx);  
    41.                 logger.log(Level.INFO, myRequestObject.toString());  
    42.                 sendData(socketChannel, myRequestObject);  
    43.                   
    44.                 MyResponseObject myResponseObject = receiveData(socketChannel);  
    45.                 logger.log(Level.INFO, myResponseObject.toString());  
    46.             } catch (Exception ex) {  
    47.                 logger.log(Level.SEVERE, null, ex);  
    48.             } finally {  
    49.                 try {  
    50.                     socketChannel.close();  
    51.                 } catch(Exception ex) {}  
    52.             }  
    53.         }  
    54.   
    55.         private void sendData(SocketChannel socketChannel, MyRequestObject myRequestObject) throws IOException {  
    56.             byte[] bytes = SerializableUtil.toBytes(myRequestObject);  
    57.             ByteBuffer buffer = ByteBuffer.wrap(bytes);  
    58.             socketChannel.write(buffer);  
    59.             socketChannel.socket().shutdownOutput();  
    60.         }  
    61.   
    62.         private MyResponseObject receiveData(SocketChannel socketChannel) throws IOException {  
    63.             MyResponseObject myResponseObject = null;  
    64.             ByteArrayOutputStream baos = new ByteArrayOutputStream();  
    65.               
    66.             try {  
    67.                 ByteBuffer buffer = ByteBuffer.allocateDirect(1024);  
    68.                 byte[] bytes;  
    69.                 int count = 0;  
    70.                 while ((count = socketChannel.read(buffer)) >= 0) {  
    71.                     buffer.flip();  
    72.                     bytes = new byte[count];  
    73.                     buffer.get(bytes);  
    74.                     baos.write(bytes);  
    75.                     buffer.clear();  
    76.                 }  
    77.                 bytes = baos.toByteArray();  
    78.                 Object obj = SerializableUtil.toObject(bytes);  
    79.                 myResponseObject = (MyResponseObject) obj;  
    80.                 socketChannel.socket().shutdownInput();  
    81.             } finally {  
    82.                 try {  
    83.                     baos.close();  
    84.                 } catch(Exception ex) {}  
    85.             }  
    86.             return myResponseObject;  
    87.         }  
    88.     }  
    89. }  


    最后测试上面的代码,首先运行Server类,然后运行Client类,就可以分别在Server端和Client端控制台看到发送或接收到的MyRequestObject或MyResponseObject对象了。

  • 相关阅读:
    长篇专访科比:成功没秘诀 只有不断努力
    生活哲理
    8个让程序员追悔莫及的职业建议
    优秀程序员必备十大习惯
    回顾马云屌丝岁月的惨状:多次被拒失声痛哭
    程序员,究竟该怎么赚钱?
    洛杉矶凌晨4点-------启航
    iOS越狱开发
    iOS越狱开发中遇到的坑
    macOS上搭建RabbitMQ+MQTT服务器
  • 原文地址:https://www.cnblogs.com/1995hxt/p/5811567.html
Copyright © 2011-2022 走看看