2 选择器( Selector )
选择器( Selector ) 是 SelectableChannle 对象的多路复用器, Selector 可以同时监控多个 SelectableChannel 的 IO 状况 ,
也就是说,利用 Selector 可使一个单独的线程管理多个 Channel 。Selector 是非阻塞 IO 的核心。
SelectableChannle 的结构如下图:
3. 选择 器( Selector )的应用
a 当调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。
b 可以监听的事件类型( 可使用 SelectionKey 的四个常量 表示 )
读 :SelectionKey.OP_READ
写 : SelectionKey.OP_WRITE
连接 :SelectionKey.OP_CONNECT
接收 : SelectionKey.OP_ACCEPT
c 若注册时不止监听一个事件,则可以使用“位或”操作符连接。 例:
5 SocketChannel
Java NIO 中的 SocketChannel 是一个连接到 TCP 网 络套接字的通道 。
操作步骤:
打开SocketChannel
读写数据
关闭
6 SocketChannelSocketChannel
Java NIO 中的ServerSocketChannel 是一个可以 监听新进来的 TCP 连接的通道,就像标准 IO 中 的 ServerSocket 一样
package com.atguigu.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import org.junit.Test; /* * 一、使用 NIO 完成网络通信的三个核心: * * 1. 通道(Channel):负责连接 * * java.nio.channels.Channel 接口: * |--SelectableChannel * |--SocketChannel * |--ServerSocketChannel * |--DatagramChannel * * |--Pipe.SinkChannel * |--Pipe.SourceChannel * * 2. 缓冲区(Buffer):负责数据的存取 * * 3. 选择器(Selector):是 SelectableChannel 的多路复用器。用于监控 SelectableChannel 的 IO 状况 * */ public class TestBlockingNIO { //客户端 @Test public void client() throws IOException{ //1. 获取通道 SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898)); FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); //2. 分配指定大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); //3. 读取本地文件,并发送到服务端 while(inChannel.read(buf) != -1){ buf.flip(); sChannel.write(buf); buf.clear(); } //4. 关闭通道 inChannel.close(); sChannel.close(); } //服务端 @Test public void server() throws IOException{ //1. 获取通道 ServerSocketChannel ssChannel = ServerSocketChannel.open(); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); //2. 绑定连接 ssChannel.bind(new InetSocketAddress(9898)); //3. 获取客户端连接的通道 SocketChannel sChannel = ssChannel.accept(); //4. 分配指定大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); //5. 接收客户端的数据,并保存到本地 while(sChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); } //6. 关闭通道 sChannel.close(); outChannel.close(); ssChannel.close(); } }
package com.atguigu.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import org.junit.Test; public class TestBlockingNIO2 { //客户端 @Test public void client() throws IOException{ SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898)); FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); ByteBuffer buf = ByteBuffer.allocate(1024); while(inChannel.read(buf) != -1){ buf.flip(); sChannel.write(buf); buf.clear(); } sChannel.shutdownOutput(); //接收服务端的反馈 int len = 0; while((len = sChannel.read(buf)) != -1){ buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } inChannel.close(); sChannel.close(); } //服务端 @Test public void server() throws IOException{ ServerSocketChannel ssChannel = ServerSocketChannel.open(); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); ssChannel.bind(new InetSocketAddress(9898)); SocketChannel sChannel = ssChannel.accept(); ByteBuffer buf = ByteBuffer.allocate(1024); while(sChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); } //发送反馈给客户端 buf.put("服务端接收数据成功".getBytes()); buf.flip(); sChannel.write(buf); sChannel.close(); outChannel.close(); ssChannel.close(); } }
package com.atguigu.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Scanner; import org.junit.Test; /* * 一、使用 NIO 完成网络通信的三个核心: * * 1. 通道(Channel):负责连接 * * java.nio.channels.Channel 接口: * |--SelectableChannel * |--SocketChannel * |--ServerSocketChannel * |--DatagramChannel * * |--Pipe.SinkChannel * |--Pipe.SourceChannel * * 2. 缓冲区(Buffer):负责数据的存取 * * 3. 选择器(Selector):是 SelectableChannel 的多路复用器。用于监控 SelectableChannel 的 IO 状况 * */ public class TestNonBlockingNIO { //客户端 @Test public void client() throws IOException{ //1. 获取通道 SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898)); //2. 切换非阻塞模式 sChannel.configureBlocking(false); //3. 分配指定大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); //4. 发送数据给服务端 Scanner scan = new Scanner(System.in); while(scan.hasNext()){ String str = scan.next(); buf.put((new Date().toString() + " " + str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); } //5. 关闭通道 sChannel.close(); } //服务端 @Test public void server() throws IOException{ //1. 获取通道 ServerSocketChannel ssChannel = ServerSocketChannel.open(); //2. 切换非阻塞模式 ssChannel.configureBlocking(false); //3. 绑定连接 ssChannel.bind(new InetSocketAddress(9898)); //4. 获取选择器 Selector selector = Selector.open(); //5. 将通道注册到选择器上, 并且指定“监听接收事件” ssChannel.register(selector, SelectionKey.OP_ACCEPT); //6. 轮询式的获取选择器上已经“准备就绪”的事件 while(selector.select() > 0){ //7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)” Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while(it.hasNext()){ //8. 获取准备“就绪”的是事件 SelectionKey sk = it.next(); //9. 判断具体是什么事件准备就绪 if(sk.isAcceptable()){ //10. 若“接收就绪”,获取客户端连接 SocketChannel sChannel = ssChannel.accept(); //11. 切换非阻塞模式 sChannel.configureBlocking(false); //12. 将该通道注册到选择器上 sChannel.register(selector, SelectionKey.OP_READ); }else if(sk.isReadable()){ //13. 获取当前选择器上“读就绪”状态的通道 SocketChannel sChannel = (SocketChannel) sk.channel(); //14. 读取数据 ByteBuffer buf = ByteBuffer.allocate(1024); int len = 0; while((len = sChannel.read(buf)) > 0 ){ buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } } //15. 取消选择键 SelectionKey it.remove(); } } } }
package com.atguigu.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Date; import java.util.Iterator; import java.util.Scanner; import org.junit.Test; public class TestNonBlockingNIO2 { @Test public void send() throws IOException{ DatagramChannel dc = DatagramChannel.open(); dc.configureBlocking(false); ByteBuffer buf = ByteBuffer.allocate(1024); Scanner scan = new Scanner(System.in); while(scan.hasNext()){ String str = scan.next(); buf.put((new Date().toString() + ": " + str).getBytes()); buf.flip(); dc.send(buf, new InetSocketAddress("127.0.0.1", 9898)); buf.clear(); } dc.close(); } @Test public void receive() throws IOException{ DatagramChannel dc = DatagramChannel.open(); dc.configureBlocking(false); dc.bind(new InetSocketAddress(9898)); Selector selector = Selector.open(); dc.register(selector, SelectionKey.OP_READ); while(selector.select() > 0){ Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey sk = it.next(); if(sk.isReadable()){ ByteBuffer buf = ByteBuffer.allocate(1024); dc.receive(buf); buf.flip(); System.out.println(new String(buf.array(), 0, buf.limit())); buf.clear(); } } it.remove(); } } }
7 DatagramChannel
Java NIO 中的 DatagramChannel 是一个能收发 UDP 包的通道 。
操作步骤:
打开DatagramChannel
接收 / 发送数据
8 管道 (Pipe)
Java NIO 管道是 2 个线程之间的单向数据连接。 Pipe 有一个
source 通道和一个 sink 通道。数据会 被写到 sink 通道,从 source 通道读取。
package com.atguigu.nio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Pipe; import org.junit.Test; public class TestPipe { @Test public void test1() throws IOException{ //1. 获取管道 Pipe pipe = Pipe.open(); //2. 将缓冲区中的数据写入管道 ByteBuffer buf = ByteBuffer.allocate(1024); Pipe.SinkChannel sinkChannel = pipe.sink(); buf.put("通过单向管道发送数据".getBytes()); buf.flip(); sinkChannel.write(buf); //3. 读取缓冲区中的数据 Pipe.SourceChannel sourceChannel = pipe.source(); buf.flip(); int len = sourceChannel.read(buf); System.out.println(new String(buf.array(), 0, len)); sourceChannel.close(); sinkChannel.close(); } }