server:
/**
 * Selector-based demo server.
 *
 * <p>A single thread multiplexes the listening socket and every accepted client through one
 * {@link Selector}. Text received from a client is printed to stdout; the message
 * {@code exit} shuts the server down. Whenever a client socket is writable the server sends
 * the probe string {@code "server data"}; a failed read or write is treated as a dead
 * connection and the client is dropped.
 *
 * <p>Text on the wire is encoded and decoded as UTF-8.
 *
 * Created by ascend on 2017/6/9 9:30.
 */
public class SelectorServer {
    /** Address the demo clients connect to. */
    public final static String REMOTE_IP = "127.0.0.1";
    /** Default listening port, used when no port is passed on the command line. */
    public final static int PORT = 17531;
    /** Scratch buffer shared by the read and write paths; only used from the selector loop. */
    private static ByteBuffer bb = ByteBuffer.allocate(1024);
    /** The listening channel; closed by the read path when a client sends {@code exit}. */
    private static ServerSocketChannel ssc;
    /** Set once a client has requested shutdown; ends the selector loop. */
    private static boolean closed = false;

    /**
     * Starts the server and runs the selector loop until a client sends {@code exit}.
     *
     * @param args optional; {@code args[0]} overrides the listening port
     * @throws IOException if the server socket or selector cannot be set up, or selecting or
     *                     accepting fails
     */
    public static void main(String[] args) throws IOException {
        // Decide which port to listen on.
        int port = PORT;
        if (args != null && args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        // try-with-resources guarantees the listening channel and the selector are released
        // on every exit path, including exceptions.
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            ssc = server;
            server.socket().bind(new InetSocketAddress(port));
            // A channel must be non-blocking before it can be registered with a selector.
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            try {
                while (!closed) {
                    // Blocks until at least one registered channel is ready.
                    if (selector.select() == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        // The selector never removes keys from the selected set itself.
                        // Remove up front so no error path can leave a stale key behind.
                        iterator.remove();
                        handleKey(selector, sk);
                    }
                }
            } finally {
                closeClients(selector);
            }
        }
    }

    /**
     * Dispatches one ready key: accepts a new client, or reads from / writes to an existing one.
     *
     * @param selector selector that newly accepted clients are registered with
     * @param sk       the ready key
     * @throws IOException if accepting a connection fails
     */
    private static void handleKey(Selector selector, SelectionKey sk) throws IOException {
        // A key can be cancelled earlier in the same batch (e.g. "exit" closes the listening
        // channel); querying readiness on a cancelled key throws CancelledKeyException.
        if (!sk.isValid()) {
            return;
        }
        if (sk.isAcceptable()) {
            acceptClient(selector, (ServerSocketChannel) sk.channel());
            return;
        }
        try {
            if (sk.isReadable()) {
                readDataFromSocket(sk);
            }
            // The read path closes the channel on end-of-stream, which cancels the key.
            if (sk.isValid() && sk.isWritable()) {
                writeDataToSocket(sk);
            }
        } catch (IOException e) {
            // An I/O failure means the connection is dead: drop it, keep serving the others.
            sk.cancel();
            closeQuietly(sk.channel());
        }
    }

    /**
     * Accepts one pending connection, if any, and registers it for read and write readiness.
     *
     * @param selector selector to register the new client with
     * @param server   the listening channel
     * @throws IOException if {@code accept} itself fails
     */
    private static void acceptClient(Selector selector, ServerSocketChannel server) throws IOException {
        // In non-blocking mode accept() returns null when no connection is pending.
        SocketChannel sc = server.accept();
        if (sc == null) {
            return;
        }
        try {
            sc.configureBlocking(false);
            // NOTE(review): a connected socket is writable almost all the time, so OP_WRITE
            // interest makes select() return immediately until the peer's buffers fill up.
            // Kept because the write probe is this demo's liveness check.
            sc.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } catch (IOException e) {
            // Could not set the client up; release it instead of leaking the socket.
            closeQuietly(sc);
        }
    }

