zoukankan      html  css  js  c++  java
  • java NIO原理及实例

    1、reactor(反应器)模式

      使用单线程模拟多线程,提高资源利用率和程序的效率,增加系统吞吐量。下面例子比较形象的说明了什么是反应器模式:

      一个老板经营一个饭店,

      传统模式 - 来一个客人安排一个服务员招呼,客人很满意;(相当于一个连接一个线程)

      后来客人越来越多,需要的服务员越来越多,资源条件不足以再请更多的服务员了,传统模式已经不能满足需求。老板之所以为老板自然有过人之处,老板发现,服务员在为客人服务时,当客人点菜的时候,服务员基本处于等待状态,(阻塞线程,不做事)。

      于是乎就让服务员在客人点菜的时候,去为其他客人服务,当客人菜点好后再招呼服务员即可。 --反应器(reactor)模式诞生了

      饭店的生意红红火火,几个服务员就足以支撑大量的客流量,老板用有限的资源赚了更多的money~~~~^_^

    2、NIO中的重要概念 通道、缓冲区、选择器

      通道:类似于流,但是可以异步读写数据(流只能同步读写),通道是双向的,(流是单向的),通道的数据总是要先读到一个buffer 或者 从一个buffer写入,即通道与buffer进行数据交互。

      通道类型:  

      • FileChannel:从文件中读写数据。  
      • DatagramChannel:能通过UDP读写网络中的数据。  
      • SocketChannel:能通过TCP读写网络中的数据。  
      • ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。  

      FileChannel比较特殊,它可以与通道进行数据交互, 不能切换到非阻塞模式,套接字通道可以切换到非阻塞模式;

      缓冲区 - 本质上是一块可以存储数据的内存,被封装成了buffer对象而已!

      缓冲区类型:

      • ByteBuffer  
      • MappedByteBuffer  
      • CharBuffer  
      • DoubleBuffer  
      • FloatBuffer  
      • IntBuffer  
      • LongBuffer  
      • ShortBuffer  

      常用方法:

      •   allocate() - 分配一块缓冲区  
      •   put() -  向缓冲区写数据
      •   get() - 向缓冲区读数据  
      •   filp() - 将缓冲区从写模式切换到读模式  
      •     clear() - 从读模式切换到写模式,不会清空数据,但后续写数据会覆盖原来的数据,即使有部分数据没有读,也会被遗忘;  
      •       compact() - 从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
      •   mark() - 对position做出标记,配合reset使用
      •       reset() - 将position置为标记值    

    缓冲区的一些属性:

      •     capacity - 缓冲区大小,无论是读模式还是写模式,此属性值不会变;
      •     position - 写数据时,position表示当前写的位置,每写一个数据,会向下移动一个数据单元,初始为0;最大为capacity - 1

            切换到读模式时,position会被置为0,表示当前读的位置

      •     limit - 写模式下,limit 相当于capacity 表示最多可以写多少数据,切换到读模式时,limit 等于原先的position,表示最多可以读多少数据。

      选择器:相当于一个观察者,用来监听通道感兴趣的事件,一个选择器可以绑定多个通道;

       通道向选择器注册时,需要指定感兴趣的事件,选择器支持以下事件:

      • SelectionKey.OP_CONNECT
      • SelectionKey.OP_ACCEPT
      • SelectionKey.OP_READ
      • SelectionKey.OP_WRITE  

       如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:

         int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; 

       通道向选择器注册时,会返回一个 SelectionKey对象,具有如下属性

      • interest集合
      • ready集合  
      • Channel  
      • Selector
      • 附加的对象(可选)  

      用“位与”操作interest 集合和给定的SelectionKey常量,可以确定某个确定的事件是否在interest 集合中。

    int interestSet = selectionKey.interestOps();  
     
    boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
    boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

      ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。Selection将在下一小节进行解释。可以这样访问ready集合: 
      int readySet = selectionKey.readyOps();

       也可以使用以下四个方法获取已就绪事件,返回值为boolean:

    selectionKey.isAcceptable();  
    selectionKey.isConnectable();  
    selectionKey.isReadable();  
    selectionKey.isWritable();  

       可以将一个对象或者更多信息附着到SelectionKey上,即记录在附加对象上,方法如下:

    selectionKey.attach(theObject);  
    Object attachedObj = selectionKey.attachment();  

       可以通过选择器的select方法获取是否有就绪的通道;

      • int select()  
      • int select(long timeout)  
      • int selectNow()

      返回值表示上次执行select之后,就绪通道的个数。 

      可以通过selectedKeySet获取已就绪的通道。返回值是SelectionKey 的集合,处理完相应的通道之后,需要removed 因为Selector不会自己removed

      select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select  那么执行完wakeup后下一个执行select就会立即返回。

      调用close() 方法关闭selector

     下面是一个简单的实例代码,帮助理解上面的内容:

      1 package com.pt.nio;
      2 
      3 import java.net.InetAddress;
      4 import java.net.InetSocketAddress;
      5 import java.nio.ByteBuffer;
      6 import java.nio.CharBuffer;
      7 import java.nio.channels.SelectionKey;
      8 import java.nio.channels.Selector;
      9 import java.nio.channels.ServerSocketChannel;
     10 import java.nio.channels.SocketChannel;
     11 import java.nio.charset.Charset;
     12 import java.nio.charset.CharsetDecoder;
     13 import java.util.Iterator;
     14 import java.util.Set;
     15 
     16 public class Reactor implements Runnable {
     17     public int id = 100001;
     18     public int bufferSize = 2048;
     19     @Override
     20     public void run() {
     21         // TODO Auto-generated method stub
     22         init();
     23     }
     24 
     25     public void init() {
     26         try {
     27             // 创建通道和选择器
     28             ServerSocketChannel socketChannel = ServerSocketChannel.open();
     29             Selector selector = Selector.open();
     30             InetSocketAddress inetSocketAddress = new InetSocketAddress(
     31                     InetAddress.getLocalHost(), 4700);
     32             socketChannel.socket().bind(inetSocketAddress);
     33             // 设置通道非阻塞 绑定选择器
     34             socketChannel.configureBlocking(false);
     35             socketChannel.register(selector, SelectionKey.OP_ACCEPT).attach(
     36                     id++);
     37             System.out.println("Server started .... port:4700");
     38             listener(selector);
     39 
     40         } catch (Exception e) {
     41             // TODO: handle exception
     42         }
     43     }
     44 
     45     public void listener(Selector in_selector) {
     46         try {
     47             while (true) {
     48                 Thread.sleep(1*1000);
     49                 in_selector.select(); // 阻塞 直到有就绪事件为止
     50                 Set<SelectionKey> readySelectionKey = in_selector
     51                         .selectedKeys();
     52                 Iterator<SelectionKey> it = readySelectionKey.iterator();
     53                 while (it.hasNext()) {
     54                     SelectionKey selectionKey = it.next();
     55                     // 判断是哪个事件
     56                     if (selectionKey.isAcceptable()) {// 客户请求连接
     57                         System.out.println(selectionKey.attachment()
     58                                 + " - 接受请求事件");
     59                         // 获取通道 接受连接,
     60                         // 设置非阻塞模式(必须),同时需要注册 读写数据的事件,这样有消息触发时才能捕获
     61                         ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey
     62                                 .channel();
     63                         serverSocketChannel
     64                                 .accept()
     65                                 .configureBlocking(false)
     66                                 .register(
     67                                         in_selector,
     68                                         SelectionKey.OP_READ
     69                                                 | SelectionKey.OP_WRITE).attach(id++);
     70                         System.out
     71                                 .println(selectionKey.attachment() + " - 已连接");
     72 
     73                         // 下面这种写法是有问题的 不应该在serverSocketChannel上面注册
     74                         /*
     75                          * serverSocketChannel.configureBlocking(false);
     76                          * serverSocketChannel.register(in_selector,
     77                          * SelectionKey.OP_READ);
     78                          * serverSocketChannel.register(in_selector,
     79                          * SelectionKey.OP_WRITE);
     80                          */
     81                     }
     82                     if (selectionKey.isReadable()) {// 读数据
     83                         System.out.println(selectionKey.attachment()
     84                                 + " - 读数据事件");
     85                         SocketChannel clientChannel=(SocketChannel)selectionKey.channel();
     86                         ByteBuffer receiveBuf = ByteBuffer.allocate(bufferSize);
     87                         clientChannel.read(receiveBuf);
     88                         System.out.println(selectionKey.attachment()
     89                                 + " - 读取数据:" + getString(receiveBuf));
     90                     }
     91                     if (selectionKey.isWritable()) {// 写数据
     92                         System.out.println(selectionKey.attachment()
     93                                 + " - 写数据事件");
     94                         SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
     95                         ByteBuffer sendBuf = ByteBuffer.allocate(bufferSize);
     96                         String sendText = "hello
    ";
     97                         sendBuf.put(sendText.getBytes());
     98                         sendBuf.flip();        //写完数据后调用此方法
     99                         clientChannel.write(sendBuf);
    100                     }
    101                     if (selectionKey.isConnectable()) {
    102                         System.out.println(selectionKey.attachment()
    103                                 + " - 连接事件");
    104                     }
    105                     // 必须removed 否则会继续存在,下一次循环还会进来,
    106                     // 注意removed 的位置,针对一个.next() remove一次
    107                     it.remove(); 
    108                 }
    109             }
    110         } catch (Exception e) {
    111             // TODO: handle exception
    112             System.out.println("Error - " + e.getMessage());
    113             e.printStackTrace();
    114         }
    115 
    116     }
    117     /**
    118      * ByteBuffer 转换 String
    119      * @param buffer
    120      * @return
    121      */
    122     public static String getString(ByteBuffer buffer)
    123     {
    124         String string = "";
    125         try
    126         {
    127             for(int i = 0; i<buffer.position();i++){
    128                 string += (char)buffer.get(i);
    129             }
    130             return string;
    131         }
    132         catch (Exception ex)
    133         {
    134             ex.printStackTrace();
    135             return "";
    136         }
    137     }
    138 }
    139 
    140 NIO服务器端
    NIO 服务器端
     1 package com.pt.bio;
     2 
     3 import java.io.*;
     4 import java.net.*;
     5 
     6 public class BioServer implements Runnable {
     7 
     8     @Override
     9     public void run() {
    10         // TODO Auto-generated method stub
    11         System.out.println("Hello Server!!");
    12 
    13         try {
    14             ServerSocket server = null;
    15             try {
    16                 server = new ServerSocket(4700);
    17                 // 创建一个ServerSocket在端口4700监听客户请求
    18             } catch (Exception e) {
    19                 System.out.println("can not listen to:" + e);
    20                 // 出错,打印出错信息
    21             }
    22             Socket socket = null;
    23             try {
    24                 socket = server.accept();
    25                 // 使用accept()阻塞等待客户请求,有客户
    26                 // 请求到来则产生一个Socket对象,并继续执行
    27             } catch (Exception e) {
    28                 System.out.println("Error." + e);
    29                 // 出错,打印出错信息
    30             }
    31             String line;
    32             BufferedReader is = new BufferedReader(new InputStreamReader(
    33                     socket.getInputStream()));
    34             // 由Socket对象得到输入流,并构造相应的BufferedReader对象
    35             
    36             // 由Socket对象得到输出流,并构造PrintWriter对象
    37 //            BufferedReader sin = new BufferedReader(new InputStreamReader(
    38 //                    System.in));
    39             // 由系统标准输入设备构造BufferedReader对象
    40             System.out.println("Client:" + is.readLine());
    41             PrintWriter os = new PrintWriter(socket.getOutputStream());
    42             // 在标准输出上打印从客户端读入的字符串
    43             line = "hello";
    44             // 从标准输入读入一字符串
    45 //            while (!line.equals("bye")) {
    46                 // 如果该字符串为 "bye",则停止循环
    47                 os.println(line);
    48                 // 向客户端输出该字符串
    49                 os.flush();
    50                 // 刷新输出流,使Client马上收到该字符串
    51 //                System.out.println("Server:" + line);
    52                 // 在系统标准输出上打印读入的字符串
    53 //                System.out.println("Client:" + is.readLine());
    54                 // 从Client读入一字符串,并打印到标准输出上
    55 //                line = sin.readLine();
    56                 // 从系统标准输入读入一字符串
    57 //            } // 继续循环
    58 //            os.close(); // 关闭Socket输出流
    59             is.close(); // 关闭Socket输入流
    60             socket.close(); // 关闭Socket
    61             server.close(); // 关闭ServerSocket
    62         } catch (Exception e) {
    63             System.out.println("Error." + e);
    64             // 出错,打印出错信息
    65         }
    66 
    67     }
    68 
    69 }
    70 
    71 BIO服务器端
    BIO 服务器端
     1 package com.pt;
     2 
     3 import java.io.BufferedReader;
     4 import java.io.InputStreamReader;
     5 import java.io.PrintWriter;
     6 import java.net.Socket;
     7 
     8 import org.junit.Test;
     9 
    10 import com.pt.bio.BioServer;
    11 import com.pt.nio.Reactor;
    12 
    13 public class TestReactor {
    14 
    15     @Test
    16     public void testConnect() throws Exception{
    17         Socket socket=new Socket("192.168.82.35",4700);//BIO 阻塞
    18         System.out.println("连接成功");
    19         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    20          
    21         //下面这种写法,不用关闭客户端,服务器端也是可以收到的
    22         {
    23             PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
    24             printWriter.println("hi");
    25             printWriter.flush();
    26         }
    27         //这种写法必须关闭客户端,服务器端才可以收到 NIO不用
    28         {
    29 //        socket.getOutputStream().write(new byte[]{'h','i'});
    30 //        socket.getOutputStream().flush();
    31         //必须关闭BIO服务器才能收到消息.NIO服务器不需要关闭
    32         //socket.close();
    33         }
    34         byte[] buf = new byte[2048];
    35         System.out.println("准备读取数据~~");
    36         
    37         while(true){
    38             try {
    39                 //两种读取数据方式
    40                 int count = socket.getInputStream().read(buf);        //会阻塞
    41                 //String readFromServer = bufferedReader.readLine();//可以读取到数据 会阻塞,直到遇见
    
    42                 //System.out.println("方式二: 读取数据" + readFromServer);    
    43                 System.out.println("方式一: 读取数据" + new String(buf) + " count = " + count);
    44                 Thread.sleep(1*1000);
    45             } catch (InterruptedException e) {
    46                 // TODO Auto-generated catch block
    47                 e.printStackTrace();
    48             }
    49             //break;
    50         }
    51         
    52     }
    53     
    54     @Test 
    55     public void testNioServer(){
    56         Thread server = new Thread(new Reactor());
    57         server.start();
    58 
    59         while(true){
    60             try {
    61                 Thread.sleep(3*1000);
    62             } catch (InterruptedException e) {
    63                 // TODO Auto-generated catch block
    64                 e.printStackTrace();
    65             }
    66         }
    67     }
    68     
    69     @Test
    70     public void testBioServer(){
    71         Thread server = new Thread(new BioServer());
    72         server.start();
    73 
    74         while(true){
    75             try {
    76                 Thread.sleep(3*1000);
    77             } catch (InterruptedException e) {
    78                 // TODO Auto-generated catch block
    79                 e.printStackTrace();
    80             }
    81         }
    82     }
    83 
    84 }
    85 
    86 BIO客户端及测试类
    BIO客户端及测试类

     其中 testNioServer()方法,是启动NIO服务器端;

    testBioServer()方法是启动BIO服务器端

    testConnect()是BIO的一个连接

    基于NIO实现的时钟服务器:http://www.cnblogs.com/tengpan-cn/p/6529628.html

    一篇写的比较详细的JAVA NIO的文章:http://www.iteye.com/magazines/132-Java-NIO

  • 相关阅读:
    青岛公交查询
    Windows8应用生命周期 Metro Style Apps Lifecycle
    Mac下发布Qt应用程序
    Git命令笔记本
    IOS中 自定义访问用户Location时的提示信息
    使用OpenSSL发送IOS推送通知 Apple Push Notification
    最简单的iOS MapView标记点纠偏的方法
    罗技产品序列号追溯条码扫描系统
    展厅样品条形码报价管理系统
    Denso条码采集器程序开发2编译环境的搭建
  • 原文地址:https://www.cnblogs.com/xifenglou/p/9182020.html
Copyright © 2011-2022 走看看