1.BIO编程模型
BIO模型:对每一个建立连接的客户端,服务端都要创建一个线程单独处理和这个客户的通信,典型的一请求一应答。
1.1 优化——伪异步IO编程模型
- 思路:使用线程池来管理服务器端所有可用线程,即通过一个线程池来处理多个客户端的请求接入。
- 好处:通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致服务器线程资源耗尽。
2. 多人聊天室功能概述及设计
- 功能概述:支持多人同时在线,每个用户的发言都被转发给其他在线用户
- 服务器端设计:
- 使用主线程做Acceptor,等待客户端连接,并为每一个客户分配一个Handle线程;
- 此外,服务器端需要使用容器存储目前在线的所有客户列表,以便转发消息给其他用户。
- Handle线程任务是,维护客户列表(添加 or 移除);接收用户消息,并转发给其他在线用户。
- 客户端设计:
- 要有两个线程,其中主线程与服务器建立连接,并接收来自服务器的消息;
- 另一个线程则用来处理用户的输入,并将消息发送到服务器。因为等待用户键盘输入时,是阻塞的,此时不能及时显示其他客户的消息。
3. 测试结果
4. 完整代码
4.1服务器
public class ChatServer { private int DEFAULT_PORT = 8888; private final String QUIT = "quit"; private ExecutorService executorService; // 线程池 private ServerSocket serverSocket; private Map<Integer, Writer> connectedClients; // 键:端口号;值:用于写入客户端的Writer public ChatServer() { executorService = Executors.newFixedThreadPool(2); // 创建固定数目的线程池 connectedClients = new HashMap<>(); } // 添加客户方法 public synchronized void addClient(Socket socket) throws IOException { if (socket != null) { int port = socket.getPort(); BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(socket.getOutputStream()) ); connectedClients.put(port, writer); // 要处理线程不安全的情况 System.out.println("客户端[" + port + "]已连接到服务器"); } } // 移除客户方法 public synchronized void removeClient(Socket socket) throws IOException { if (socket != null) { int port = socket.getPort(); if (connectedClients.containsKey(port)) { // 关闭外层流writer后,内层的socket也会自动关闭 connectedClients.get(port).close(); } connectedClients.remove(port); // 要处理线程不安全的情况 System.out.println("客户端[" + port + "]已断开连接"); } } // 转发消息方法 public synchronized void forwardMessage(Socket socket, String fwdMsg) throws IOException { for (Integer id : connectedClients.keySet()) { if (!id.equals(socket.getPort())) { Writer writer = connectedClients.get(id); writer.write(fwdMsg); writer.flush(); } } } public boolean readyToQuit(String msg) { return QUIT.equals(msg); } //关闭资源 public synchronized void close() { if (serverSocket != null) { try { serverSocket.close(); System.out.println("关闭serverSocket"); } catch (IOException e) { e.printStackTrace(); } } } // 服务器主线程任务:等待客户端连接,为其分配处理线程 public void start() { try { // 绑定监听端口 serverSocket = new ServerSocket(DEFAULT_PORT); System.out.println("启动服务器,监听端口:" + DEFAULT_PORT + "..."); while (true) { // 等待客户端连接 Socket socket = serverSocket.accept(); // bio模型为每个客户都创建一个ChatHandler线程 // new Thread(new ChatHandler(this,socket)).start(); // 优化:伪异步IO编程 executorService.execute(new ChatHandler(this,socket)); } } catch (IOException e) { e.printStackTrace(); } finally { close(); } } public static void main(String[] args) { ChatServer server = new ChatServer(); server.start(); } }
public class ChatHandler implements Runnable{ private ChatServer server; private Socket socket; public ChatHandler(ChatServer server, Socket socket) { this.server = server; this.socket = socket; } @Override public void run() { try { // 存储新上线用户 server.addClient(socket); // 读取用户发送的消息 BufferedReader reader = new BufferedReader( new InputStreamReader(socket.getInputStream()) ); String msg = null; // 如果客户端的IO流被关闭时,读取的就是null while ((msg = reader.readLine()) != null) { String fwdMsg = "客户端[" + socket.getPort() + "]:" + msg + " "; System.out.print(fwdMsg); // 把消息转发给聊天室在线的其他用户 server.forwardMessage(socket, fwdMsg); // 检查用户是否准备退出 if (server.readyToQuit(msg)) { break; } } } catch (IOException e) { e.printStackTrace(); } finally { try { server.removeClient(socket); } catch (IOException e) { e.printStackTrace(); } } } }
4.2客户端
public class ChatClient { private final String DEFAULT_SERVER_HOST = "127.0.0.1"; private final int DEFAULT_SERVER_PORT = 8888; private final String QUIT = "quit"; private Socket socket; private BufferedReader reader; private BufferedWriter writer; // 发送消息给服务器,让服务器转发给其他人 public void send(String msg) throws IOException { // 确定socket的输出流仍然是开放的状态 if (!socket.isOutputShutdown()) { writer.write(msg + " "); writer.flush(); } } // 从服务器接收消息 public String receive() throws IOException { String msg = null; if (!socket.isInputShutdown()) { msg = reader.readLine(); } return msg; } // 检查用户是否准备退出 public boolean readyToQuit(String msg) { return QUIT.equals(msg); } // 关闭资源 public void close() { if (writer != null) { try { System.out.println("关闭socket"); writer.close(); } catch (IOException e) { e.printStackTrace(); } } } public void start() { try { // 创建socket socket = new Socket(DEFAULT_SERVER_HOST,DEFAULT_SERVER_PORT); // 创建IO流 reader = new BufferedReader( new InputStreamReader(socket.getInputStream()) ); writer = new BufferedWriter( new OutputStreamWriter(socket.getOutputStream()) ); // 创建线程:处理用户的输入 new Thread(new UserinputHandler(this)).start(); // 主线程:时刻读取服务器转发的消息 String msg = null; // 不为null说明和服务器连接的流依然是有效的 while ((msg = receive()) != null) { System.out.println(msg); } } catch (IOException e) { e.printStackTrace(); }finally { close(); } } public static void main(String[] args) { ChatClient chatClient = new ChatClient(); chatClient.start(); } }
public class UserinputHandler implements Runnable{ private ChatClient chatClient; public UserinputHandler(ChatClient chatClient) { this.chatClient = chatClient; } @Override public void run() { try { // 等待用户输入消息 BufferedReader consoleReader = new BufferedReader( new InputStreamReader(System.in) ); while (true) { String input = consoleReader.readLine(); // 向服务器发送消息 chatClient.send(input); // 检查用户是否准备退出 if (chatClient.readyToQuit(input)) { break; } } } catch (IOException e) { e.printStackTrace(); } } }