zoukankan      html  css  js  c++  java
  • (六)NIO聊天室实战

    1. NIO模型分析

    对于服务端,创建Selector,需要监听ServerSocketChannel的ACCEPT事件,监听注册的每一个SocketChannel的READ事件。

      • Selector监听ACCEPT事件,如果有客户发出连接请求,服务端要为该客户的通道在服务器Selector注册READ事件。
      • Selector监听READ事件,即监听所注册客户的SocketChannel上是否有可读的数据。

    对于客户端,也要创建Selector,这里监听的事件为SocketChannel的CONNECT事件 和 READ事件。

      • Selector监听CONNECT事件,直到连接完全建立后,为客户在selector上注册READ事件。
      • Selector监听READ事件,目的是监听服务器转发来的消息。

    2.实验结果

     

      

    3.完整代码

       3.1服务器

    public class ChatServer {
        private static final int DEFAULT_PORT = 8888;
        private static final String QUIT = "quit";
        private static final int BUFFER = 1024;
    
        private ServerSocketChannel server;
        private Selector selector;
        // 从ServerSocketChannel中读取客户端发来的消息。相对应,从客户角度,即要把数据写入该buffer
        private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
        // 实现消息转发时,把消息写入到其他客户的SocketChannel
        private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
        private Charset charset = Charset.forName("UTF-8");
        private int port; // 存储用户自定义的服务器端口
    
        public ChatServer() {
            this(DEFAULT_PORT);
        }
        public ChatServer(int port) {
            this.port = port;
        }
        private boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
        private void close(Closeable closeable) {
            if (closeable != null) {
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private String getClientName(SocketChannel client) {
            return "客户端[" + client.socket().getPort() + "]";
        }
    
        // 从客户通道上读取消息
        private String receive(SocketChannel client) throws IOException {
            rBuffer.clear(); // 因为rBuffer是类变量,每次调整指针,以清空消息
            // 写入到rBuffer,read()返回的是本次读到了多少字节
            while (client.read(rBuffer) > 0);
            rBuffer.flip(); // 写模式变为读模式
            return String.valueOf(charset.decode(rBuffer));
        }
    
        // 转发消息
        private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
            // 注意区分selectedKeys() 和 keys(),
            // 前者返回所有触发了的事件所对应的SelectionKey的集合,后者返回目前所有注册到selector上的SelectionKey的集合
            for (SelectionKey key : selector.keys()) {
                Channel connectedClient = key.channel();
                if (connectedClient instanceof ServerSocketChannel) {
                    continue;
                }
                if (key.isValid() && !client.equals(connectedClient)) {
                    wBuffer.clear();
                    wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg)); // 向wBuffer写数据
                    wBuffer.flip(); // 变为读模式
                    while (wBuffer.hasRemaining()) {
                        ((SocketChannel)connectedClient).write(wBuffer); // 疑问:为什么不用检测客户channel是否处于WRITE状态?
                    }
                }
            }
        }
    
        // 服务器需要处理两种事件,ACCEPT 和 READ
        // 疑问:为什么不用监听WRITE事件???
        private void handles(SelectionKey key) throws IOException {
            // ACCEPT事件 —— 和客户端建立了连接
            if (key.isAcceptable()) {
                // 获取ServerSocketChannel
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                // 获取对应客户端的SocketChannel
                SocketChannel client = server.accept();
                client.configureBlocking(false); // 由默认的阻塞式转换为非阻塞式
                // 把客户注册在Selector上,监听READ事件
                client.register(selector, SelectionKey.OP_READ);
                System.out.println(getClientName(client) + "已连接");
    
            }else if (key.isReadable()) {
                // READ事件 —— 客户端发送了消息
                SocketChannel client = (SocketChannel) key.channel();
                String fwdMsg = receive(client);
    
                if (fwdMsg.isEmpty()) {
                    // 客户端异常
                    key.cancel(); // 取消掉,使selector不要再监听这个通道的READ事件了
                    selector.wakeup(); // 通知selector,监听的状态发生了改变
                } else {
                    System.out.println(getClientName(client) + ":" + fwdMsg);
                    forwardMessage(client, fwdMsg);
    
                    // 检查用户是否退出
                    if (readyToQuit(fwdMsg)) {
                        key.cancel();
                        selector.wakeup();
                        System.out.println(getClientName(client) + "已断开");
                    }
                }
            }
        }
    
        private void start() {
            try {
                // 创建ServerSocketChannel,默认处于阻塞式调用
                server = ServerSocketChannel.open();
                // 确保ServerSocketChannel处于非阻塞式状态
                server.configureBlocking(false);
                // 把通道所关联的socket绑定监听端口
                server.socket().bind(new InetSocketAddress(port));
    
                selector = Selector.open();
                // 让selector对象监听创建ServerSocketChannel上的ACCEPT事件
                server.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("启动服务器,监听端口:" + port + "...");
    
                while (true) {
                    selector.select(); // select()函数的调用是阻塞式的,直到所监听的通道的事件触发了,才会返回
                    // selectionKeys中包含了一次调用select(),selector监听到的所触发所有事件的信息
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (SelectionKey key : selectionKeys) {
                        // 处理被触发的事件
                        handles(key);
                    }
                    selectionKeys.clear(); // 已经处理过的,要手动清空
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                close(selector); // 关闭selector之前,也自动关闭了注册到selector上的事件对应的通道
            }
        }
    
        public static void main(String[] args) {
            ChatServer chatServer = new ChatServer(7777);
            chatServer.start();
        }
    }

       3.2客户端

    public class ChatClient {
        private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
        private static final int DEFAULT_SERVER_PORT = 8888;
        private static final String QUIT = "quit";
        private static final int BUFFER = 1024;
    
        private String host;
        private int port;
        private SocketChannel client;
        private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
        private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
        private Selector selector;
        private Charset charset = Charset.forName("UTF-8");
    
        public ChatClient() {
            this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
        }
        public ChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
        private void close(Closeable closeable) {
            if (closeable != null) {
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void handles(SelectionKey key) throws IOException {
            // 处理CONNECT事件 - 连接就绪事件
            if (key.isConnectable()) {
                SocketChannel client = (SocketChannel) key.channel();
                if (client.isConnectionPending()) { // 如果为false,说明连接还不能完全建立,需要等待
                    client.finishConnect(); // 完成建立连接的过程
                    // 使用额外线程处理用户的输入
                    new Thread(new UserinputHandler(this)).start();
                }
                client.register(selector, SelectionKey.OP_READ);
            }else if (key.isReadable()) {
                // READ事件 - 服务器转发消息
                SocketChannel client = (SocketChannel) key.channel();
                String msg = receive(client);
                if (msg.isEmpty()) {
                    // 服务器异常
                    close(selector);
                }else {
                    System.out.println(msg);
                }
            }
    
        }
    
        private String receive(SocketChannel client) throws IOException {
            rBuffer.clear();
            // 写入到rBuffer,read()返回的是本次读到了多少字节
            while (client.read(rBuffer) > 0);
            rBuffer.flip();
            return String.valueOf(charset.decode(rBuffer));
        }
    
        // 发送消息给服务器,让服务器转发给其他人
        public void send(String msg) throws IOException {
            if (msg.isEmpty()) {
                return;
            }
            wBuffer.clear();
            wBuffer.put(charset.encode(msg));
            wBuffer.flip();
            while (wBuffer.hasRemaining()) {
                client.write(wBuffer);
            }
    
            // 检查用户是否退出
            if (readyToQuit(msg)) {
                close(selector);
            }
        }
    
        private void start() {
            try {
                client = SocketChannel.open();
                client.configureBlocking(false); // 改为非阻塞状态
    
                selector = Selector.open();
                // CONNECT: SocketChannel已经与服务器建立连接的状态
                client.register(selector, SelectionKey.OP_CONNECT);
                // 向服务器发送连接请求
                client.connect(new InetSocketAddress(host,port));
    
    
                while (true) {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (SelectionKey key : selectionKeys) {
                        handles(key);
                    }
                    selectionKeys.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClosedSelectorException e) {
                // 用户正常关闭
            } finally {
                close(selector);
            }
        }
    
        public static void main(String[] args) {
            ChatClient client = new ChatClient("127.0.0.1",7777);
            client.start();
        }
    }
    public class UserinputHandler implements Runnable{
    
        private ChatClient chatClient;
        public UserinputHandler(ChatClient chatClient) {
            this.chatClient = chatClient;
        }
    
        @Override
        public void run() {
            try {
                // 等待用户输入消息
                BufferedReader consoleReader = new BufferedReader(
                        new InputStreamReader(System.in)
                );
                while (true) {
                    String input = consoleReader.readLine();
    
                    // 向服务器发送消息
                    chatClient.send(input);
    
                    // 检查用户是否准备退出
                    if (chatClient.readyToQuit(input)) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    参考

    一站式学习Java网络编程 全面理解BIO_NIO_AIO,学习手记(六)

  • 相关阅读:
    指针
    Centos6.5 安装Vim7.4
    C++ Prime:指针和const
    C++ Prime:const的引用
    C++ Prime:函数
    C++ Prime:范围for语句
    python的oop概述
    脚本单独调用django模块
    xtrabackup备份之xbstream压缩
    MySQL8.0安装
  • 原文地址:https://www.cnblogs.com/HuangYJ/p/14457177.html
Copyright © 2011-2022 走看看