zoukankan      html  css  js  c++  java
  • IO、NIO实现简单聊天室,附带问题解析

      本篇文章主要使用IO和NIO的形式来实现一个简单的聊天室,并且说明IO方法存在的问题,而NIO又是如何解决的。

      大概的框架为,先提供思路和大概框架图——代码——问题及解决方式,这样会容易看一点。

    1. IO写法

    1.1 思路框架

      下面编写一个简单的聊天室,大概需要的功能就是服务端维护一个聊天室,里边的客户端发送消息之后服务将其消息转发给其他客户端,达到一个聊天室的效果。

      大致的思路:服务端区分职责,分成两部分,主线程负责接收连接并把连接放入到线程池中处理,维护一个线程池,所有对于socket的处理都交给线程池中的线程来处理。如下图。

    socket架构图

      下面贴上demo代码(代码中有几处为了方便并没有采用最规范的定义方式,如线程池的创建和Map初始化的时候未设置初始容量等)

      代码分五个类,服务端(ChatServer,监听作用,为服务端主线程)、客户端(ChatClient)、服务端处理器(ServerHandler,可以理解为线程池中要执行的事情)、客户端处理器(ClientHandler,客户端读写服务器消息的处理),工具类(SocketUtils,只有一个发送消息方法)。

    1.2 demo代码

    服务端:

    /**
     * 服务端启动类
     * 主要负责监听客户端连接
     */
    public class ChatServer {
    
        public static void main(String[] args) {
            ServerSocket serverSocket = null;
            /*----------为了方便使用Executors创建线程-------------*/
            ExecutorService handlerThreadPool = Executors.newFixedThreadPool(100);
            try {
                serverSocket = new ServerSocket(8888);
                while (true) {
                    System.out.println("-----------阻塞等待连接------------");
                    Socket socket = serverSocket.accept();
                    String key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
                    System.err.println(key + "已连接");
                    // 主线程只接收,处理直接交给处理线程池
                    handlerThreadPool.execute(new ServerHandler(socket));
                }
            } catch (IOException e) {
                e.printStackTrace();
                if (Objects.nonNull(serverSocket)) {
                    try {
                        serverSocket.close();
                    } catch (IOException ioException) {
                        ioException.printStackTrace();
                    }
                }
            }
        }
    
    }
    

    服务端处理类:

    
    /**
     * 服务端socket事件处理类
     * 负责处理对应socket中的读写操作
     */
    public class ServerHandler implements Runnable {
    
        /**
         * 连接到服务端的所有连接 socket的地址端口->socket
         */
        private static final Map<String, Socket> socketMap = new ConcurrentHashMap<>();
    
        /**
         * 维护名称和地址的map
         */
        private static final Map<String, String> nameMap = new ConcurrentHashMap<>();
    
        private Socket socket;
    
        /**
         * 每个socket的标识,使用地址+端口构成
         */
        private String key;
    
        public ServerHandler() {
        }
    
        public ServerHandler(Socket socket) {
            this.socket = socket;
            this.key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
        }
    
        @Override
        public void run() {
            Socket s = socket;
            // 根据消息执行不同操作
            InputStream inputStream;
            // debug查看数据用
            // Map<String, Socket> tmpMap = socketMap;
            try {
                inputStream = s.getInputStream();
                Scanner scanner = new Scanner(inputStream);
                while (true) {
                    String line = scanner.nextLine();
                    if (line.startsWith("register")) {
                        // 登记
                        String[] split = line.split(":");
                        String name = split[1];
                        String msg;
                        // 校验是否存在
                        if (socketMap.containsKey(key)) {
                            msg = "请勿重复登记";
                            sendMsg(s, msg);
                            return;
                        }
    
                        if (nameMap.containsValue(name)) {
                            msg = "名称已被登记,请换一个名称";
                            sendMsg(s, msg);
                            return;
                        }
    
                        // 通知自己已连接
                        sendMsg(s, "已连接到服务器");
    
                        msg = name + "进入聊天室";
                        // 将消息转发给其他客户端
                        sendMsgToClients(msg);
    
                        // 放入socket池
                        socketMap.put(key, s);
                        nameMap.put(key, name);
                        System.err.println(name + "已登记");
                    } else if (line.trim().equalsIgnoreCase("end")) {
                        if (notPassRegisterValidate()) {
                            continue;
                        }
    
                        // 断开连接
                        socketMap.remove(key);
                        String name = nameMap.get(key);
                        String msg = name + "离开聊天室";
                        System.err.println(msg);
                        // 将消息转发给其他客户端
                        sendMsgToClients(msg);
    
                        msg = "已断开连接";
                        // 发送给对应的连接断开信息
                        sendMsg(s, msg);
                        inputStream.close();
                        break;
                    } else {
                        if (notPassRegisterValidate()) {
                            continue;
                        }
    
                        // 正常通信
                        String name = nameMap.get(key);
                        String msg = name + ":" + line;
                        // 将消息转发给其他客户端
                        sendMsgToClients(msg);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 是否已登录校验
         *
         * @return 是否已登录
         */
        private boolean notPassRegisterValidate() {
            boolean hasRegister = nameMap.containsKey(key);
            if (hasRegister) {
                return false;
            }
    
            String msg = "您还未登录,请先登录";
            sendMsg(socket, msg);
            return true;
        }
    
        /**
         * 往连接发送消息
         *
         * @param socket 客户端连接
         * @param msg    消息
         */
        private void sendMsg(Socket socket, String msg) {
            SocketUtils.sendMsg(socket, msg);
            if (socket.isClosed()) {
                socketMap.remove(key);
            }
        }
    
        /**
         * 发送给其他客户端信息
         *
         * @param msg 信息
         */
        private void sendMsgToClients(String msg) {
            for (Map.Entry<String, Socket> entry : socketMap.entrySet()) {
                if (this.key.equals(entry.getKey())) {
                    continue;
                }
    
                sendMsg(entry.getValue(), msg);
            }
        }
    
    }
    

    工具类(一个发送消息的方法):

    public class SocketUtils {
    
        private SocketUtils() {
        }
    
        public static void sendMsg(Socket socket, String msg) {
            Socket s = socket;
            OutputStream outputStream = null;
            msg += "
    ";
            try {
                outputStream = s.getOutputStream();
                outputStream.write(msg.getBytes(StandardCharsets.UTF_8));
                outputStream.flush();
            } catch (IOException e) {
                System.err.println("发送消息失败, 连接已断开");
                try {
                    if (Objects.nonNull(outputStream)) {
                        outputStream.close();
                    }
                    socket.close();
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
    
            }
        }
    
    }
    

    客户端:

    /**
     * 客户端读和写各自使用一个线程
     */
    public class ChatClient {
    
        public static void main(String[] args) {
            Socket socket;
            ExecutorService clientHandlerPool = Executors.newFixedThreadPool(2);
            try {
                socket = new Socket("localhost", 8888);
    
                // 写线程
                clientHandlerPool.execute(new ClientHandler(socket, 1));
                // 读线程
                clientHandlerPool.execute(new ClientHandler(socket, 0));
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    客户端处理器:

    /**
     * 客户端处理器
     * 根据type来区分是做读工作还是写工作
     */
    public class ClientHandler implements Runnable {
    
        private Socket socket;
    
        /**
         * 处理类型,0-读、1-写
         */
        private int type;
    
        public ClientHandler() {
            throw new IllegalArgumentException("不能使用没有参数的构造函数");
        }
    
        public ClientHandler(Socket socket, int type) {
            this.socket = socket;
            this.type = type;
        }
    
        @Override
        public void run() {
            if (type == 1) {
                // 进行写操作
                doWriteJob();
                return;
            }
    
            // 默认读操作
            doReadJob();
        }
    
        /**
         * 读操作
         */
        private void doReadJob() {
            Socket s = socket;
            InputStream inputStream;
            try {
                inputStream = s.getInputStream();
                Scanner scanner = new Scanner(inputStream);
                while (true) {
                    String line = scanner.nextLine();
                    if (null != line && !"".equals(line)) {
                        System.err.println(line);
                    }
                    // 如果已退出了,那么关闭连接
                    if ("已断开连接".equals(line)) {
                        socket.close();
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    socket.close();
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
            }
        }
    
        /**
         * 写线程
         */
        private void doWriteJob() {
            Socket s = socket;
            try {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String output = scanner.nextLine();
                    if (Objects.nonNull(output) && !"".equals(output)) {
                        SocketUtils.sendMsg(s, output);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("错误发生了:" + e.getMessage());
            }
        }
    }
    

    结果:

    IO结果图

    思考:当前这样实现有什么瓶颈,可能会出现什么问题?

    存在问题:

    1. 服务端使用accept阻塞接收线程,连接一个一个处理,在高并发下处理性能缓慢
    2. 没有连接的时候线程一直处于阻塞状态造成资源的浪费(如果使用多线程接收处理并发,那么没连接的时候造成多个线程的资源浪费)。

    2. 使用NIO实现聊天室

    2.1 整体思路

      那我们来看下NIO是怎么解决上方的问题的,首先上这个demo整体的架构图。

    NIO架构图

      大概的逻辑为

    1. 服务端将ServerSocketChannel注册到Selector中,客户端连接进来的时候事件触发,将客户端的连接注册到selector中。
    2. 主线程负责selector的轮询工作,发现有事件可以处理就将其交给线程池
    3. 客户端同理分成两个部分,写操作和读操作,每个操作由一个线程单独完成;但是如果读操作处理使用while循环不断轮询等待接收的话,CPU会飙升,所以需要客户端新建一个selector来解决这个问题,注意这个selector跟服务端不是同一个,没有啥关系。

      代码分类大致跟IO写法一样,分成服务端、服务端处理器、客户端、客户端处理器,下面为demo。

    2.2 代码

    服务端:

    public class ChatServer {
    
        private Selector selector;
    
        private ServerSocketChannel serverSocketChannel;
    
        private static final ExecutorService handlerPool = Executors.newFixedThreadPool(100);
    
        public ChatServer() throws IOException {
            this.selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress(9999));
            // 将服务端的socket注册到selector中,接收客户端,并将其注册到selector中,其本身也是selector中的一个I/O事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.err.println("聊天室服务端初始化结束");
        }
    
        /**
         * 启动方法
         * 1.监听,拿到之后进行处理
         */
        public void start() throws IOException {
            int count;
            while (true) {
                // 可能出现select方法没阻塞,空轮询导致死循环的情况
                count = selector.select();
    
                if (count > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        // 交给线程池处理
                        handlerPool.execute(new ServerHandler(key, selector));
                        // 处理完成后移除
                        iterator.remove();
                    }
                }
            }
        }
    
        public static void main(String[] args) throws IOException {
            new ChatServer().start();
        }
    }
    

    服务端处理器:

    
    public class ServerHandler implements Runnable {
    
        private SelectionKey key;
    
        private Selector selector;
    
        public ServerHandler() {
    
        }
    
        /**
         * 本来可以通过key拿到selector,这里为了图方便就这样写了
         */
        public ServerHandler(SelectionKey key, Selector selector) {
            this.key = key;
            this.selector = selector;
        }
    
        @Override
        public void run() {
            try {
                if (key.isAcceptable()) {
                    // 说明是服务端的事件,注意这里强转换为的是ServerSocketChannel
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    // 接收连接
                    SocketChannel socket = channel.accept();
                    if (Objects.isNull(socket)) {
                        return;
                    }
    
                    socket.configureBlocking(false);
                    // 接收客户端的socket并且将其注册到服务端这边的selector中,注意客户端在此时跟服务端selector产生关联
                    socket.register(selector, SelectionKey.OP_READ);
                    System.err.println("服务端已接收连接");
                } else if (key.isReadable()) {
                    // 客户端发送信息过来了
                    doReadJob();
                }
            } catch (IOException e) {
                e.printStackTrace();
                // 错误处理
            }
        }
    
        /**
         * 读取操作
         */
        private void doReadJob() throws IOException {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int readCount = socketChannel.read(buffer);
            if (readCount > 0) {
                String msg = new String(buffer.array(), StandardCharsets.UTF_8);
                System.err.println(socketChannel.getRemoteAddress().toString() + "的信息为:" + msg);
    
                // 转发给其他客户端
                sendMsgToOtherClients(msg);
            }
        }
    
        /**
         * 转发消息给其他客户端
         *
         * @param msg 消息
         */
        private void sendMsgToOtherClients(String msg) throws IOException {
    
            SocketChannel self = (SocketChannel) key.channel();
    
            Set<SelectionKey> keys = selector.keys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                SelectableChannel channel = selectionKey.channel();
                // 如果是本身或者不是socketChannel类型则跳过
                if (self.equals(channel) || channel instanceof ServerSocketChannel) {
                    continue;
                }
    
                SocketChannel socketChannel = (SocketChannel) channel;
                ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                socketChannel.write(byteBuffer);
            }
        }
    }
    

    客户端:

    
    public class ChatClient {
    
        private Selector selector;
    
        private SocketChannel socketChannel;
    
        private static ExecutorService dealPool = Executors.newFixedThreadPool(2);
    
        public ChatClient() throws IOException {
    
            /*
             * 说明一下:
             * 客户端这边的selector跟刚才在服务端定义的selector是不同的两个selector
             * 客户端这边不需要selector也能实现功能,但是读取的时候必须不断的循环,会导致CPU飙升,
             * 所以使用selector是为了解决这个问题的,别跟服务端的selector搞混就好
             */
            selector = Selector.open();
            socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999));
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        }
    
    
        public void start() throws IOException, InterruptedException {
            // 连接
    //        socketChannel.connect(new InetSocketAddress("localhost", 9999));
            while (!socketChannel.finishConnect()) {
                System.err.println("正在连接...");
                TimeUnit.MILLISECONDS.sleep(200);
            }
    
            System.err.println("连接成功");
    
            // 使用两个线程来分别处理读取和写操作
            // 写数据
            dealPool.execute(new ClientHandler(selector, socketChannel, 1));
    
            // 读取数据
            dealPool.execute(new ClientHandler(selector, socketChannel, 0));
        }
    
    
        public static void main(String[] args) throws IOException, InterruptedException {
            new ChatClient().start();
        }
    }
    

    客户端处理器:

    
    public class ClientHandler implements Runnable {
    
        private Selector selector;
    
        private SocketChannel socketChannel;
    
        /**
         * 0-读,1-写
         */
        private int type;
    
        public ClientHandler() {
        }
    
        public ClientHandler(Selector selector, SocketChannel socketChannel, int type) {
            // selector是为了解决读时候CPU飙升的问题,具体见客户端的启动类代码注释
            this.selector = selector;
            this.socketChannel = socketChannel;
            this.type = type;
        }
    
        @Override
        public void run() {
            try {
                if (type == 0) {
                    doClientReadJob();
                    return;
                }
    
                doClientWriteJob();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 写操作
         */
        private void doClientWriteJob() throws IOException {
            SocketChannel sc = socketChannel;
            Scanner scanner = new Scanner(System.in);
            while (true) {
                if (scanner.hasNextLine()) {
                    String line = scanner.nextLine();
                    if (null != line && !"".equals(line)) {
                        ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
                        sc.write(buffer);
                    }
                }
            }
        }
    
        /**
         * 读操作
         */
        private void doClientReadJob() throws IOException {
            SocketChannel sc = socketChannel;
            ByteBuffer buf = ByteBuffer.allocate(1024);
            while (true) {
                int select = selector.select();
                if (select > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        // 这是必须的,不然下方的remove会出错
                        SelectionKey next = iterator.next();
                        // 这里因为只有本身这个客户端注册到客户端的selector中,所以有事件一定是它的,也就不用从key拿了,直接操作就行
                        buf.clear();
                        int read = sc.read(buf);
                        if (read > 0) {
                            String msg = new String(buf.array(), StandardCharsets.UTF_8);
                            System.err.println(msg);
                        }
                        // 事件处理完之后要移除这个key,否则的话selector.select()方法不会再读到这个key,即便有新的时间到这个channel来
                        iterator.remove();
                    }
                }
            }
        }
    
    }
    

      结果图:

    NIO结果图

    在编写的过程中发现了以下两点:

    1. select方法之后如果存在key,并且接下来的操作未对这个selectionKeyremove操作,那么下次的select不会再将其选入,即便有事件发生,也就是说,select方法不会选择之前已经选过的key。
    2. selector.select()方法中偶尔会出现不阻塞的情况。这就是NIO中的空轮询bug,也就是说,没有连接又不阻塞的话,while(true) ... 的写法就是一个死循环,会导致CPU飙升。

      第二点问题在NIO框架(如netty)中都采用了比较好的解决方法,可以去查下如何解决的。接下来看下NIO的写法是否解决了IO写法中存在的问题:

    1. 服务端使用accept阻塞接收线程,连接一个一个处理,在高并发下处理性能缓慢。

      答:上述写法中还是使用一个ServerSocketChannel来接收客户端,没有解决这个问题;但是可以通过使用线程池的方式来解决。也就是说将服务端的事件分成两个部分,第一个部分为接收客户端,使用一个线程池来维护;第二个部分为客户端的事件处理操作,也维护一个线程池来执行这些事件。

        这样性能上去了,由于selector的存在也不会出现资源浪费的事情netty就是这么做的哦。

    2. 没有连接的时候线程一直处于阻塞状态造成资源的浪费(如果使用多线程接收处理并发,那么没连接的时候造成多个线程的资源浪费)。

      答:解决。NIO写法主要有selector不断轮询,不会出现没连接不作为的情况,而且多个连接的话也没有问题(参考1的回答)。

    3. 小结

      两种写法都有Reactor模式的影子,但是IO写法有明显的缺点就是如果没有连接会造成资源浪费的问题(采用多个接收连接的话更甚),而NIO中selector轮询机制就很好的解决了无连接时无作为的情况,并且在性能方面可以通过职责分类和线程池来得到改善,所以,NIO,永远滴神。

    需要压力,需要努力。

  • 相关阅读:
    Linux日志文件utmp、wtmp、lastlog、messages
    Linux日志五大命令详解
    php 函数合并 array_merge 与 + 的区别
    MySQL对数据表进行分组查询(GROUP BY)
    如何在mysql中查询每个分组的前几名
    Autojump:一个可以在 Linux 文件系统快速导航的高级 cd 命令
    linux 查看磁盘空间大小
    js刷新页面方法大全
    [知乎有感] 读研到底为了什么,值不值得?
    [Hadoop] 在Ubuntu系统上一步步搭建Hadoop(单机模式)
  • 原文地址:https://www.cnblogs.com/zhangweicheng/p/13358026.html
Copyright © 2011-2022 走看看