代码转自 https://www.jianshu.com/p/a9d030fec081
服务端:
package nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.*; import java.util.concurrent.ConcurrentHashMap; public class Server { private Selector selector; private ByteBuffer readBuffer = ByteBuffer.allocate(1024);//调整缓存的大小可以看到打印输出的变化 private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);//调整缓存的大小可以看到打印输出的变化 String str; public void start() throws IOException { // 打开服务器套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); // 服务器配置为非阻塞 ssc.configureBlocking(false); // 进行服务的绑定 ssc.bind(new InetSocketAddress("localhost", 8001)); // 通过open()方法找到Selector selector = Selector.open(); // 注册到selector,等待连接 ssc.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.currentThread().isInterrupted()) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { accept(key); } else if (key.isReadable()) { read(key); } else if (key.isWritable()) { write(key); } keyIterator.remove(); //该事件已经处理,可以丢弃 } } } private void write(SelectionKey key) throws IOException, ClosedChannelException { SocketChannel channel = (SocketChannel) key.channel(); System.out.println("write:"+str); sendBuffer.clear(); sendBuffer.put(str.getBytes()); sendBuffer.flip(); channel.write(sendBuffer); channel.register(selector, SelectionKey.OP_READ); } private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); // Clear out our read buffer so it's ready for new data this.readBuffer.clear(); // readBuffer.flip(); // Attempt to read off the channel int numRead; try { numRead = socketChannel.read(this.readBuffer); } catch (IOException e) { // The remote forcibly closed the connection, cancel // the selection key and close the channel. key.cancel(); socketChannel.close(); return; } str = new String(readBuffer.array(), 0, numRead); System.out.println(str); socketChannel.register(selector, SelectionKey.OP_WRITE); } private void accept(SelectionKey key) throws IOException { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel clientChannel = ssc.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); System.out.println("a new client connected "+clientChannel.getRemoteAddress()); } public static void main(String[] args) throws IOException { System.out.println("server started..."); new Server().start(); } } 作者:御风逍遥 链接:https://www.jianshu.com/p/a9d030fec081 來源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
客户端:
package nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class Client { ByteBuffer writeBuffer = ByteBuffer.allocate(1024); ByteBuffer readBuffer = ByteBuffer.allocate(1024); public void start() throws IOException { // 打开socket通道 SocketChannel sc = SocketChannel.open(); //设置为非阻塞 sc.configureBlocking(false); //连接服务器地址和端口 sc.connect(new InetSocketAddress("localhost", 8001)); //打开选择器 Selector selector = Selector.open(); //注册连接服务器socket的动作 sc.register(selector, SelectionKey.OP_CONNECT); Scanner scanner = new Scanner(System.in); while (true) { //选择一组键,其相应的通道已为 I/O 操作准备就绪。 //此方法执行处于阻塞模式的选择操作。 selector.select(); //返回此选择器的已选择键集。 Set<SelectionKey> keys = selector.selectedKeys(); System.out.println("keys=" + keys.size()); Iterator<SelectionKey> keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); // 判断此通道上是否正在进行连接操作。 if (key.isConnectable()) { sc.finishConnect(); sc.register(selector, SelectionKey.OP_WRITE); System.out.println("server connected..."); break; } else if (key.isWritable()) { //写数据 System.out.print("please input message:"); String message = scanner.nextLine(); //ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes()); writeBuffer.clear(); writeBuffer.put(message.getBytes()); //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位 writeBuffer.flip(); sc.write(writeBuffer); //注册写操作,每个chanel只能注册一个操作,最后注册的一个生效 //如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来 //int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; //使用interest集合 sc.register(selector, SelectionKey.OP_READ); sc.register(selector, SelectionKey.OP_WRITE); sc.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()){//读取数据 System.out.print("receive message:"); SocketChannel client = (SocketChannel) key.channel(); //将缓冲区清空以备下次读取 readBuffer.clear(); int num = client.read(readBuffer); System.out.println(new String(readBuffer.array(),0, num)); //注册读操作,下一次读取 sc.register(selector, SelectionKey.OP_WRITE); } } } } public static void main(String[] args) throws IOException { new Client().start(); } } 作者:御风逍遥 链接:https://www.jianshu.com/p/a9d030fec081 來源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。