简单的编写一个群聊系统,实现的功能如下:
1. 实现服务器端和客户端的群聊系统,实现非阻塞方式的数据简单通讯
2. 实现多人聊天
3. 服务器端可以实现感应到用户上线、离线,并实现消息转发
4. 客户端:通过channel可以无阻塞发送消息给其他用户,同时可以接受其他用户发送的消息(由服务器转发得到)
5. 在此基础上改造下实现单聊天,由服务器转发(也就是转发的时候转发给特定的用户)
1. 群聊实现
1. 服务器端代码
package nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; public class GroupChatServer { /** * Selector 选择器 */ private Selector selector; /** * ServerSocketChannel */ private ServerSocketChannel serverSocketChannel; /** * 端口 */ private static final int PORT = 6667; public GroupChatServer() { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); // 绑定端口 serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); // 设置非阻塞模式 serverSocketChannel.configureBlocking(false); // 注册连接请求事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } /** * 监听 */ public void listen() { System.out.println("监听线程: " + Thread.currentThread().getName()); try { while (true) { int count = selector.select(); // 有事件处理 if (count > 0) { // 处理监听到的事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); SelectionKey key = null; while (iterator.hasNext()) { key = iterator.next(); // 连接请求事件 if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); // 设置非阻塞 socketChannel.configureBlocking(false); // 注册读取事件 socketChannel.register(selector, SelectionKey.OP_READ); // 提示 System.out.println(socketChannel.getRemoteAddress() + " 上线 "); } // 数据读取事件 if (key.isReadable()) { readData(key); } // 删除当前的key,防止重复处理 iterator.remove(); } } } } catch (Throwable e) { e.printStackTrace(); } } private void readData(SelectionKey key) { SocketChannel channel = null; try { channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int read = channel.read(byteBuffer); if (read > 0) { //把缓存区的数据转成字符串 String msg = new String(byteBuffer.array()); System.out.println("form 客户端: " + msg); //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理 sendInfoToOtherClients(msg, channel); } } catch (Exception e) { e.printStackTrace(); try { System.out.println(channel.getRemoteAddress() + " 离线了.."); //取消注册 key.cancel(); //关闭通道 channel.close(); } catch (IOException e2) { e2.printStackTrace(); } } } private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException { System.out.println("服务器转发消息中..."); for (SelectionKey key : selector.keys()) { // 通过 key 取出对应的 SocketChannel Channel targetChannel = key.channel(); // 排除掉自己 if (targetChannel instanceof SocketChannel && targetChannel != channel) { // 转型 SocketChannel dest = (SocketChannel) targetChannel; // 将msg 存储到buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); // 将buffer 的数据写入 通道 dest.write(buffer); System.out.println("转发给: " + dest.getRemoteAddress()); } } } public static void main(String[] args) { //创建服务器对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } }
2. 客户端代码:
package nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; public class GroupChatClient { /** * 服务器地址 */ private final String HOST = "127.0.0.1"; /** * 服务器端口 */ private final int PORT = 6667; private Selector selector; private SocketChannel socketChannel; private String username; public GroupChatClient() { try { selector = Selector.open(); // 连接服务器 socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT)); // 设置非阻塞 socketChannel.configureBlocking(false); // 将channel 注册到selector, 注册读取事件 socketChannel.register(selector, SelectionKey.OP_READ); // 得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok!"); } catch (Exception e) { e.printStackTrace(); } } public void sendMsg(String msg) { msg = username + " 说:" + msg; try { socketChannel.write(ByteBuffer.wrap(msg.getBytes())); } catch (IOException e) { e.printStackTrace(); } } private void readMsg() { try { int count = selector.select(); // 有可用的通道 if (count > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); // 开启一个buffer ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); //把读到的缓冲区的数据转成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } // 删除当前的selectionKey, 防止重复操作 iterator.remove(); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { GroupChatClient chatClient = new GroupChatClient(); // 启动一个线程, 每3秒,读取从服务器发送数据 new Thread() { public void run() { while (true) { chatClient.readMsg(); try { Thread.currentThread().sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendMsg(s); } } }
3.测试: 启动一个服务端,两个客户端
服务器端控制台如下:
监听线程: main /127.0.0.1:51500 上线 /127.0.0.1:51687 上线
(1) 客户端51500 发送消息
127.0.0.1:51500 is ok!
hello 中国
hello China
(2) 查看服务器端日志
监听线程: main /127.0.0.1:51500 上线 /127.0.0.1:51687 上线 form 客户端: 127.0.0.1:51500 说:hello 中国 服务器转发消息中... 转发给: /127.0.0.1:51687 form 客户端: 127.0.0.1:51500 说:hello China 服务器转发消息中... 转发给: /127.0.0.1:51687
(3) 查看51687 日志
127.0.0.1:51687 is ok!
127.0.0.1:51500 说:hello 中国
127.0.0.1:51500 说:hello China
补充: selectedKeys和keys 的区别:
(1) selectedKeys() : 向selected第一次注册的,获取后要删掉的,没连接、发送数据一次就会注入一个selectkey。也可以理解为这个是发生数据的SelectionKey 集合,处理完就要删掉。
(2) 当前连接的所有keys。可以理解为这个是 所有存在的SelectionKey 集合。
对应到SelectorImpl 如下属性:
private Set<SelectionKey> publicKeys; private Set<SelectionKey> publicSelectedKeys;
2.单聊实现
单聊其实就是在上面进行改造,将消息经服务器转发到特定的channel。我们改造一下,将用户信息存到本地的txt文件中,客户端需要先选择用户再编写消息
1. 服务器端代码
package nio; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Collections; import java.util.Iterator; import java.util.List; public class GroupChatServer { /** * Selector 选择器 */ private Selector selector; /** * ServerSocketChannel */ private ServerSocketChannel serverSocketChannel; /** * 端口 */ private static final int PORT = 6667; public GroupChatServer() { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); // 绑定端口 serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); // 设置非阻塞模式 serverSocketChannel.configureBlocking(false); // 注册连接请求事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 添加* 代表发送给所有用户 addUser("*"); } catch (IOException e) { e.printStackTrace(); } } /** * 监听 */ public void listen() { System.out.println("监听线程: " + Thread.currentThread().getName()); try { while (true) { int count = selector.select(); // 有事件处理 if (count > 0) { // 处理监听到的事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); SelectionKey key = null; while (iterator.hasNext()) { key = iterator.next(); // 连接请求事件 if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); // 设置非阻塞 socketChannel.configureBlocking(false); // 注册读取事件 socketChannel.register(selector, SelectionKey.OP_READ); // 提示 System.out.println(socketChannel.getRemoteAddress() + " 上线 "); // 加到保持姓名的集合 addUser(socketChannel.getRemoteAddress().toString()); } // 数据读取事件 if (key.isReadable()) { readData(key); } // 删除当前的key,防止重复处理 iterator.remove(); } } } } catch (Throwable e) { e.printStackTrace(); } } private void readData(SelectionKey key) { SocketChannel channel = null; try { channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int read = channel.read(byteBuffer); if (read > 0) { //把缓存区的数据转成字符串 String msg = new String(byteBuffer.array()); System.out.println("form 客户端: " + msg); //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理 sendInfoToOtherClients(msg, channel); } } catch (Exception e) { e.printStackTrace(); try { System.out.println(channel.getRemoteAddress() + " 离线了.."); //取消注册 key.cancel(); //关闭通道 channel.close(); // 从姓名集合删除 removeUser(channel.getRemoteAddress().toString()); } catch (IOException e2) { e2.printStackTrace(); } } } private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException { System.out.println("服务器转发消息中..."); for (SelectionKey key : selector.keys()) { // 通过 key 取出对应的 SocketChannel Channel targetChannel = key.channel(); // 排除掉自己 if (targetChannel instanceof SocketChannel && targetChannel != channel) { // 转型 SocketChannel dest = (SocketChannel) targetChannel; // 解析消息 sendUsername#@#msg String[] split = StringUtils.split(msg, "#@#"); String remoteAddress = dest.getRemoteAddress().toString(); String sendTarget = split[0]; if (!"*".equals(sendTarget) && !sendTarget.equals(remoteAddress)) { continue; } String sendMsg = split[1]; // 将msg 存储到buffer ByteBuffer buffer = ByteBuffer.wrap(sendMsg.getBytes()); // 将buffer 的数据写入 通道 dest.write(buffer); System.out.println("转发给: " + dest.getRemoteAddress()); } } } // 用户存到本地文件内部 public static void addUser(String username) { try { String encoding = "utf-8"; File file = new File("F:/user.txt"); if (!file.exists()) { file.createNewFile(); } List<String> list = IOUtils.readLines(new FileInputStream(file), encoding); if (!list.contains(username)) { list.add(username); } IOUtils.writeLines(list, null, new FileOutputStream(file)); } catch (Exception exception) { // ignore } } // 用户存到本地文件内部 public static void removeUser(String username) { try { String encoding = "utf-8"; File file = new File("F:/user.txt"); if (!file.exists()) { file.createNewFile(); } List<String> list = IOUtils.readLines(new FileInputStream(file), encoding); list.remove(username); IOUtils.writeLines(list, encoding, new FileOutputStream(file)); } catch (Exception exception) { // ignore } } public static List<String> listAllClient() { String encoding = "utf-8"; File file = new File("F:/user.txt"); try { return IOUtils.readLines(new FileInputStream(file), encoding); } catch (IOException e) { } return Collections.emptyList(); } public static void main(String[] args) { new GroupChatServer().listen(); } }
2. 客户端代码
package nio; import org.apache.commons.lang3.StringUtils; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.List; import java.util.Scanner; public class GroupChatClient { /** * 服务器地址 */ private final String HOST = "127.0.0.1"; /** * 服务器端口 */ private final int PORT = 6667; private Selector selector; private SocketChannel socketChannel; private String username; public GroupChatClient() { try { selector = Selector.open(); // 连接服务器 socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT)); // 设置非阻塞 socketChannel.configureBlocking(false); // 将channel 注册到selector, 注册读取事件 socketChannel.register(selector, SelectionKey.OP_READ); // 得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok!"); } catch (Exception e) { e.printStackTrace(); } } public void sendMsg(String msg, String target) { msg = username + ":" + msg; msg = target + "#@#" + msg; try { socketChannel.write(ByteBuffer.wrap(msg.getBytes())); } catch (IOException e) { e.printStackTrace(); } } private void readMsg() { try { int count = selector.select(); // 有可用的通道 if (count > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); // 开启一个buffer ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); //把读到的缓冲区的数据转成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } // 删除当前的selectionKey, 防止重复操作 iterator.remove(); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { GroupChatClient chatClient = new GroupChatClient(); // 启动一个线程, 每3秒,读取从服务器发送数据 new Thread() { public void run() { while (true) { chatClient.readMsg(); try { Thread.currentThread().sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (true) { List<String> users = GroupChatServer.listAllClient(); System.out.println("清选择发送的用户:(下标即可)"); System.out.println(StringUtils.join(users, " ")); int userIndex = scanner.nextInt(); if (userIndex >= users.size()) { System.out.println("输入有误"); continue; } System.out.println("您选择发送给: " + users.get(userIndex) + ", 请输入消息: "); String s = scanner.next(); chatClient.sendMsg(s, users.get(userIndex)); } } }
至此基于NIO基本简单的实现了群聊和单聊。