使用选择器实现服务端的socket数据接收的程序,是nio最精华最核心的部分
服务端
1 private static int PORT = 1234; 2 private static ByteBuffer bb = ByteBuffer.allocate(1024); 3 4 // 从通道中读取数据 5 protected static void readDataFromSocket(SelectionKey sk) throws Exception { 6 SocketChannel sc = (SocketChannel) sk.channel(); 7 bb.clear(); 8 while (sc.read(bb) > 0) { 9 bb.flip(); 10 while (bb.hasRemaining()) { 11 System.out.print((char) bb.get()); 12 } 13 System.out.println(); 14 bb.clear(); 15 } 16 } 17 18 public static void main(String[] args) throws Exception { 19 // 先确定端口号 20 int port = PORT; 21 if (args != null && args.length > 0) { 22 port = Integer.parseInt(args[0]); 23 } 24 // 打开一个ServerSocketChannel 25 ServerSocketChannel ssc = ServerSocketChannel.open(); 26 //绑定端口 27 ssc.bind(new InetSocketAddress(port)); 28 // 设置ServerSocketChannel为非阻塞模式 29 ssc.configureBlocking(false); 30 // 打开一个选择器 31 Selector selector = Selector.open(); 32 // 将ServerSocketChannel注册到选择器上去并监听accept事件 33 ssc.register(selector, SelectionKey.OP_ACCEPT); 34 while (true) { 35 // 这里会发生阻塞,等待就绪的通道 36 int n = selector.select(); 37 // 没有就绪的通道则什么也不做 38 if (n == 0) { 39 continue; 40 } 41 // 获取SelectionKeys上已经就绪的通道的集合 42 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); 43 // 遍历每一个Key 44 while (iterator.hasNext()) { 45 SelectionKey sk = iterator.next(); 46 // 通道上是否有可接受的连接 47 if (sk.isAcceptable()) { 48 ServerSocketChannel ssc1 = (ServerSocketChannel) sk.channel(); 49 SocketChannel sc = ssc1.accept(); 50 sc.configureBlocking(false); 51 sc.register(selector, SelectionKey.OP_READ); 52 } 53 // 通道上是否有数据可读 54 else if (sk.isReadable()) { 55 readDataFromSocket(sk); 56 } 57 iterator.remove(); 58 } 59 } 60 }
(1)第34行~第36行,调用select()方法等待来自客户端的Socket数据。程序会阻塞在这儿不会往下走,直到客户端有Socket数据的到来为止,所以严格意义上来说,NIO并不是一种非阻塞IO,因为NIO会阻塞在Selector的select()方法上
(2)第41行~第45行,获取到已经就绪的通道的迭代器进行迭代,泛型是选择键SelectionKey,前文讲过,选择键用于封装特定的通道
(3)第46行~第56行,这里是一个关键点、核心点,这里做了两件事情:
a)满足isAcceptable()则表示该通道上有数据到来了,此时我们做的事情不是获取该通道->创建一个线程来读取该通道上的数据,这么做就和前面一直讲的阻塞IO没有区别了,也无法发挥出NIO的优势来。我们做的事情只是简单地将对应的SocketChannel注册到选择器上,通过传入OP_READ标记,告诉选择器我们关心新的Socket通道什么时候可以准备好读数据
b)满足isReadable()则表示新注册的Socket通道已经可以读取数据了,此时调用readDataFromSocket方法读取SocketChannel中的数据,读取数据的方法前面通道的文章中已经详细讲过了,就不讲了
(8)第57行,将键移除,这一行很重要也是容易忘记的一步操作。加入不remove,将会导致45行中出现空指针异常,原因不难理解,可以自己思考一下。
客户端
1 private static final String STR = "Hello World!"; 2 private static final String REMOTE_IP = "127.0.0.1"; 3 private static final int THREAD_COUNT = 5; 4 5 private static class NonBlockingSocketThread extends Thread { 6 public void run() { 7 try { 8 int port = 1234; 9 SocketChannel sc = SocketChannel.open(); 10 sc.configureBlocking(false); 11 sc.connect(new InetSocketAddress(REMOTE_IP, port)); 12 while (!sc.finishConnect()) { 13 System.out.println("同" + REMOTE_IP + "的连接正在建立,请稍等!"); 14 Thread.sleep(10); 15 } 16 System.out.println("连接已建立,待写入内容至指定ip+端口!时间为"+ System.currentTimeMillis()); 17 String writeStr = STR + this.getName(); 18 ByteBuffer bb = ByteBuffer.allocate(writeStr.length()); 19 bb.put(writeStr.getBytes()); 20 bb.flip(); // 写缓冲区的数据之前一定要先反转(flip) 21 sc.write(bb); 22 bb.clear(); 23 sc.close(); 24 } catch (IOException e) { 25 e.printStackTrace(); 26 } catch (InterruptedException e) { 27 e.printStackTrace(); 28 } 29 } 30 } 31 32 public static void main(String[] args) throws Exception { 33 NonBlockingSocketThread[] nbsts = new NonBlockingSocketThread[THREAD_COUNT]; 34 for (int i = 0; i < THREAD_COUNT; i++) 35 nbsts[i] = new NonBlockingSocketThread(); 36 for (int i = 0; i < THREAD_COUNT; i++) 37 nbsts[i].start(); 38 // 一定要join保证线程代码先于sc.close()运行,否则会有AsynchronousCloseException 39 for (int i = 0; i < THREAD_COUNT; i++) 40 nbsts[i].join(); 41 }
总结一下Selector的执行两个关键点:
1、注册一个ServerSocketChannel到selector中,这个通道的作用只是为了监听客户端是否有数据到来(这里注意一下有数据到来,意思是假如需要接收100个字节,如果到来了1个字节就算数据到来了),只要有数据到来,就把特定通道注册到selector中,并指定其事件为读事件。
2、ServerSocketChannel和SocketChannel(通道里面的是客户端的数据)共同存在在Selector中,只要有注册的事件到来,Selector取消阻塞状态,遍历SelectionKey集合,继续注册读取数据的通道,或者是从通道中读取数据。
选择过程的可扩展性
从上面的代码以及之前对于Selector的解读可以看到,Selector可以简化用单线程同时管理多个可选择通道的实现。使用一个线程来为多个通道提供服务,通过消除管理各个线程的额外开销,可能会降低复杂性并可能大幅提升性能。但只使用一个线程来服务所有可选择的通道是不是一个好主意呢?这要看情况。
对单核CPU的系统而言这可能是一个好主意,因为在任何情况下都只有一个线程能够运行。通过消除在线程之间进行上下文切换带来的额外开销,总吞吐量可以提高。但对于一个多核CPU的系统而言呢?在一个有n个CPU的系统上,当一个单一的线程线性轮流地处理每一个线程时,可能有(n-1)个CPU处于空闲状态。
一种可行的解决办法是使用多个选择器。但是请尽量不要这么做,在大量通道上执行就绪选择并不会有很大的开销,大多数工作是由底层操作系统完成的,管理多个选择器并随机地将通道分派给它们当中的一个并不是这个问题的合理的解决方案。
一种更好的解决方案是对所有的可选择通道使用同一个选择器,并将对就绪选择通道的服务委托给其他线程。开发者只使用一个线程监控通道的就绪状态,至于通道处于就绪状态之后又如何做,有两种可行的做法:
1、使用一个协调好的工作线程池来处理接收到的数据,当然线程池的大小是可以调整的
2、通道根据功能由不同的工作线程来处理,它们可能是日志线程、命令/控制线程、状态请求线程等