zoukankan      html  css  js  c++  java
  • NIO学习笔记

    NIO三大核心:Buffer、Channel、Selector

    1. 每个Channel都会对应一个Buffer
    2. Selector对应一个线程,多个Channel会注册到一个Selector上
    3. 程序切换到哪个Channel是由事件决定的,Selector会根据不同的事件,在各个Channel上切换
    4. Buffer就是一个内存块,底层是数组
    5. NIO数据的读取写入是通过Buffer,是双向的。而BIO的数据读写依靠的是输入输出流,是单向的(要么输入要么输出)
    6. Channel是双向的,可以更好地抽象底层操作系统的情况(底层的操作系统通道就是双向的)

    Buffer

    缓冲区其实就是数组的封装

    七种基本数据类型中,除了boolean外,系统都有对应类型的Buffer:

    • ByteBuffer
    • CharBuffer
    • ShortBuffer
    • IntBuffer
    • LongBuffer
    • FloatBuffer
    • DoubleBuffer
      上述缓冲区的管理方式几乎是一直的,创建都是通过一个allocate方法获取的

    缓冲区存取数据的两个核心方法

    • put() 存入数据到缓冲区中
    • get() 获取缓存冲区中的数据

    缓冲区中的四个核心属性

    • capacity 容量,表示缓冲区中最大存储数据的容量,一旦声明不能改变。
    • limit 界限,表示缓冲区中可以操作数据的大小。(limit后面的数据不能进行读写)
    • position 位置,表示缓冲区中正在操作的位置
    • mark 标记,表示记录当前position的位置,可以通过reset()恢复到mark的位置
      position <= limit <= capacity

    直接缓冲区与非直接缓冲区

    • 非直接缓冲器:通过allocate()方法分配,将缓冲区建立在JVM的内存中,属于堆内存
    • 直接缓冲区:通过allocateDirect()方法分配,将缓冲区建立在物理内存中,可以提高效率,属于堆外内存,只有ByteBuffer有这个东西

    代码示例

    package day01;
    
    import java.nio.ByteBuffer;
    
    import org.junit.Test;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class TestBuffer {
    	
    	@Test
    	public void test_1() {
    		
    		// 1. 分配一个指定大小的缓冲区
    		log.info("1. 分配一个指定大小的缓冲区");
    		ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    		
    		log.info("capacity: {}", byteBuffer.capacity());
    		log.info("limit: {}", byteBuffer.limit());
    		log.info("position: {}", byteBuffer.position());
    		
    		// 2. 使用put()存入数据到缓冲区
    		log.info("2. 使用put()存入数据到缓冲区");
    		byteBuffer.put("Hello Java".getBytes());
    		
    		log.info("capacity: {}", byteBuffer.capacity());
    		log.info("limit: {}", byteBuffer.limit()); // limit不变
    		log.info("position: {}", byteBuffer.position()); // position增加10
    		
    		// 3. 切换为读取数据模式
    		log.info("3. 切换为读取数据模式");
    		byteBuffer.flip();
    		
    		log.info("capacity: {}", byteBuffer.capacity());
    		log.info("limit: {}", byteBuffer.limit()); // limit变为position的值
    		log.info("position: {}", byteBuffer.position()); // position归0
    		
    		// 4. 使用get()读取缓冲区的数据
    		log.info("4. 使用get()读取缓冲区的数据");
    		byte[] dst = new byte[byteBuffer.limit()];
    		byteBuffer.get(dst, 3, 5);
    		log.info(new String(dst));
    		
    		log.info("capacity: {}", byteBuffer.capacity());
    		log.info("limit: {}", byteBuffer.limit()); // limit不变
    		log.info("position: {}", byteBuffer.position()); // position增加到读取的位置
    		
    		// 5. rewind()重复读数据
    		log.info("5. rewind()重复读数据");
    		byteBuffer.rewind();
    		
    		log.info("capacity: {}", byteBuffer.capacity());
    		log.info("limit: {}", byteBuffer.limit()); // limit不变
    		log.info("position: {}", byteBuffer.position()); // position回到0
    		
    		// 6. clear()清空缓冲区,但是缓冲区中的数据还在,会被重新添加的数据覆盖
    		log.info("6. clear()清空缓冲区");
    		byteBuffer.clear();
    		
    		log.info("capacity: {}", byteBuffer.capacity());
    		log.info("limit: {}", byteBuffer.limit()); // limit回到capacity
    		log.info("position: {}", byteBuffer.position()); // position回到0
    	}
    	
    	@Test
    	public void test_2() {
    		
    		ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    		
    		byteBuffer.put("123456789".getBytes());
    		
    		byteBuffer.flip();
    		
    		byte[] dst = new byte[byteBuffer.limit()];
    		
    		byteBuffer.get(dst, 0, 2);
    		log.info(new String(dst));
    		
    		byteBuffer.mark();
    		dst = new byte[byteBuffer.limit()];
    		byteBuffer.get(dst, 2, 2);
    		log.info(new String(dst));
    		
    		byteBuffer.reset();
    		dst = new byte[byteBuffer.limit()];
    		byteBuffer.get(dst, 2, 2);
    		log.info(new String(dst));
    		
    		if (byteBuffer.hasRemaining()) {
    			log.info(String.valueOf(byteBuffer.remaining()));
    		}
    	}
    	
    	@Test
    	public void test_3() {
    		
    		ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
    		
    		log.info(String.valueOf(byteBuffer.isDirect()));
    	}
    }
    

    Channel

    通道(Channel):用于源节点与目标节点的连接,在Java NIO中负责缓冲区中数据的传输。Channel本身不存储数据,因此需要配合缓冲区进行操作

    通道的主要实现类

    java.nio.channels.Channel接口
      |- FileChannel
      |- SocketChannel
      |- ServerSocketChannel
      |- DatagramChannel	用于UDP
    

    获取通道

    1. Java针对支持通道的类提供了getChannel()方法
      本地IO:
      FileInputStream/FileOutputStream
      RandomAccessFile/
      网络IO:
      Socket
      ServerSocket
      DatagramSocket

    2. JDK1.7之后,针对各个通道,都提供了静态方法open()

    3. JDK1.7之后,Files工具类的newByteChannel()方法

    通道之间的数据传输

    transferFrom()
    transferTo()

    分散(Scatter)与聚集(Gather)

    分散读取(Scatting Reads):将通道中的数据分散到多个缓冲区中
    聚集写入(Gathering Writes):将多个缓冲区的数据聚集到通道中

    字符集:Charset

    编码 与 解码

    代码示例

    package day01;
    
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileChannel.MapMode;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.CharsetEncoder;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import org.junit.Test;
    
    public class TestChannel {
    
    	/**
    	 * 利用通道完成文件的复制(非直接缓冲区)
    	 * @throws Exception
    	 */
    	@Test
    	public void test_1() throws Exception {
    		
    		FileInputStream fis = new FileInputStream("timg.jpg");
    		FileOutputStream fos = new FileOutputStream("timg_copy.jpg");
    		
    		// 1. 获取通道
    		FileChannel inChannel = fis.getChannel();
    		FileChannel outChannel = fos.getChannel();
    		
    		// 2. 分配指定大小的缓冲区
    		ByteBuffer buf = ByteBuffer.allocate(1024);
    		
    		// 3. 将通道中的数据写入缓冲区
    		while (inChannel.read(buf) != -1) {
    		
    			// 4. 将缓冲区中的数据写入通道
    			buf.flip(); // 切换为读取数据的模式
    			outChannel.write(buf);
    			buf.clear();
    		}
    		
    		inChannel.close();
    		outChannel.close();
    		
    		fis.close();
    		fos.close();
    	}
    	
    	/**
    	 * 使用直接缓冲区完成文件的复制(内存映射文件)
    	 * @throws Exception
    	 */
    	@Test
    	public void test_2() throws Exception {
    		
    		FileChannel inChannel = FileChannel.open(Paths.get("timg.jpg"), StandardOpenOption.READ);
    		FileChannel outChannel = FileChannel.open(Paths.get("timg_copy.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 存在就创建,不存在就报错
    		
    		// 这就是一个内存映射文件,属于直接缓冲区
    		MappedByteBuffer inBuffer = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
    		MappedByteBuffer outBuffer = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
    		
    		// 直接对缓冲区进行读写操作
    		byte[] dst = new byte[inBuffer.limit()];
    		inBuffer.get(dst);
    		outBuffer.put(dst);
    		
    		inChannel.close();
    		outChannel.close();
    	}
    	
    	/**
    	 * 通道之间的数据传输
    	 * 属于直接缓冲区
    	 * @throws Exception
    	 */
    	@Test
    	public void test_3() throws Exception {
    		
    		FileChannel inChannel = FileChannel.open(Paths.get("timg.jpg"), StandardOpenOption.READ);
    		FileChannel outChannel = FileChannel.open(Paths.get("timg_copy.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 存在就创建,不存在就报错
    		
    		inChannel.transferTo(0, inChannel.size(), outChannel);
    		
    		inChannel.close();
    		outChannel.close();
    	}
    	
    	/**
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	public void test_4() throws Exception {
    		
    		RandomAccessFile raf = new RandomAccessFile("pom.xml", "rw");
    		RandomAccessFile waf = new RandomAccessFile("pom_copy.xml", "rw");
    		
    		// 1. 获取通道
    		FileChannel rfc = raf.getChannel();
    		FileChannel wfc = waf.getChannel();
    		
    		// 2. 分配指定大小缓冲区
    		ByteBuffer bf1 = ByteBuffer.allocate(100);
    		ByteBuffer bf2 = ByteBuffer.allocate(100);
    		
    		// 3. 分散读取
    		ByteBuffer[] bfs = {bf1, bf2};
    		
    		while ((rfc.read(bfs) != -1)) {
    			// 设置为读模式
    			for (int i = 0; i < bfs.length; i++) {
    				bfs[i].flip();
    			}
    			// 将buffer数据写入通道
    			wfc.write(bfs);
    			// 清空buffer
    			for (int i = 0; i < bfs.length; i++) {
    				bfs[i].clear();
    			}
    		}
    		
    		
    		rfc.close();
    		wfc.close();
    		
    		raf.close();
    		waf.close();
    	}
    	
    	@Test
    	public void test_5() throws Exception {
    		
    		Map<String, Charset> charsetMap = Charset.availableCharsets();
    		
    		for (Entry<String, Charset> entry : charsetMap.entrySet()) {
    			System.out.println("key: " + entry.getKey() + ", value: " + entry.getValue());
    		}
    		
    		Charset cs1 = Charset.forName("GBK");
    		
    		// 获取编码器
    		CharsetEncoder encoder = cs1.newEncoder();
    		// 获取解码器
    		CharsetDecoder decoder = cs1.newDecoder();
    		
    		CharBuffer cb = CharBuffer.allocate(100);
    		cb.put("你好");
    		cb.flip();
    		
    		// 编码操作
    		ByteBuffer bb = encoder.encode(cb);
    		
    		// 解码操作
    		bb.flip(); // 所有的读操作之前都要进行模式切换
    		CharBuffer cb2 = decoder.decode(bb);
    		System.out.println(cb2.toString());
    		
    		bb.flip(); // 所有的读操作之前都要进行模式切换
    		Charset cs2 = Charset.forName("UTF-8");
    		System.out.println(cs2.decode(bb).toString());
    	}
    }
    
    

    使用NIO完成网络通信

    阻塞

    package day02;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    
    import org.junit.Test;
    
    /**
     * 一、使用NIO完成网络通信的三个核心:
     * 
     * 1. 通道(Channel):负责连接
     * 
     * java.io.channels.Channel
     *   |-SelectableChannel
     *       |-SocketChannel
     *       |-ServerSocketChannel
     *       |-DatagramChannel
     *       
     *       |-Pipe.SinkChannel
     *       |-Pipe.SourceChannel
     *       
     * 2. 缓冲区(Buffer):负责数据的存取
     * 
     * 3. 选择器(Selector):是SelectableChannel的多路复用器。用于监控SelectableChannel的IO状况
     * 
     *    
     */
    public class TestBlockNIO {
    
    	
    	@Test
    	public void testClient() throws Exception {
    		
    		// 获取客户端通道
    		SocketChannel clientChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8899));
    		
    		// 测试客户端发送文件
    		FileChannel fc = FileChannel.open(Paths.get("timg.jpg"), StandardOpenOption.READ);
    		
    		// 创建一个字节缓冲区
    		ByteBuffer byteBuffer = ByteBuffer.allocate(100);
    		
    		// 循环读取数据到缓冲区
    		while (fc.read(byteBuffer) != -1) {
    			// 切换缓冲区为读模式
    			byteBuffer.flip();
    			// 将数据写入到客户端通道
    			clientChannel.write(byteBuffer);
    			// 清空缓冲区
    			byteBuffer.clear();
    		}
    		
    		// 告诉服务端发送完毕,阻塞模式下必须这样才能继续运行
    		clientChannel.shutdownOutput();
    		
    		// 接收服务端反馈
    		int len = 0;
    		while ((len = clientChannel.read(byteBuffer)) != -1) {
    			byteBuffer.flip();
    			System.out.println(new String(byteBuffer.array(), 0, len));
    			byteBuffer.clear();
    		}
    		
    		// 关闭通道
    		fc.close();
    		clientChannel.close();
    	}
    	
    	@Test
    	public void TestServer() throws Exception {
    		
    		// 获取服务端通道
    		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    		
    		// 测试文件接收
    		FileChannel fc = FileChannel.open(Paths.get("timg_net.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
    		
    		// 绑定服务端端口号
    		serverSocketChannel.bind(new InetSocketAddress(8899));
    		
    		// 监听客户端通道
    		SocketChannel socketChannel = serverSocketChannel.accept();
    		
    		// 创建缓冲区
    		ByteBuffer byteBuffer = ByteBuffer.allocate(100);
    		
    		// 循环读取数据
    		while (socketChannel.read(byteBuffer) != -1) {
    			// 切换缓冲区为读模式
    			byteBuffer.flip();
    			// 将数据写入到文件通道
    			fc.write(byteBuffer);
    			// 清空缓冲区
    			byteBuffer.clear();
    		}
    		
    		// 发送反馈给客户端
    		byteBuffer.put("接收完毕".getBytes());
    		byteBuffer.flip();
    		socketChannel.write(byteBuffer);
    		byteBuffer.clear();
    		
    		// 关闭通道
    		fc.close();
    		socketChannel.close();
    		serverSocketChannel.close();
    	}
    }
    

    非阻塞

    package day02;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    
    import org.junit.Test;
    
    /**
     * 一、使用NIO完成网络通信的三个核心:
     * 
     * 1. 通道(Channel):负责连接
     * 
     * java.io.channels.Channel
     *   |-SelectableChannel
     *       |-SocketChannel
     *       |-ServerSocketChannel
     *       |-DatagramChannel
     *       
     *       |-Pipe.SinkChannel
     *       |-Pipe.SourceChannel
     *       
     * 2. 缓冲区(Buffer):负责数据的存取
     * 
     * 3. 选择器(Selector):是SelectableChannel的多路复用器。用于监控SelectableChannel的IO状况
     * 
     *    
     */
    public class TestNoneBlockNIO {
    
    	@Test
    	public void client() throws Exception {
    		
    		SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8899));
    		
    		// 切换为非阻塞模式
    		socketChannel.configureBlocking(false);
    		
    		Scanner scanner = new Scanner(System.in);
    		
    		while (scanner.hasNext()) {
    			
    			ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    			byteBuffer.put(scanner.next().getBytes());
    			byteBuffer.flip();
    			
    			socketChannel.write(byteBuffer);
    		}
    		
    		scanner.close();
    		
    		socketChannel.close();
    	}
    	
    	@Test
    	public void server() throws Exception {
    		
    		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    		
    		// 切换为非阻塞模式
    		serverSocketChannel.configureBlocking(false);
    		
    		serverSocketChannel.bind(new InetSocketAddress(8899));
    		
    		// 获取选择器
    		Selector selector = Selector.open();
    		
    		// 将通道户注册到选择器上,并指定监听某个事件
    		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    		
    		// 轮询式的获取选择器上已经准备就绪的事件
    		while (selector.select() > 0) {
    			
    			// 获取当前选择器中所有注册的 已就绪监听事件
    			Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    			
    			while (iterator.hasNext()) {
    				
    				// 获取准备准备就绪的事件
    				SelectionKey selectionKey = iterator.next();
    				iterator.remove();
    				
    				// 如果是接收事件就绪
    				if (selectionKey.isAcceptable()) {
    					
    					// 获取客户端连接
    					SocketChannel socketChannel = serverSocketChannel.accept();
    					
    					// 切换为非阻塞模式
    					socketChannel.configureBlocking(false);
    					
    					// 将通道注册到选择器上
    					socketChannel.register(selector, SelectionKey.OP_READ);
    				} else if (selectionKey.isReadable()) {
    					
    					// 获取当前接收器上的读就绪状态
    					SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    					
    					ByteBuffer buffer = ByteBuffer.allocate(1024);
    					
    					int len = 0;
    					while ((len = socketChannel.read(buffer)) > 0) {
    						buffer.flip();
    						System.out.println(":::" + new String(buffer.array(), 0, len));
    						buffer.clear();
    					}
    				}
    			}
    		}
    	}
    }
    

    管道

    是单向的,这边发,另一边收

    package day02;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Pipe;
    import java.util.Scanner;
    
    import org.junit.Test;
    
    public class TestPipe {
    
    	private Pipe pipe;
    	
    	{
    		try {
    			pipe = Pipe.open();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	@Test
    	public void test() throws Exception {
    		test_1();
    		test_2();
    		Thread.currentThread().join();
    	}
    	
    	public void test_1() {
    		
    		new Thread(() -> {
    			
    			try {
    				
    				Pipe.SinkChannel sinkChannel = pipe.sink();
    				
    				Scanner scanner = new Scanner(System.in);
    				
    				ByteBuffer buffer = ByteBuffer.allocate(1024);
    				
    				while (scanner.hasNext()) {
    					
    					buffer.put(scanner.next().getBytes());
    					buffer.flip();
    					sinkChannel.write(buffer);
    					buffer.clear();
    				}
    				
    				scanner.close();
    				sinkChannel.close();
    				
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    			
    		}).start();
    	}
    	
    	public void test_2() {
    		
    		new Thread(() -> {
    			
    			try {
    				
    				Pipe.SourceChannel sourceChannel = pipe.source();
    				
    				ByteBuffer buffer = ByteBuffer.allocate(1024);
    				
    				// != -1 或者 > 0 的写法都可以
    				while((sourceChannel.read(buffer)) != -1) {
    					buffer.flip();
    					System.out.println(new String(buffer.array(), 0, buffer.limit()));
    					buffer.clear();
    				}
    				
    				sourceChannel.close();
    				
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    			
    		}).start();
    	}
    }
    

    UDPNIO

    package day02;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.DatagramChannel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.util.Iterator;
    import java.util.Scanner;
    
    import org.junit.Test;
    
    public class TestUDPNIO {
    
    	@Test
    	public void client() throws Exception {
    		
    		DatagramChannel datagramChannel = DatagramChannel.open();
    		
    		datagramChannel.configureBlocking(false);
    		
    		Scanner scanner = new Scanner(System.in);
    		
    		ByteBuffer buffer = ByteBuffer.allocate(1024);
    		
    		while (scanner.hasNext()) {
    			
    			// 不考虑buffer长度不够的情况
    			buffer.put(scanner.next().getBytes());
    			
    			buffer.flip();
    			
    			datagramChannel.send(buffer, new InetSocketAddress("127.0.0.1", 8899));
    			
    			buffer.clear();
    		}
    		
    		scanner.close();
    		
    		datagramChannel.close();
    	}
    	
    	@Test
    	public void server() throws Exception {
    		
    		DatagramChannel datagramChannel = DatagramChannel.open();
    		
    		datagramChannel.configureBlocking(false);
    		
    		datagramChannel.bind(new InetSocketAddress(8899));
    		
    		Selector selector = Selector.open();
    		
    		datagramChannel.register(selector, SelectionKey.OP_READ);
    		
    		ByteBuffer buffer = ByteBuffer.allocate(1024);
    		
    		while (selector.select() > 0) {
    			
    			Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    			
    			while(iterator.hasNext()) {
    				
    				SelectionKey selectionKey = iterator.next();
    				
    				iterator.remove();
    				
    				if (selectionKey.isReadable()) {
    					
    					// 不考虑buffer长度不够的情况
    					datagramChannel.receive(buffer);
    					
    					buffer.flip();
    					System.out.println(new String(buffer.array(), 0, buffer.limit()));
    					buffer.clear();
    				}
    			}
    		}
    	}
    }
    
  • 相关阅读:
    读大道至简有感
    动手动脑7
    大道至简第七章第八章
    课后作业(接口与继承)
    大道至简第六章
    课后题以及动手动脑1 3题
    大道至简第五章
    课后作业和动手动脑(字符串)
    大道至简第四章
    课后作业及动手动脑
  • 原文地址:https://www.cnblogs.com/CSunShine/p/12567429.html
Copyright © 2011-2022 走看看