/**
 * Non-blocking NIO server built on Selector, SelectionKey, ServerSocketChannel
 * and SocketChannel.
 *
 * <p>Listens on 0.0.0.0:8888. For every readable client it drains the pending
 * bytes, prints them prefixed with the client's address and sends back
 * {@code "hello : " + received text}.
 */
public class MyServerSocketChannel {

    public static void main(String[] args) {
        Selector sel;
        try {
            sel = Selector.open();                              // open the selector
            ServerSocketChannel ssc = ServerSocketChannel.open();
            InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 8888);
            ssc.bind(addr);
            ssc.configureBlocking(false);                       // selectors require non-blocking channels
            ssc.register(sel, SelectionKey.OP_ACCEPT);          // register the listening channel
        } catch (Exception e) {
            e.printStackTrace();
            // Without a working selector/listener the loop below cannot run
            // (it would previously fail with a NullPointerException on sel).
            return;
        }

        while (true) {
            try {
                sel.select();                                   // block until at least one channel is ready
            } catch (Exception e) {
                // A failing selector would otherwise make this loop spin forever.
                e.printStackTrace();
                return;
            }

            Set<SelectionKey> keys = sel.selectedKeys();
            for (SelectionKey key : keys) {
                try {
                    if (key.isAcceptable()) {
                        acceptClient(sel, key);
                    }
                    if (key.isReadable()) {
                        echoToClient(key);
                    }
                } catch (Exception e) {
                    if (!(key.channel() instanceof SocketChannel)) {
                        // Failure on the listening channel: do not hide it.
                        e.printStackTrace();
                    }
                    dropChannel(key);
                }
            }
            // Selected keys are never removed automatically; clear them for the next round.
            keys.clear();
        }
    }

    /**
     * Accepts a pending connection and registers it with the selector for reads.
     *
     * <p>Only OP_READ is registered: OP_WRITE is ready almost all the time and
     * would make {@code select()} return immediately in a busy loop, and
     * OP_CONNECT has no meaning for an already-accepted connection.
     */
    private static void acceptClient(Selector sel, SelectionKey key) throws java.io.IOException {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel client = listener.accept();
        if (client == null) {
            // Non-blocking accept: the pending connection is already gone.
            return;
        }
        System.out.println(getClientInfo(client.socket()) + " : 上线了!");
        client.configureBlocking(false);
        client.register(sel, SelectionKey.OP_READ);
    }

    /**
     * Drains the bytes currently available on the client channel, prints them
     * and echoes them back prefixed with "hello : ". Drops the client when the
     * peer has closed the connection.
     */
    private static void echoToClient(SelectionKey key) throws java.io.IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ByteBuffer buf = ByteBuffer.allocate(1024);

        // read() returns 0 when nothing more is available right now and -1 once
        // the peer has closed; looping on "!= 0" would never terminate on -1.
        int n;
        while ((n = client.read(buf)) > 0) {
            buf.flip();
            baos.write(buf.array(), 0, buf.limit());
            buf.clear();
        }
        boolean peerClosed = (n == -1);

        if (!peerClosed || baos.size() > 0) {
            String str = "hello : " + new String(baos.toByteArray());
            System.out.println(getClientInfo(client.socket()) + str);

            // A non-blocking write may accept only part of the buffer, so keep
            // writing until the whole reply has been handed to the socket.
            ByteBuffer reply = ByteBuffer.wrap(str.getBytes());
            while (reply.hasRemaining()) {
                client.write(reply);
            }
        }

        if (peerClosed) {
            dropChannel(key);
        }
    }

    /**
     * Deregisters the key from the selector; for client channels it also
     * reports the disconnect and closes the channel so the socket is released.
     */
    private static void dropChannel(SelectionKey key) {
        key.cancel();
        if (key.channel() instanceof SocketChannel) {
            SocketChannel client = (SocketChannel) key.channel();
            System.out.println(getClientInfo(client.socket()) + " : 下线了!");
            try {
                client.close();
            } catch (Exception closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }

    /**
     * Formats the remote endpoint of a client socket as "[ip:port] ".
     *
     * @param socket the client socket
     * @return the formatted address, or "[unknown] " when the socket has no remote address
     */
    private static String getClientInfo(Socket socket) {
        InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
        if (addr == null || addr.getAddress() == null) {
            return "[unknown] ";
        }
        String ip = addr.getAddress().getHostAddress();
        return "[" + ip + ":" + addr.getPort() + "] ";
    }
}
2、客户端
1 /** 2 * 客户端 3 */ 4 public class MyClientSocketChannel { 5 public static void main(String[] args) throws Exception { 6 Selector sel = Selector.open(); //挑选器 7 SocketChannel sc = SocketChannel.open(); //开启通道 8 InetSocketAddress addr = new InetSocketAddress("localhost", 8888);//服务器地址 9 sc.connect(addr); //连接 10 11 sc.configureBlocking(false); //*****非阻塞模式 12 sc.register(sel, SelectionKey.OP_READ); //注册read事件 13 14 new Sender(sc).start(); //开启线程发送消息 15 16 // 17 ByteBuffer buf = ByteBuffer.allocate(1024); 18 //开始挑选 19 while(true){ 20 sel.select(); 21 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 22 while(sc.read(buf) != 0){ 23 buf.flip(); 24 baos.write(buf.array(),0,buf.limit()); 25 buf.clear(); 26 } 27 String str = new String(baos.toByteArray()); 28 System.out.println(str); 29 } 30 } 31 }
3、发送器线程:在单独的线程中读取控制台输入并发送给服务器
/**
 * Sender thread: reads lines from the console (System.in) and writes each
 * line's bytes to the given SocketChannel. The line terminator is not sent.
 */
public class Sender extends Thread {

    private final SocketChannel sc;

    /**
     * @param sc the channel every console line is written to
     */
    public Sender(SocketChannel sc) {
        this.sc = sc;
    }

    /**
     * Forwards console lines to the channel until end of input or an error.
     * Errors are reported on stderr and end the thread.
     */
    @Override
    public void run() {
        try {
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(System.in));
            String line;
            while ((line = br.readLine()) != null) {
                // Wrap the exact bytes of the line: a fixed 1024-byte buffer
                // overflowed (BufferOverflowException) on longer input.
                ByteBuffer buf = ByteBuffer.wrap(line.getBytes());
                // The channel may be non-blocking, in which case a single
                // write() can accept only part of the buffer.
                while (buf.hasRemaining()) {
                    if (sc.write(buf) == 0) {
                        Thread.yield();   // socket buffer full; let the peer drain it
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2、另一个示例:MyServer 服务端与 MyClient 客户端
/**
 * Selector-based server on localhost:8888. Prints what clients send and
 * greets each client with "hello world" the first time its channel is writable.
 */
public class MyServer {

    public static void main(String[] args) {
        // try-with-resources closes the selector and listening channel if the loop fails.
        try (Selector sel = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            // selectors only work with non-blocking channels
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress("localhost", 8888));

            // register the listening channel for accept events
            ssc.register(sel, SelectionKey.OP_ACCEPT);

            while (true) {
                sel.select();
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();

                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Remove first so the key is taken out even if handling fails.
                    it.remove();
                    try {
                        handleKey(sel, ssc, key);
                    } catch (IOException e) {
                        // One broken connection must not stop the whole server.
                        e.printStackTrace();
                        key.cancel();
                        if (key.channel() instanceof SocketChannel) {
                            try {
                                key.channel().close();
                            } catch (IOException closeFailure) {
                                closeFailure.printStackTrace();
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            // Previously swallowed silently, which hid e.g. "address already in use".
            e.printStackTrace();
        }
    }

    /** Dispatches one selected key to the accept / read / write handling. */
    private static void handleKey(Selector sel, ServerSocketChannel ssc, SelectionKey key)
            throws IOException {
        if (key.isAcceptable()) {
            SocketChannel sc = ssc.accept();
            if (sc != null) {   // non-blocking accept may find nothing pending
                System.out.println("有人连进来了");
                sc.configureBlocking(false);
                sc.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                System.out.println("有人注册了");
            }
        }
        if (key.isReadable()) {
            SocketChannel sc = (SocketChannel) key.channel();
            String text = readStrFromChannel(sc);
            if (text == null) {
                // Read failed (already reported): stop watching and release the channel.
                key.cancel();
                sc.close();
                return;
            }
            System.out.println(text);
        }
        if (key.isWritable()) {
            ByteBuffer buf = ByteBuffer.wrap("hello world".getBytes());
            SocketChannel sc = (SocketChannel) key.channel();
            while (buf.hasRemaining()) {   // non-blocking writes may be partial
                sc.write(buf);
            }
            // Deregister the key after the greeting, as the original code does: the
            // selector delivers no further events for this channel. The channel
            // itself is not closed here.
            key.cancel();
        }
    }

    /**
     * Reads everything currently available on the channel.
     *
     * @param sc the client channel to read from
     * @return the received bytes decoded with the platform charset (possibly
     *         empty), or {@code null} if reading failed
     */
    private static String readStrFromChannel(SocketChannel sc) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            while (sc.read(buffer) > 0) {
                buffer.flip();
                bos.write(buffer.array(), 0, buffer.limit());
                // Reset for the next read; without this the usable capacity
                // shrinks to the size of the previous read.
                buffer.clear();
            }
            return new String(bos.toByteArray());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}
/**
 * Selector-based client: connects to localhost:8888 without blocking, prints
 * whatever the server sends and writes "hello wokd" whenever the channel is
 * writable.
 */
public class MyClient {

    public static void main(String[] args) {
        // try-with-resources closes the channel and selector on exit or failure.
        try (Selector sel = Selector.open();
             SocketChannel sc = SocketChannel.open()) {
            sc.configureBlocking(false);
            SelectionKey key = sc.register(sel,
                    SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            sc.connect(new InetSocketAddress("localhost", 8888));

            while (true) {
                sel.select();
                // the selector holds at most one key (the one registered above)
                if (sel.selectedKeys().isEmpty()) {
                    continue;
                }
                // Selected keys are not removed automatically. Without clearing,
                // stale readiness accumulated on the key, so finishConnect() was
                // repeated and empty reads were printed on every pass. The key's
                // ready set stays readable after removal from the selected set.
                sel.selectedKeys().clear();

                if (key.isConnectable()) {
                    if (!sc.finishConnect()) {
                        continue;   // connection still in progress
                    }
                    // Connected: connect events are no longer of interest.
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
                }
                if (key.isReadable()) {
                    String src = readStrFromChannel(sc);
                    if (src == null) {
                        return;     // server closed the connection
                    }
                    if (!src.isEmpty()) {
                        System.out.println(src);
                    }
                }
                if (key.isWritable()) {
                    ByteBuffer buffer = ByteBuffer.wrap("hello wokd".getBytes());
                    sc.write(buffer);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Performs a single read of at most 20 bytes from the channel.
     *
     * @param sc the connected channel
     * @return the bytes read decoded with the platform charset (empty when
     *         nothing was available), or {@code null} when the peer has closed
     *         the connection
     * @throws IOException if the read fails
     */
    private static String readStrFromChannel(SocketChannel sc) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(20);
        int n = sc.read(buffer);
        if (n == -1) {
            return null;
        }
        buffer.flip();
        return new String(buffer.array(), 0, buffer.limit());
    }

}