zoukankan      html  css  js  c++  java
  • 3--Java NIO基础1

    一、NIO概述

    1. BIO带来的挑战

    BIO即堵塞式I/O,数据在写入或读取时都有可能堵塞,一旦有堵塞,线程将失去CPU的使用权,性能较差。

    2. NIO工作机制

    Java NIO由Channel、Buffer、Selector三个核心组成,NIO框架类结构图如下:

    其中,Buffer主要负责存取数据,Channel用于数据传输,获取数据,然后流入Buffer;或从Buffer取数据,发送出去。

    Selector允许单线程处理多个Channel,如果打开了多个连接(Channel),但每个连接的数据流量很小,使用Selector则很方便。

    二、Channel

    Channel类主要位于java.nio.channels包下,类结构图如下:

    Channel跟流相似,但流是单向的,而Channel是双向的。Channel总是从Buffer获取数据(写文件)或将数据写入Buffer(读文件时)。常用的Channel如下:

    • FileChannel,从文件中读写数据。
    • DatagramChannel,能通过UDP读写网络中的数据。
    • SocketChannel,能通过TCP读写网络中的数据。
    • ServerSocketChannel,可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

      

    package com.yyn.nio;
    
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class ByteBufferTest {
        
         public static void main(String [] args) throws IOException{
            // testRead(); //从文件读取数据,chanel向buffer中写数据
             testWrite();
         }
        
         
         //写文件
         public static void testWrite() throws IOException{
             RandomAccessFile raFile = new RandomAccessFile("byte_buffer_write.txt", "rw");
             FileChannel fChannel = raFile.getChannel();
             String data = "天王盖地虎
    小鸡炖蘑菇
    要从此路过
    就得跳支舞";
             byte[] dataByte = data.getBytes("UTF-8");
             System.out.println(dataByte.length);
             ByteBuffer buf = ByteBuffer.allocate(dataByte.length); 
             buf.put(dataByte, 0, dataByte.length);
             buf.flip();  //切换buffer到读模式
             fChannel.write(buf);  //从buffer读取数据到channel
             fChannel.force(true); //强制将数据刷新到磁盘,不一定有用
             buf.clear();
             buf.put(dataByte, 0, 10);
             buf.mark();
             buf.put(dataByte,10,10);
             buf.reset();
             buf.put(dataByte, 0, 10);
             buf.flip();  //切换buffer到读模式
             fChannel.write(buf);  //从buffer读取数据到channel
             fChannel.close();
             System.out.println("write over!!");
             
             
         }
        
         //读文件
         public static void testRead() throws IOException {
             RandomAccessFile raFile = new RandomAccessFile("test.txt", "rw");
             FileChannel fChannel = raFile.getChannel();
             ByteBuffer buf = ByteBuffer.allocate(10);
             int byteRead = fChannel.read(buf);
             StringBuffer sBuffer = new StringBuffer();
             while(byteRead != -1){
                 buf.flip(); //change to read mode
                 byte [] bs = null;
                 int limite = buf.limit();
                 if(buf.hasArray()){
                     bs = buf.array();
                 }
                 if(bs != null){
                     System.out.println("bs length: "+limite);
                     sBuffer.append(new String(bs,0,limite ,"UTF-8"));
                 }        
                 buf.clear();  // make buffer ready for write,clear all buffer
                 //buf.compact();  // make buffer ready for write,clear data readed in buffer 
                 byteRead = fChannel.read(buf);
             }
             fChannel.close();
             System.out.println("####:"+sBuffer.toString());
         }
           
    }
    View Code

     

    2.NIO优化方法

    2.1 FileChannel.transformXXX方法

    2.2 FileChannel.map方法

    三、Buffer

    Buffer是一片缓冲区,可读可写,非线程安全的,NIO包中针对常用的类型设置了Buffer,类结构图如下:

    要使用Buffer,需记住3个方法和4个特性

    • flip()方法,切换Buffer为读状态,此时Buffer可读。limit设置为position,position设置为0
    • clear()方法,切换Buffer为写状态,会清空Buffer里所有数据。position为0,limit置为capacity
    • compact()方法,切换Buffer为写状态,清空Buffer里所有已读数据,将未读数据剪切到Buffer前端。position设置为limit,limit设置为capacity

    要理解其4个特性,

    • capacity,Buffer的总长度,该值总是保持不变。A buffer's capacity is the number of elements it contains. The capacity of a buffer is never negative and never changes
    • position,下一个要操作的数据元素的位置,该值总是小于等于capacity和limit。Buffer为读状态时,表示下一个要读的位置,Buffer为写状态时,表示下一个要写的位置。

                    A buffer's position is the index of the next element to be read or written. A buffer's position is never negative and is never greater than its limit.

    • limit,Buffer中第一个不可操作元素的位置,limit<=capacity。A buffer's limit is the index of the first element that should not be read or written. A buffer's limit is never negative and is                 never greater than its capacity.
    • mark,用于记录当前position的前一个位置

    Buffer状态转换过程描述

    从Buffer中读数据方式:buffer.get()方法和channel.write()方法。

    向Buffer中写数据方式:buffer.put()方法和channel.read()方法。

    package com.yyn.nio;
    
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class ByteBufferTest {
        
         public static void main(String [] args) throws IOException{
            // testRead(); //从文件读取数据,chanel向buffer中写数据
             testWrite();
         }
        
         
         //写文件
         public static void testWrite() throws IOException{
             RandomAccessFile raFile = new RandomAccessFile("byte_buffer_write.txt", "rw");
             FileChannel fChannel = raFile.getChannel();
             String data = "天王盖地虎
    小鸡炖蘑菇
    要从此路过
    就得跳支舞";
             byte[] dataByte = data.getBytes("UTF-8");
             System.out.println(dataByte.length);
             ByteBuffer buf = ByteBuffer.allocate(dataByte.length); 
             buf.put(dataByte, 0, dataByte.length);
             buf.flip();  //切换buffer到读模式
             fChannel.write(buf);  //从buffer读取数据到channel
             fChannel.force(true); //强制将数据刷新到磁盘,不一定有用
             buf.clear();
             buf.put(dataByte, 0, 10);
             buf.mark();
             buf.put(dataByte,10,10);
             buf.reset();
             buf.put(dataByte, 0, 10);
             buf.flip();  //切换buffer到读模式
             fChannel.write(buf);  //从buffer读取数据到channel
             fChannel.close();
             System.out.println("write over!!");
             
             
         }
        
         //读文件
         public static void testRead() throws IOException {
             RandomAccessFile raFile = new RandomAccessFile("test.txt", "rw");
             FileChannel fChannel = raFile.getChannel();
             ByteBuffer buf = ByteBuffer.allocate(10);
             int byteRead = fChannel.read(buf);
             StringBuffer sBuffer = new StringBuffer();
             while(byteRead != -1){
                 buf.flip(); //change to read mode
                 byte [] bs = null;
                 int limite = buf.limit();
                 if(buf.hasArray()){
                     bs = buf.array();
                 }
                 if(bs != null){
                     System.out.println("bs length: "+limite);
                     sBuffer.append(new String(bs,0,limite ,"UTF-8"));
                 }        
                 buf.clear();  // make buffer ready for write,clear all buffer
                 //buf.compact();  // make buffer ready for write,clear dat a readed in buffer 
                 byteRead = fChannel.read(buf);
             }
             fChannel.close();
             System.out.println("####:"+sBuffer.toString());
         }
           
    }
    View Code

    2. Buffer其他方法介绍

    2.1 rewind()方法

    Buffer.rewind()将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少。

    2.2 equals()方法

    当满足下列条件时,表示两个Buffer相等:

    有相同的类型(byte、char、int等)。
    Buffer中剩余的byte、char等的个数相等。
    Buffer中所有剩余的byte、char等都相同。
    如你所见,equals只是比较Buffer的一部分,不是每一个在它里面的元素都比较。实际上,它只比较Buffer中的剩余元素。

    2.3 compareTo()方法

    compareTo()方法比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:
    第一个不相等的元素小于另一个Buffer中对应的元素 。
    所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)。

    3. Buffer的Scatter/Gather

     scatter(分散)是指从Channel读取数据后,写入到多个Buffer中。

    gather(聚集)是指写操作时,从多个Buffer读取数据并写入到一个Channel中。

    四、Selector 

    Selector在NIO编程中充当一个调度器的角色,轮训在其注册的channel是否ready,若ready则开始执行操作。

    仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。

    但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。

    1. Selector介绍

    Selector包含3个Set对象来管理SelectionKey对象,分别是以下三种:

    使用Selector前,需要确保以下操作已经执行完成:

    • Selector selector = Selector.open(); //调用open方法,获取一个Selector实例。
    • channel.configureBlocking(false); // 设置Channel为非堵塞模式
    • channel.register(selector , SelectionKey.OP_ACCEPT); //将Channel注册到selector中,并设置需监听的事件

     可以监听四种不同类型的事件:

    1. SelectionKey.OP_CONNECT
    2. SelectionKey.OP_ACCEPT
    3. SelectionKey.OP_READ
    4. SelectionKey.OP_WRITE

    如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:

    int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

    2. Selector常用方法

    2.1 selectXXX()方法

    • int select(),返回就绪channel的个数,会堵塞。
    • int select(long timeout),返回就绪channel个数,堵塞timeout
    • int selectNow(),返回就绪channel个数,不堵塞

    注意:每次调用selectXXX方法时,会返回此次就绪数量,例如,有一个channel就绪,则返回1,但未对这个channel的数据进行处理。接下来又有一个channel就绪,调用selectXXX方法还是返回1,但实际上此时有2个channel就绪但未被处理。

    2.2 selectedKeys()

    一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。
    当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。

    注意:Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。

    2.3 wakeUp()

    某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
    如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。

    2.4 close()

    用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

    3. 基于NIO的网络Demo

    3.1 单线程版,主线程负责处理accept和read

    package com.yyn.nio.net;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 本例子服务端只处理accept和read事件,单线程版
     * 
     * @author Michael
     *
     */
    public class NIOSingleServer {
    
        private Selector selector = null;
        //private ExecutorService pool;
        public static Charset charset = Charset.forName("UTF-8");
    
        public NIOSingleServer init(int port) throws IOException {
            //pool = Executors.newFixedThreadPool(5);
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false); // 设置为非堵塞模式
            ssc.socket().bind(new InetSocketAddress(port));
            selector = Selector.open(); // 获取一个selector
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            return this;
        }
    
        public void listen() throws IOException {
            System.out.println("Server started.....");
            while (true) {
                int n = 0;
                n = selector.select(); // 获取就绪操作的个数
                if(n == 0){
                    continue;
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove(); // 每次使用后需要手工移除
                    SocketChannel channel = null;
                    if (key.isAcceptable()) {
                        try {
                            // init函数中注册的是ServerSocketChannel
                            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                            // 获取实际的SocketChannel,类似于Socket IO中的Socket
                            channel = serverSocketChannel.accept();
                            System.out.println("客户端:" + channel.getRemoteAddress() + "已连接");
                            channel.configureBlocking(false);
                            SelectionKey k = channel.register(selector, SelectionKey.OP_READ); // 注册read监听,监听客户端发过来的数据
                            //Worker worker = new Worker(k);
                            //k.attach(worker);
                        } catch (Exception e) {
                            if (channel != null) {
                                channel.close();
                            }
                        }
    
                    } else {
                        if (key.isReadable()) {
                            System.out.println("begin to process read!!!!");
                            channel = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            buffer.clear(); //切换buffer为写模式
                            int len = 0;
                            try{
                                while ((len = channel.read(buffer)) > 0) {
                                    buffer.flip(); //切换buffer为read模式
                                    System.out.println("客户端数据:"+charset.decode(buffer).toString());
                                    buffer.clear();
                                }
                                if(len == -1){  // The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream 
                                    System.out.println("客户端断开");
                                    channel.close();
                                    continue;
                                }
                            }catch(Exception e){
                                System.out.println("客户端异常啦");
                            }
                        }
                        if (key.isWritable()) {
    
                        }
                    }
    
                }
            }
        }
    
        public static void main(String[] args) throws IOException {
            // TODO Auto-generated method stub
            NIOSingleServer server = new NIOSingleServer();
            server.init(12003).listen();
        }
    
    }
    View Code
    package com.yyn.nio.net;
    
    import java.io.BufferedInputStream;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.net.UnknownHostException;
    
    public class NIOSingleClient {
    
        public static void main(String[] args) throws UnknownHostException, IOException {
            // TODO Auto-generated method stub
            Socket socket = new Socket("127.0.0.1", 12003);
            OutputStreamWriter osw = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
            PrintWriter out = new PrintWriter(osw);
            
            InputStreamReader isr = new InputStreamReader(new BufferedInputStream(System.in), "UTF-8");
            BufferedReader in = new BufferedReader(isr);
            String data = "";
            while(true){
                data = in.readLine();
                data = data.trim().toUpperCase();
                if(data.equals("EIXT")){
                    out.close();
                    socket.close();
                    System.exit(0);
                }
                System.out.println("read data from comsole:" + data);
                out.println(data);
                out.flush();
                System.out.println("sending data to server:" + data);
            }
            
        }
    
    }
    View Code

    3.2 多线程版,主线程负责accept,子线程负责read

    package com.yyn.nio.net;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.logging.LoggingMXBean;
    
    /**
     * 本例子服务端只处理accept和read事件,多线程版
     * @author Michael
     *
     */
    public class NIOMultiServer {
    
        private Selector acceptSelector = null;
        private Selector readSelector = null;
        public static Charset charset = Charset.forName("UTF-8");
        
        public NIOMultiServer init(int port) throws IOException {
            //pool = Executors.newFixedThreadPool(5);
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false); // 设置为非堵塞模式
            ssc.socket().bind(new InetSocketAddress(port));
            acceptSelector = Selector.open(); // 获取一个selector
            readSelector = Selector.open();
            ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);
            return this;
        }
        
        public void listen() throws IOException {
            System.out.println("Server started.....");
            new Worker(this.readSelector).start(); 
            while (true) {
                int n = 0;
                n = acceptSelector.select();
                if(n == 0)
                    continue;
                Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // init函数中注册的是ServerSocketChannel
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                    // 获取实际的SocketChannel,类似于Socket IO中的Socket,必须accept后才有SocketChannel
                    SocketChannel channel = serverSocketChannel.accept();
                    channel.configureBlocking(false);
                    System.out.println("客户端:" + channel.getRemoteAddress() + "已连接");
                    if(key.isAcceptable()){
                        channel.register(this.readSelector, SelectionKey.OP_READ);
                    }
                }
            }
        }
        
        private static class Worker extends Thread{
            
            private Selector readSelector = null;
            
            public Worker(Selector selector){
                this.readSelector = selector;
            }
            
            public void run(){
                System.out.println("Read thread started....");
                while (true) {
                    int n = 0;
                    SocketChannel channel = null;
                    try {
                        n= this.readSelector.select(10);
                        if(n == 0)
                            continue;
                        System.out.println("read thread, n is: " + n);
                        Iterator<SelectionKey> it = readSelector.selectedKeys().iterator();
                        while (it.hasNext()) {
                            SelectionKey key = it.next();
                            it.remove();
                            if(key.isReadable()){
                                channel = (SocketChannel) key.channel();
                                System.out.println("begin to process read at: " + channel.getRemoteAddress());
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                buffer.clear(); //将buffer切换为写模式
                                long len = 0;
                                while((len = channel.read(buffer)) > 0){
                                    buffer.flip();  //将buffer切换为读模式
                                    System.out.println("客户端数据:"+charset.decode(buffer).toString());
                                    buffer.clear();
                                }
                                if(len == -1){
                                    System.out.println("客户端断开");
                                    channel.close();
                                    continue;
                                }
                            }
                        }
                        
                    } catch (IOException e) {
                        System.out.println("客户端异常啦");
                        try {
                            channel.close();
                        } catch (IOException e1) {
                            // TODO Auto-generated catch block
                            System.out.println("关闭channel发生异常");
                        }
                    }
    
                    
                }
            }
        }
        
        
        
        public static void main(String[] args) throws IOException {
            NIOMultiServer server = new NIOMultiServer();
            server.init(12003);
            server.listen();
        }
    
    }
    View Code
    package com.yyn.nio.net;
    
    import java.io.BufferedInputStream;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.net.UnknownHostException;
    
    public class NIOSingleClient {
    
        public static void main(String[] args) throws UnknownHostException, IOException {
            // TODO Auto-generated method stub
            Socket socket = new Socket("127.0.0.1", 12003);
            OutputStreamWriter osw = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
            PrintWriter out = new PrintWriter(osw);
            
            InputStreamReader isr = new InputStreamReader(new BufferedInputStream(System.in), "UTF-8");
            BufferedReader in = new BufferedReader(isr);
            String data = "";
            while(true){
                data = in.readLine();
                data = data.trim().toUpperCase();
                if(data.equals("EIXT")){
                    out.close();
                    socket.close();
                    System.exit(0);
                }
                System.out.println("read data from comsole:" + data);
                out.println(data);
                out.flush();
                System.out.println("sending data to server:" + data);
            }
            
        }
    
    }
    View Code

    dd

  • 相关阅读:
    解决代码冲突问题
    一些自己踩到的坑
    鼠标加特效
    在django里写一个脚本,脚本里可以使用django里的model
    在linux 上用系统命令systemctl 执行python脚本
    scp 传输命令
    使用django-cors-headers 来解决跨域问题
    访问 Django 项目的静态资源
    如何用ORM自定义排序
    Mac 安装 Novicat
  • 原文地址:https://www.cnblogs.com/ulli/p/6291740.html
Copyright © 2011-2022 走看看