zoukankan      html  css  js  c++  java
  • NIO

    同步、异步、阻塞、非阻塞

    这四个概念,一般是“网络IO”领域的概念。

    • 同步:等着对方做完,自己接着做。

    • 异步:不等对方做完,自己做自己的事。(实际上是把操作委托给了平台,比如OS)

    • 阻塞:对方在数据IO时,我方不能做其它事情。

    • 非阻塞:对方在数据IO时,我方去做其它事情,直到对方IO操作完成(轮询),再去进行处理。


    先举例说明一下各种四者组合的情况:

    阻塞 非阻塞
    同步 烧开水,站着等 烧开水,去忙别的,叫秘书隔一会儿过去看一下,烧好了叫你去亲自处理。
    异步 X 无意义 X 烧开水,剩下的委托给秘书,自己去忙别的。

    接下来,从计算机的角度解释。

    同步阻塞:一个请求过来,应用程序开了一个线程,自己处理IO。

    若要提高效率,可以开多线程+线程池进行处理,这种也被称为“伪异步”。

    同步非阻塞:不等对方IO操作完毕,去做其它处理;轮询对方IO是否完毕,若完毕,则我方进行处理;

    异步非阻塞: 向操作系统注册IO监听,由操作系统进行IO处理。


    • 同步阻塞,称为BIO,Blocking IO。基础的Socket示例,就是BIO的实现。

    • 同步非阻塞,称为NIO,Non-blocking IO,也有地方称为New IO。采用轮询的方式,不停询问数据是否准备好,如果准备好了就处理。

    • 异步非阻塞,称为AIO,Asynchronous IO,在 NIO 的基础上,引入异步通道的概念,向操作系统注册 IO 监听,操作系统完成 IO 操作了之后,主动通知,触发响应的函数。

    AIO也叫NIO.2或NIO2。

    NIO在JDK1.4即出现,NIO.2也属于nio的包,在JDK1.7引入。

    NIO示例

    NIO有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)。

    • Channel是数据流通的管道,相当于BIO中的I/O流,但Channel是双向的,输入输出通用。

    • Selector(选择区)用于监听多个Channel的事件,比如:连接打开,数据到达等。一个线程可以监听多个Channel。

    package nio;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Set;
    
    public class NioServer {
    	public static void main(String[] args) throws Exception {
    		ServerSocketChannel ssChannel = ServerSocketChannel.open();
    		// backlog:连接队列
    		ssChannel.bind(new InetSocketAddress(8888), 1024);
    		// 设为非阻塞模式,是成为NIO
    		ssChannel.configureBlocking(false);
    		Selector selector = Selector.open();
    		// OP_CONNECT:连接就绪事件,表示C与S连接成功
    		// OP_ACCEPT:接收连接进行事件,表示S与C可进行IO
    		// OP_READ:读就绪事件,表示通道中已有可读的数据,可以进行读操作了
    		// OP_WRITE:写就绪事件,表示通道可以用于写操作
    		ssChannel.register(selector, SelectionKey.OP_ACCEPT);
    
    		// 循环从Selector中获取准备就绪的Channel
    		while (true) {
    			// 这一步是设置阻塞,参数是timeout
    			// -- 对于整体而言,非阻塞,但selector可以设置阻塞:当有Channel准备就绪时,或超过1000秒后返回。
    			selector.select(1000);
    			// 获取“SelectionKey”,可以理解为“获取已就绪的Channel”。
    			// -- 每个Channel向Selector注册时,都有对应的selectionKey
    			Set<SelectionKey> selectionKeySet = selector.selectedKeys();
    			// 遍历SelectionKey,实际上是遍历每个Channel
    			for (SelectionKey selectionKey : selectionKeySet) {
    				if (!selectionKey.isValid()) {
    					// 若Channel无效,则跳过。(如Channel已关闭)
    					continue;
    				}
    				SocketChannel sChannel = null;
    				if (selectionKey.isAcceptable()) {
    					// 判断Channel具体的就绪事件
    					// 获取IO用的Channel
    					sChannel = ssChannel.accept();
    					System.out.println("accept Channel:" + sChannel.hashCode());
    					// 也需要配置成非阻塞模式
    					sChannel.configureBlocking(false);
    					// 把客户端的Channel交给Selector监控,之后如果有数据可以读取时,会被select出来
    					sChannel.register(selector, SelectionKey.OP_READ);
    
    				}
    				if (selectionKey.isReadable()) {
    					// 重新获取Channel,因为是非阻塞,程序不是顺序执行,可能Channel还没有accept,这段代码已经被加载
    					// 其实这个Channel和前面的是同一个对象
    					sChannel = (SocketChannel) selectionKey.channel();
    					// 读取
    					_read(sChannel);
    
    					// 回复
    					_write(sChannel, "Hello, This is Server");
    				}
    			}
    			// 处理完成后,把Set清空,如否则下次还会返回这些Key,导致重复处理
    			selectionKeySet.clear();
    		}
    	}
    
    	public static void _read(SocketChannel sChannel) throws Exception {
    		// 读取数据到ByteBuffer中
    		ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    		int len = sChannel.read(byteBuffer);
    		if (len <= 0) {
    			System.out.println("len = " + len);
    			// 数据传输完成:-1(0表示没有内容)
    			sChannel.close();
    			// selectionKey.cancel();
    		}
    		byteBuffer.flip();// 反转,就是将Buffer的指针归零。
    		String strRead = new String(byteBuffer.array(), 0, len);
    		System.out.println("RECV:" + strRead);
    	}
    
    	public static void _write(SocketChannel channel, String respMsg) throws Exception {
    		byte[] bytes = respMsg.getBytes();
    		ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
    		byteBuffer.put(bytes);
    		byteBuffer.flip();
    		channel.write(byteBuffer);
    	}
    }
    

    客户端,使用多线程:

    package nio;
    import java.net.InetSocketAddress;
    import java.nio.channels.*;
    import java.util.Set;
    public class NioClient {
    	public static void main(String[] args) throws Exception {
    		for (int i = 0; i < 100; i++) {
    			new NioClientThread().start();
    		}
    	}
    }
    
    class NioClientThread extends Thread {
    
    	@Override
    	public void run() {
    		try {
    			SocketChannel channel = SocketChannel.open();
    			channel.configureBlocking(false);
    			Selector selector = Selector.open();
    			channel.register(selector, SelectionKey.OP_CONNECT);
    			channel.connect(new InetSocketAddress("localhost", 8888));
    			while (true) {
    				selector.select(1000);
    				Set<SelectionKey> selectionKeySet = selector.selectedKeys();
    
    				for (SelectionKey selectionKey : selectionKeySet) {
    					if (!selectionKey.isValid()) {
    						continue;
    					}
    					if (selectionKey.isConnectable()) {
                            // 连接建立成功,直接发送请求数据
    						if (channel.finishConnect()) {
    							channel.register(selector, SelectionKey.OP_READ);
    							Thread.sleep(1000);
    							NioServer._write(channel, "Hello!Server");
    						}
    					}
    					if (selectionKey.isReadable()) {
    						NioServer._read(channel);
    					}
    				}
    				// 清除所有的Key
    				selectionKeySet.removeAll(selectionKeySet);
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    AIO示例

    AIO提供了两种方式控制异步操作(connect、accept、read、write等)。

    • 第一种方式是返回java.util.concurrent.Future对象,检查Future的状态可以得到操作是否完成还是失败,还是进行中, future.get阻塞当前进程。

    Future表示一个可能还没有完成的异步任务的结果。

    • 第二种方式为操作提供一个回调参数java.nio.channels.CompletionHandler。

    该类包含completed、failed两个方法。

    以下示例仅演示服务端给客户端发送一段消息。同时演示Future和CompletionHandler两种方式。

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.concurrent.*;
    
    public class AioServer {
    	public static void main(String[] args) throws Exception {
    		// AsynchronousChannelGroup:用于资源共享的一组异步通道
    		AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
    		// 此处的参数group即使不加,也需要group的操作,否则服务器端执行完就退出
    		AsynchronousServerSocketChannel aServerChannel = AsynchronousServerSocketChannel.open(group);
    		aServerChannel.bind(new InetSocketAddress(8888));
    		aServerChannel.accept(null, new AioServerCompletionHandler(aServerChannel));
    		group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    	}
    }
    
    // 【回调类】CompletionHandler
    class AioServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
    	private AsynchronousServerSocketChannel aServerChannel;
    
    	public AioServerCompletionHandler(AsynchronousServerSocketChannel aServerChannel) {
    		this.aServerChannel = aServerChannel;
    	}
    
    	@Override
    	// result:I/O操作的结果
    	// attachment:启动时附加到I/O操作的对象
    	public void completed(AsynchronousSocketChannel result, Void attachment) {
    		aServerChannel.accept(null, this);
    		try {
    			byte[] bytes = "This is Aio Server".getBytes();
    			ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
    			byteBuffer.put(bytes);
    			byteBuffer.flip();
    			// 【Future类】表示一个可能还没有完成的异步任务的结果
    			Future<Integer> f = result.write(byteBuffer);
    			f.get();
    
    			result.close();
    		} catch (IOException | InterruptedException | ExecutionException e) {
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	public void failed(Throwable exc, Void attachment) {
    		exc.printStackTrace();
    	}
    }
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.concurrent.Future;
    
    public class AioClient {
    	public static void main(String[] args) throws Exception {
    		AsynchronousSocketChannel aChannel = AsynchronousSocketChannel.open();
    		Future<Void> future = aChannel.connect(new InetSocketAddress("localhost", 8888));
    		future.get();
    
    		ByteBuffer buffer = ByteBuffer.allocate(100);
    		Future<Integer> read = aChannel.read(buffer);
    		read.get();
    		System.out.println(new String(buffer.array()));
    	}
    }
    
  • 相关阅读:
    单向链表
    野指针和空指针
    指针数组和数组指针,指针函数和函数指针
    python开发笔记-变长字典Series的使用
    python开发笔记-DataFrame的使用
    python开发笔记-如何做数据准备
    python开发笔记-通过xml快捷获取数据
    destoon系统结构大全
    Python应用之-修改通讯录
    Python开发应用-操作excel
  • 原文地址:https://www.cnblogs.com/tigerlion/p/12950723.html
Copyright © 2011-2022 走看看