    /**
     * Sends the probe string to the client; a failure marks the socket as dead.
     *
     * @param sk SelectionKey of the client channel
     * @throws IOException if the write fails
     */
    private static void writeDataToSocket(SelectionKey sk) throws IOException {
        SocketChannel sc = (SocketChannel) sk.channel();
        bb.clear();
        bb.put("server data".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // Switch the buffer from filling to draining; without this the write would send the
        // unused remainder of the buffer instead of the message.
        bb.flip();
        while (bb.hasRemaining()) {
            // A non-blocking write returns 0 when the socket send buffer is full. Stop rather
            // than spin, otherwise a peer that never reads would stall the whole server. The
            // unsent tail of this probe is dropped.
            if (sc.write(bb) == 0) {
                break;
            }
        }
    }

    /**
     * Reads everything currently available on the channel, prints it, and handles the
     * {@code exit} command. Closes the channel when the peer has closed its side.
     *
     * @param sk SelectionKey of the client channel
     * @throws IOException if the read fails
     */
    private static void readDataFromSocket(SelectionKey sk) throws IOException {
        SocketChannel sc = (SocketChannel) sk.channel();
        java.io.ByteArrayOutputStream received = new java.io.ByteArrayOutputStream();
        int count;
        bb.clear();
        while ((count = sc.read(bb)) > 0) {
            bb.flip();
            received.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
            bb.clear();
        }

        String s = new String(received.toByteArray(), java.nio.charset.StandardCharsets.UTF_8).trim();
        if (!s.isEmpty()) {
            if ("exit".equals(s)) {
                // Set the flag first so shutdown still happens if close() throws.
                closed = true;
                ssc.close();
            }
            System.out.println("服务器收到:" + s);
        }

        // read() returns -1 once the peer has closed the connection. The key would otherwise
        // stay readable forever, so release the channel (this also cancels the key).
        if (count < 0) {
            sc.close();
        }
    }

    /**
     * Closes every channel still registered with the selector.
     *
     * @param selector the selector whose registered channels are closed
     */
    private static void closeClients(Selector selector) {
        // Iterate over a copy so closing channels cannot disturb the selector's own key set.
        List<SelectionKey> keys = new ArrayList<>(selector.keys());
        for (SelectionKey key : keys) {
            closeQuietly(key.channel());
        }
    }

    /**
     * Closes a resource, ignoring any failure.
     *
     * @param resource the channel or stream to close
     */
    private static void closeQuietly(java.io.Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best effort: the resource is being discarded anyway.
        }
    }
}
client:
1 /** 2 * 3 * Created by ascend on 2017/6/13 10:36. 4 */ 5 public class Client { 6 7 @org.junit.Test 8 public void test(){ 9 Socket socket = new Socket(); 10 try { 11 socket.connect(new InetSocketAddress(SelectorServer.REMOTE_IP,SelectorServer.PORT)); 12 DataOutputStream out = new DataOutputStream(socket.getOutputStream()); 13 out.write("exit".getBytes()); 14 out.flush(); 15 out.close(); 16 socket.close(); 17 } catch (IOException e) { 18 e.printStackTrace(); 19 } 20 } 21 22 public static void main(String[] args) { 23 new Thread(new ClientThread()).start(); 24 } 25 26 public void checkStatus(String input){ 27 if ("exit".equals(input.trim())) { 28 System.out.println("系统即将退出,bye~~"); 29 System.exit(0); 30 } 31 } 32 33 34 } 35 36 class ClientThread implements Runnable { 37 private SocketChannel sc; 38 private boolean isConnected = false; 39 Client client = new Client(); 40 41 public ClientThread(){ 42 try { 43 sc = SocketChannel.open(); 44 sc.configureBlocking(false); 45 sc.connect(new InetSocketAddress(SelectorServer.REMOTE_IP,SelectorServer.PORT)); 46 while (!sc.finishConnect()) { 47 System.out.println("同" + SelectorServer.REMOTE_IP + "的连接正在建立,请稍等!"); 48 Thread.sleep(10); 49 } 50 System.out.println("连接已建立,待写入内容至指定ip+端口!时间为" + System.currentTimeMillis()); 51 } catch (IOException | InterruptedException e) { 52 e.printStackTrace(); 53 } 54 } 55 56 @Override 57 public void run() { 58 try { 59 while (true){ 60 Scanner scanner = new Scanner(System.in); 61 System.out.print("请输入要发送的内容:"); 62 String writeStr = scanner.nextLine(); 63 client.checkStatus(writeStr); 64 ByteBuffer bb = ByteBuffer.allocate(writeStr.length()); 65 bb.put(writeStr.getBytes()); 66 bb.flip(); // 写缓冲区的数据之前一定要先反转(flip) 67 while (bb.hasRemaining()){ 68 sc.write(bb); 69 } 70 bb.clear(); 71 } 72 } catch (IOException e) { 73 e.printStackTrace(); 74 if (Objects.nonNull(sc)) { 75 try { 76 sc.close(); 77 } catch (IOException e1) { 78 e1.printStackTrace(); 79 } 80 } 81 }finally { 82 if (Objects.nonNull(sc)) { 83 
try { 84 sc.close(); 85 } catch (IOException e1) { 86 e1.printStackTrace(); 87 } 88 } 89 } 90 } 91 }