1. 服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded, selector-based group chat server.
 *
 * <p>Accepts client connections on {@link #PORT}, reads whatever bytes a client sends
 * and forwards them to every other connected client.
 */
public class GroupChatServer {
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 8888;
    /** Size in bytes of the per-read receive buffer. */
    private static final int BUFFER_SIZE = 1024;
    /**
     * Charset used to decode and re-encode messages. The platform default is kept
     * deliberately so the bytes forwarded are the same bytes that were received.
     */
    private static final Charset CHARSET = Charset.defaultCharset();

    /**
     * Opens the selector and the listening channel, binds to {@link #PORT}, switches the
     * channel to non-blocking mode and registers it for accept events.
     * Any failure is printed; the instance is then left partially initialised.
     */
    public GroupChatServer() {
        try {
            selector = Selector.open();
            listenChannel = ServerSocketChannel.open();
            // Bind the listening port.
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            // A channel must be non-blocking before it can be registered with a selector.
            listenChannel.configureBlocking(false);
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the event loop: blocks in {@code select()} and dispatches accept and read events.
     * The loop ends only when an exception escapes, which is printed.
     */
    public void listen() {
        try {
            while (true) {
                // select() blocks until at least one registered channel is ready.
                int count = selector.select();
                if (count <= 0) {
                    continue;
                }
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                while (keysIterator.hasNext()) {
                    SelectionKey selectionKey = keysIterator.next();
                    // Remove the key from the selected set so it is not processed twice.
                    keysIterator.remove();
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isAcceptable()) {
                        acceptClient();
                    } else if (selectionKey.isReadable()) {
                        readMessage(selectionKey);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Accepts one pending connection and registers it for read events. */
    private void acceptClient() throws IOException {
        SocketChannel socketChannel = listenChannel.accept();
        if (socketChannel == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println(socketChannel.getRemoteAddress() + "上线");
    }

    /**
     * Reads one chunk from the client behind {@code key} and forwards it to the other clients.
     * End-of-stream or an I/O error on the channel is treated as the client going offline.
     *
     * @param key the readable selection key whose channel is a {@link SocketChannel}
     */
    public void readMessage(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            int read = socketChannel.read(byteBuffer);
            if (read > 0) {
                // Decode only the bytes actually read, not the zero-filled rest of the array.
                String message = new String(byteBuffer.array(), 0, read, CHARSET);
                System.out.println("Form Client : " + message);
                sendInfoToOtherClient(message, socketChannel);
            } else if (read < 0) {
                // -1 means the peer closed the connection; without deregistering,
                // the key would stay readable and select() would spin.
                disconnect(key, socketChannel);
            }
        } catch (IOException e) {
            disconnect(key, socketChannel);
        }
    }

    /** Reports the client as offline, cancels its key and closes its channel. */
    private void disconnect(SelectionKey key, SocketChannel channel) {
        try {
            System.out.println(channel.getRemoteAddress() + "离线了。。。");
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Cancel and close even if the address lookup above failed.
        key.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Forwards a message to every registered client channel except the sender.
     * A write failure on one target disconnects that target only; it is not propagated,
     * so the sender is never blamed for another client's broken connection.
     *
     * @param message the message to forward
     * @param self    the sender's channel, which is skipped
     * @throws IOException declared for backward compatibility with existing callers
     */
    public void sendInfoToOtherClient(String message, SocketChannel self) throws IOException {
        System.out.println("服务器转发消息中。。。");
        byte[] payload = message.getBytes(CHARSET);
        Set<SelectionKey> keys = selector.keys();
        for (SelectionKey key : keys) {
            Channel targetChannel = key.channel();
            // Skip the listening channel, the sender, and keys already cancelled.
            if (!(targetChannel instanceof SocketChannel) || targetChannel == self || !key.isValid()) {
                continue;
            }
            SocketChannel target = (SocketChannel) targetChannel;
            try {
                // NOTE: a single non-blocking write may be partial if the target's send
                // buffer is full; short chat messages are assumed to fit.
                target.write(ByteBuffer.wrap(payload));
            } catch (IOException e) {
                disconnect(key, target);
            }
        }
    }

    public static void main(String[] args) {
        GroupChatServer chatServer = new GroupChatServer();
        chatServer.listen();
    }
}
2. 客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * Console client for the group chat server.
 *
 * <p>Connects to {@link #HOST}:{@link #PORT}, sends each line typed on standard input and
 * prints every message received from the server.
 */
public class GroupChatClient {
    /** Server IP address. */
    private static final String HOST = "127.0.0.1";
    /** Server port. */
    private static final int PORT = 8888;
    /** Size in bytes of the per-read receive buffer. */
    private static final int BUFFER_SIZE = 1024;
    /** Charset for encoding and decoding messages; the platform default, as before. */
    private static final Charset CHARSET = Charset.defaultCharset();

    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    /**
     * Connects to the server, switches the channel to non-blocking mode, registers it for
     * read events and derives the user name from the local socket address.
     * Any failure is printed; the instance is then left partially initialised.
     */
    public GroupChatClient() {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            // Drop the leading '/' of the address string.
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println(username + " ok!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a message to the server, prefixed with this client's user name.
     *
     * @param info the message text
     */
    public void sendInfo(String info) {
        info = username + "说:" + info;
        try {
            ByteBuffer buffer = ByteBuffer.wrap(info.getBytes(CHARSET));
            // A non-blocking write may send only part of the buffer; keep going until it is drained.
            while (buffer.hasRemaining()) {
                socketChannel.write(buffer);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Waits up to two seconds for data from the server and prints whatever arrives.
     * End-of-stream or a read error closes the connection.
     */
    public void readInfo() {
        try {
            int readChannels = selector.select(2000);
            if (readChannels <= 0) {
                return;
            }
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey selectionKey = keyIterator.next();
                // Remove the key from the selected set so it is not processed twice.
                keyIterator.remove();
                if (selectionKey.isValid() && selectionKey.isReadable()) {
                    readFrom((SocketChannel) selectionKey.channel());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Reads one chunk from the channel and prints it; closes the connection on EOF or error. */
    private void readFrom(SocketChannel channel) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            int read = channel.read(byteBuffer);
            if (read > 0) {
                // Decode only the bytes actually read; trim() still strips any padding
                // a peer may have appended.
                String message = new String(byteBuffer.array(), 0, read, CHARSET);
                System.out.println(message.trim());
            } else if (read < 0) {
                // -1 means the server closed the connection.
                closeConnection();
            }
        } catch (IOException e) {
            e.printStackTrace();
            // Close so the failing channel is not selected again on every pass.
            closeConnection();
        }
    }

    /** Closes the channel to the server; closing also cancels its selection key. */
    private void closeConnection() {
        System.out.println("与服务器的连接已断开");
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns whether the channel to the server was opened and has not been closed. */
    private boolean isConnected() {
        return socketChannel != null && socketChannel.isOpen();
    }

    public static void main(String[] args) {
        final GroupChatClient chatClient = new GroupChatClient();

        // Reader thread: readInfo() already blocks up to 2 s in select(), so no extra sleep
        // is needed; the loop ends once the connection is closed.
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (chatClient.isConnected()) {
                    chatClient.readInfo();
                }
            }
        }, "chat-reader").start();

        // Send each non-blank input line to the server as one message.
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String info = scanner.nextLine();
            if (!info.trim().isEmpty()) {
                chatClient.sendInfo(info);
            }
        }
    }
}