zoukankan      html  css  js  c++  java
  • (四)BIO聊天室实战

    1.BIO编程模型

    BIO模型:对每一个建立连接的客户端,服务端都要创建一个线程单独处理和这个客户的通信,典型的一请求一应答。

       1.1 优化——伪异步IO编程模型

    • 思路:使用线程池来管理服务器端所有可用线程,即通过一个线程池来处理多个客户端的请求接入。
    • 好处:通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致服务器线程资源耗尽。 

    2. 多人聊天室功能概述及设计

    • 功能概述:支持多人同时在线,每个用户的发言都被转发给其他在线用户
    • 服务器端设计:
      • 使用主线程做Acceptor,等待客户端连接,并为每一个客户分配一个Handle线程;
      • 此外,服务器端需要使用容器存储目前在线的所有客户列表,以便转发消息给其他用户。
      • Handle线程任务是,维护客户列表(添加 or 移除);接收用户消息,并转发给其他在线用户。
    • 客户端设计:
      • 要有两个线程,其中主线程与服务器建立连接,并接收来自服务器的消息;
      • 另一个线程则用来处理用户的输入,并将消息发送到服务器。因为等待用户键盘输入时,是阻塞的,此时不能及时显示其他客户的消息。

    3. 测试结果

    4. 完整代码

       4.1服务器

    public class ChatServer {
    
        private int DEFAULT_PORT = 8888;
        private final String QUIT = "quit";
    
        private ExecutorService executorService;  // 线程池
        private ServerSocket serverSocket;
        private Map<Integer, Writer> connectedClients; // 键:端口号;值:用于写入客户端的Writer
    
        public ChatServer() {
            executorService = Executors.newFixedThreadPool(2); // 创建固定数目的线程池
            connectedClients = new HashMap<>();
        }
    
        // 添加客户方法
        public synchronized void addClient(Socket socket) throws IOException {
            if (socket != null) {
                int port = socket.getPort();
                BufferedWriter writer = new BufferedWriter(
                        new OutputStreamWriter(socket.getOutputStream())
                );
                connectedClients.put(port, writer); // 要处理线程不安全的情况
                System.out.println("客户端[" + port + "]已连接到服务器");
            }
        }
    
        // 移除客户方法
        public synchronized void removeClient(Socket socket) throws IOException {
            if (socket != null) {
                int port = socket.getPort();
                if (connectedClients.containsKey(port)) {
                    // 关闭外层流writer后,内层的socket也会自动关闭
                    connectedClients.get(port).close();
                }
                connectedClients.remove(port); // 要处理线程不安全的情况
                System.out.println("客户端[" + port + "]已断开连接");
            }
        }
    
        // 转发消息方法
        public synchronized void forwardMessage(Socket socket, String fwdMsg) throws IOException {
            for (Integer id : connectedClients.keySet()) {
                if (!id.equals(socket.getPort())) {
                    Writer writer = connectedClients.get(id);
                    writer.write(fwdMsg);
                    writer.flush();
                }
            }
        }
    
        public boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
    
        //关闭资源
        public synchronized void close() {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                    System.out.println("关闭serverSocket");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        // 服务器主线程任务:等待客户端连接,为其分配处理线程
        public void start() {
            try {
                // 绑定监听端口
                serverSocket = new ServerSocket(DEFAULT_PORT);
                System.out.println("启动服务器,监听端口:" + DEFAULT_PORT + "...");
    
                while (true) {
                    // 等待客户端连接
                    Socket socket = serverSocket.accept();
    
                    // bio模型为每个客户都创建一个ChatHandler线程
    //                new Thread(new ChatHandler(this,socket)).start();
    
                    // 优化:伪异步IO编程
                    executorService.execute(new ChatHandler(this,socket));
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                close();
            }
        }
    
        public static void main(String[] args) {
            ChatServer server = new ChatServer();
            server.start();
        }
    }
    public class ChatHandler implements Runnable{
        private ChatServer server;
        private Socket socket;
    
        public ChatHandler(ChatServer server, Socket socket) {
            this.server = server;
            this.socket = socket;
        }
    
        @Override
        public void run() {
            try {
                // 存储新上线用户
                server.addClient(socket);
    
                // 读取用户发送的消息
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(socket.getInputStream())
                );
    
                String msg = null;
                // 如果客户端的IO流被关闭时,读取的就是null
                while ((msg = reader.readLine()) != null) {
                    String fwdMsg = "客户端[" + socket.getPort() + "]:" + msg + "
    ";
                    System.out.print(fwdMsg);
    
                    // 把消息转发给聊天室在线的其他用户
                    server.forwardMessage(socket, fwdMsg);
    
                    // 检查用户是否准备退出
                    if (server.readyToQuit(msg)) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    server.removeClient(socket);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

       4.2客户端

    public class ChatClient {
    
        private final String DEFAULT_SERVER_HOST = "127.0.0.1";
        private final int DEFAULT_SERVER_PORT = 8888;
        private final String QUIT = "quit";
    
        private Socket socket;
        private BufferedReader reader;
        private BufferedWriter writer;
    
        // 发送消息给服务器,让服务器转发给其他人
        public void send(String msg) throws IOException {
            // 确定socket的输出流仍然是开放的状态
            if (!socket.isOutputShutdown()) {
                writer.write(msg + "
    ");
                writer.flush();
            }
        }
    
        // 从服务器接收消息
        public String receive() throws IOException {
            String msg = null;
            if (!socket.isInputShutdown()) {
                msg = reader.readLine();
            }
            return msg;
        }
    
        // 检查用户是否准备退出
        public boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
    
        // 关闭资源
        public void close() {
            if (writer != null) {
                try {
                    System.out.println("关闭socket");
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public void start() {
            try {
                // 创建socket
                socket = new Socket(DEFAULT_SERVER_HOST,DEFAULT_SERVER_PORT);
                // 创建IO流
                reader = new BufferedReader(
                        new InputStreamReader(socket.getInputStream())
                );
                writer = new BufferedWriter(
                        new OutputStreamWriter(socket.getOutputStream())
                );
    
                // 创建线程:处理用户的输入
                new Thread(new UserinputHandler(this)).start();
    
                // 主线程:时刻读取服务器转发的消息
                String msg = null;
                // 不为null说明和服务器连接的流依然是有效的
                while ((msg = receive()) != null) {
                    System.out.println(msg);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                close();
            }
        }
    
        public static void main(String[] args) {
            ChatClient chatClient = new ChatClient();
            chatClient.start();
        }
    }
    public class UserinputHandler implements Runnable{
    
        private ChatClient chatClient;
        public UserinputHandler(ChatClient chatClient) {
            this.chatClient = chatClient;
        }
    
        @Override
        public void run() {
            try {
                // 等待用户输入消息
                BufferedReader consoleReader = new BufferedReader(
                        new InputStreamReader(System.in)
                );
                while (true) {
                    String input = consoleReader.readLine();
    
                    // 向服务器发送消息
                    chatClient.send(input);
    
                    // 检查用户是否准备退出
                    if (chatClient.readyToQuit(input)) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    参考

    一站式学习Java网络编程 全面理解BIO_NIO_AIO,学习手记(四)

  • 相关阅读:
    Deep learning:三十八(Stacked CNN简单介绍)
    机器学习&数据挖掘笔记_11(高斯过程回归)
    Deep learning:三十九(ICA模型练习)
    机器学习&数据挖掘笔记_9(一些svm基础知识)
    机器学习&数据挖掘笔记_10(高斯过程简单理解)
    本人部分博客导航(ing...)
    算法设计和数据结构学习_3(《数据结构和问题求解》part2笔记)
    Deep learning:三十七(Deep learning中的优化方法)
    C++笔记(2):《数据结构和问题求解》part1笔记
    算法设计和数据结构学习_4(《数据结构和问题求解》part4笔记)
  • 原文地址:https://www.cnblogs.com/HuangYJ/p/14457171.html
Copyright © 2011-2022 走看看