zoukankan      html  css  js  c++  java
  • Java NIO教程 Selector

    这次我们开讲非阻塞I/O中的Selector,它需要配合非阻塞的TCP和UDP来使用。首先我们先简单讲一下TCP和UDP的非阻塞通道。

    非阻塞I/O通道

    在上代码前我们先讲解一些最基本的知识。TCP和UDP共对应着三种通道,分别是:SocketChannel、ServerSocketChannel、DatagramChannel 。它们都可以通过channel.open()方法来初始化;同时对于SocketChannel来说,当一个新连接到达ServerSocketChannel时,也会被创建(在代码中会有说明)。而且它们使用结束后都需要被关闭。

    首先让我们来看看SocketChannel的基本操作

    //通过open()打开SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    //绑定主机端口
    socketChannel.connect(new InetSocketAddress("127.0.0.1", 18888));
    //设置成非阻塞模式
    socketChannel.configureBlocking(false);
    while(! socketChannel.finishConnect() ){
    	//做点其他事
    }
    // 利用SocketChannel进行数据操作
    

    下面再来说说,如何用SocketChannel进行数据操作。它的数据读写和其他通道的读写方式是完全一致的,只是要注意的是,在非阻塞模式下,read()和write()没有进行任何操作就返回了,所以要在循环中调用,并注意返回值。

    ByteBuffer buf = ByteBuffer.allocate(48);
    while(socketChannel.read(buf)!=-1) {
    	buf.flip();
    	while(buf.hasRemaining()) {
    		socketChannel.write(buf);
    	}		
    	buf.clear();
    }
    

    SocketChannel相当于传统I/O中的Socket,而ServerSocketChannel相当于ServerSocket;而且整体形式都是一致的,都是利用多路复用思想,在服务器端收到连接后,产生一个专门的Socket,与客户端进行数据传输。具体形式就是"serverSocketChannel.accept()"在收到连接后,会返回一个SocketChannel,具体形式见代码

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    //绑定主机端口
    serverSocketChannel.socket().bind(new InetSocketAddress(9999));
    serverSocketChannel.configureBlocking(false);
    while (true) {
    	//accept()在非阻塞模式中,若建立连接,则返回SocketChannel;否则返回null
    	SocketChannel socketChannel = serverSocketChannel.accept();
    	if (socketChannel != null) {
    		// 利用SocketChannel进行数据操作
    	}
    }
    

    而DatagramChannel则是跟DatagramPacket十分相似的,只不过数据包由当初的byte数组换成了现在的ByteBuffer

    DatagramChannel channel = DatagramChannel.open();
    //绑定主机端口
    channel.socket().bind(new InetSocketAddress(9999));
    channel.configureBlocking(false);
    ByteBuffer buf = ByteBuffer.allocate(48);
    /*
     * 1.因为UDP是无连接的网络协议,所以不能像TCP那样读取和写入,它是发送和接收数据包。
     * 2.receive()在非阻塞模式中,若没有收到数据包,则返回null;
     * 		若收到了,则将内容写入byteBuffer,将发送方的SocketAddress返回(其中包含IP和端口)
     * 3.如果Buffer容不下收到的数据,多出的数据将被丢弃
     */
    while(channel.receive(buf)==null){
    	//做点其他事
    }
    buf.flip();
    //指定接收方的SocketAddress
    channel.send(buf, new InetSocketAddress("127.0.0.1", 8888));
    

    DatagramChannel还有一个特殊的地方,就是它可以“连接”到网络中的特定地址的,十分类似于一个TCP连接。但由于UDP是无连接的,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,让其只能从特定地址收发数据。想实现这种功能,编写方式和TCP十分类似,就不写了,去看文档吧,讲解的十分清楚。

    Selector

    现在开始进入我们今天的主题Selector

    其实前言中已经简单的讲解过什么是Selector以及为什么要使用Selector了。这里就不再重复了(我猜你已经忘了,回去再看一眼吧),咱们还是从最基础的创建开讲。

    Selector的创建是通过调用Selector.open()方法完成的(这部分都是用open()创建的)

    Selector注册

    说完创建,就得说说如何让Channel和Selector配合使用了?一句话:“将channel注册到selector上”这个动作是通过SelectionKey channel.register(Selector sel,int ops,Object att)方法完成的。

    这里要强调一点,就是调用register的channel必须是非阻塞的。这就将FileChannel排除在外(充话费送的就是不行)。

    现在讲解register()中每一个参数的含义。第一个参数,就是要将channel注册到哪个Selector。第二个参数,它是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣,可以监听四种不同类型的事件,分别是Connect、Accept、Read和Write;它们四个分别代表的含义是:

    • Connect(SelectionKey.OP_CONNECT):一个channel成功连接到另一个服务器——“连接就绪”
    • Accept(SelectionKey.OP_ACCEPT):一个ServerSocketchannel准备好接收新进入的连接——“接收就绪”
    • Read(SelectionKey.OP_READ):一个通道的可读数据已准备好——“读就绪”
    • Write(SelectionKey.OP_WRITE):一个通道的可写数据已准备好——“写就绪”

    P.S:圆括号中的是要填在第二个参数ops位置上的int常量。我们把这四种叫做“感兴趣事件”,后文会多次提到这个概念

    如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:
    int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

    register()方法的第三个参数为附加对象,它可有可无,是一个Object对象,它可以作为每个通道的标识符,用以区别注册在同一个Selector上的其他通道;也可以附加其他对象。

    最后再来看看register()方法的返回值。返回值为SelectionKey对象,这是一个重要的对象,接下来我们就主要讲解SelectionKey。

    SelectionKey

    当Selector发现某些channel中的感兴趣事件发生了,就会返回相对应channel的SelectionKey对象。

    SelectionKey对象包含着许多信息。比如所属通道的channel对象,通过selectionKey.channel()方法就可以得到;还有通道的附加对象,通过selectionKey.attachment()方法就可以得到;还可以得到通道那个感兴趣时间发生了通过下面四种方法获得:

    • boolean selectionKey.isAcceptable()
    • boolean selectionKey.isConnectable()
    • boolean selectionKey.isReadable()
    • boolean selectionKey.isWritable()

    还可以获得更多信息,具体内容可以去看文档

    Selector.select()

    之前的创建、注册等准备都完成之后,就可以坐等准备好的数据到来了。这时候需要知道有多少个通道感兴趣事件已经准备好了。这时候有下面三个方法帮你完成这项任务,分别是

    • int selector.select()
    • int selector.select(long timeout)
    • int selector.selectNow()

    首先讲一下这三个方法准确的作用,它们都是返回有多少个通道已经变成就绪状态。它们的区别是:

    • select()是阻塞的,它会一直等到有通道准备就绪、
    • select(long timeout)也是阻塞的,它会一直等到有通道准备就绪或者已经超出给定的timeout时间并返回0。
    • selectNow()是非阻塞的,如果没有通道就绪就直接返回0。

    Selector.selectedKeys()

    通过select()方法知道有若干个通道准备就绪,就可以调用下面的方法来返回相应若干个通道的selectedKey了
    Set<SelectionKey> selectedKeys = selector.selectedKeys()
    获得selectedKeys后,你就可以进行相应的处理了。需要强调的是,每次处理完一个selectionKey之后需要将它在Set中删除,这样下次它准备好以后就可以再次添加到Set中来。

    现在关于Selector的知识基本上就讲解完了,让我们在一个服务器端、客户端收发字符串的例子中结束本次的讲解吧。

    客户端

    public class HansClient {
    	// 定义检测SocketChannel的Selector对象
    	private Selector selector = null;
    	// 客户端SocketChannel
    	private SocketChannel sc = null;
    
    	public void init() throws IOException {
    		selector = Selector.open();
    		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 30000);
    		// 调用open静态方法创建连接到指定主机的SocketChannel
    		sc = SocketChannel.open(isa);
    		// 设置该sc以非阻塞方式工作
    		sc.configureBlocking(false);
    		// 将SocketChannel对象注册到指定Selector
    		sc.register(selector, SelectionKey.OP_READ);
    		// 启动读取服务器端数据的线程
    		new ClientThread().start();
    		// 创建键盘输入流
    		Scanner scan = new Scanner(System.in);
    		while (scan.hasNextLine()) {
    			// 读取键盘输入
    			String line = scan.nextLine();
    			// 将键盘输入的内容输出到SocketChannel中
    			sc.write(StandardCharsets.UTF_8.encode(line));
    		}
    	}
    
    	// 定义读取服务器数据的线程
    	private class ClientThread extends Thread {
    		public void run() {
    			try {
    				while (selector.select() > 0) {
    					// 遍历每个有可用IO操作Channel对应的SelectionKey
    					for (SelectionKey sk : selector.selectedKeys()) {
    						// 删除正在处理的SelectionKey
    						selector.selectedKeys().remove(sk);
    						// 如果该SelectionKey对应的Channel中有可读的数据
    						if (sk.isReadable()) {
    							// 使用NIO读取Channel中的数据
    							SocketChannel sc = (SocketChannel) sk.channel();
    							ByteBuffer buff = ByteBuffer.allocate(1024);
    							String content = "";
    							while (sc.read(buff) > 0) {
    								sc.read(buff);
    								buff.flip();
    								content += StandardCharsets.UTF_8.decode(buff);
    							}
    							// 打印输出读取的内容
    							System.out.println("聊天信息:" + content);
    						}
    					}
    				}
    			} catch (IOException ex) {
    				ex.printStackTrace();
    			}
    		}
    	}
    
    	public static void main(String[] args) throws IOException {
    		new HansClient().init();
    	}
    }
    

    服务器端

    public class HansServer {
    	// 用于检测所有Channel状态的Selector
    	private Selector selector = null;
    
    	public void init() throws IOException {
    		selector = Selector.open();
    		// 通过open方法来打开一个未绑定的ServerSocketChannel实例
    		ServerSocketChannel server = ServerSocketChannel.open();
    		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 30000);
    		// 将该ServerSocketChannel绑定到指定IP地址
    		server.socket().bind(isa);
    		// 设置ServerSocket以非阻塞方式工作
    		server.configureBlocking(false);
    		// 将server注册到指定Selector对象
    		server.register(selector, SelectionKey.OP_ACCEPT);
    		while (selector.select() > 0) {
    			// 依次处理selector上的每个已选择的SelectionKey
    			for (SelectionKey sk : selector.selectedKeys()) {
    				// 从selector上的已选择Key集中删除正在处理的SelectionKey
    				selector.selectedKeys().remove(sk);
    				// 如果sk对应的通道包含客户端的连接请求
    				if (sk.isAcceptable()) {
    					// 调用accept方法接受连接,产生服务器端对应的SocketChannel
    					SocketChannel sc = server.accept();
    					// 设置采用非阻塞模式
    					sc.configureBlocking(false);
    					// 将该SocketChannel也注册到selector
    					sc.register(selector, SelectionKey.OP_READ);
    				}
    				// 如果sk对应的通道有数据需要读取
    				if (sk.isReadable()) {
    					// 获取该SelectionKey对应的Channel,该Channel中有可读的数据
    					SocketChannel sc = (SocketChannel) sk.channel();
    					// 定义准备执行读取数据的ByteBuffer
    					ByteBuffer buff = ByteBuffer.allocate(1024);
    					String content = "";
    					// 开始读取数据
    					try {
    						while (sc.read(buff) > 0) {
    							buff.flip();
    							content += StandardCharsets.UTF_8.decode(buff);
    						}
    						// 打印从该sk对应的Channel里读取到的数据
    						System.out.println("=====" + content);
    					}
    					// 如果捕捉到该sk对应的Channel出现了异常,即表明该Channel
    					// 对应的Client出现了问题,所以从Selector中取消sk的注册
    					catch (IOException ex) {
    						// 从Selector中删除指定的SelectionKey
    						sk.cancel();
    						if (sk.channel() != null) {
    							sk.channel().close();
    						}
    					}
    					// 如果content的长度大于0,即聊天信息不为空
    					if (content.length() > 0) {
    						// 遍历该selector里注册的所有SelectKey
    						for (SelectionKey key : selector.keys()) {
    							// 获取该key对应的Channel
    							Channel targetChannel = key.channel();
    							// 如果该channel是SocketChannel对象
    							if (targetChannel instanceof SocketChannel) {
    								// 将读到的内容写入该Channel中
    								SocketChannel dest = (SocketChannel) targetChannel;
    								dest.write(StandardCharsets.UTF_8.encode(content));
    							}
    						}
    					}
    				}
    			}
    		}
    	}
    
    	public static void main(String[] args) throws IOException {
    		new HansServer().init();
    	}
    }
    

    本次讲解就到这里了,本系列的讲解也就到这里了。如果你能看到这里我真的很开心。有任何事都可以与我讨论。

  • 相关阅读:
    EXTJS 4.2 资料 控件之checkboxgroup的用法(静态数据)
    EXTJS 4.2 资料 控件之Window窗体相关属性的用法
    EXTJS 4.2 资料 控件之textfield文本框加事件的用法
    Entity Framework 学习笔记(一)之数据模型 数据库
    EXTJS 4.2 资料 控件之checkboxgroup的用法(动态数据)
    EXTJS 4.2 资料 控件之Grid 列鼠标悬停提示
    Entity Framework 学习笔记(二)之数据模型 Model 使用过程
    EXTJS 4.2 资料 控件之radiogroup 的用法
    EXTJS API
    vue移动端弹框组件,vue-layer-mobile
  • 原文地址:https://www.cnblogs.com/ironPhoenix/p/4206939.html
Copyright © 2011-2022 走看看