zoukankan      html  css  js  c++  java
  • (八)AIO聊天室实战

    1.AIO模型分析

    • AsyncChannelGroup指的是,可以被多个异同通道所共享的资源群组。其中,最主要的资源有线程池。
    • 不指定的情况下,系统会使用一个默认的AsyncChannelGroup。
    • 在AIO编程模型中,高效率和方便是因为操作系统做了很多的事。
    • 在请求处理的过程中,并不是在主线程中完成的,而是通道组利用线程池资源,在不同的线程中完成异步处理。

    2.实验结果

     

     

    3.完整代码

       3.1服务器端

    public class ChatServer {
    
        private static final String LOCALHOST = "localhost";
        private static final int DEFAULT_PORT = 8888;
        private static final String QUIT = "quit";
        private static final int BUFFER = 1024;
        private static final int THREADPOOL_SIZE = 8;
    
        private AsynchronousChannelGroup channelGroup; // 自定义Group
        private AsynchronousServerSocketChannel serverChannel;
        private List<ClientHandler> connectedClients;
        private Charset charset = Charset.forName("UTF-8");
        private int port;
    
        public ChatServer() {
            this(DEFAULT_PORT);
        }
    
        public ChatServer(int port) {
            this.port = port;
            this.connectedClients = new ArrayList<>();
        }
        private boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
        private void close(Closeable closeable) {
            if (closeable != null) {
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // 获取客户端的端口号并打印出来
        private String getClientName(AsynchronousSocketChannel clientChannel) {
            int port = -1;
            try {
                InetSocketAddress remoteAddress = (InetSocketAddress) clientChannel.getRemoteAddress();
                port = remoteAddress.getPort();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return "客户端[" + port + "]";
        }
    
    
        // 添加和移除用户操作,记得用synchronized修饰
        private synchronized void addClient(ClientHandler clientHandler) {
            connectedClients.add(clientHandler);
            System.out.println(getClientName(clientHandler.getClientChannel()) + "已连接");
    
        }
        private synchronized void removeClient(ClientHandler clientHandler) {
            connectedClients.remove(clientHandler);
            AsynchronousSocketChannel clientChannel = clientHandler.getClientChannel();
            System.out.println(getClientName(clientChannel) + "已断开连接");
            close(clientChannel);
        }
    
        // 转发和接收
        private synchronized String receive(ByteBuffer buffer) {
            return String.valueOf(charset.decode(buffer));
        }
        private synchronized void forwardMessage(AsynchronousSocketChannel clientChannel, String fwdMsg) {
            for (ClientHandler connectedHandler : connectedClients) {
                AsynchronousSocketChannel client = connectedHandler.getClientChannel();
                if (!client.equals(clientChannel)) {
                    // 注意这个try-catch是自己加的,避免因写操作问题而引发服务器宕dang机
                    try {
                        // 将要转发的信息写入到缓冲区中
                        ByteBuffer buffer = charset.encode(getClientName(client) + ": " + fwdMsg);
    //                    ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
    //                    buffer.put(charset.encode(getClientName(client) + ": " + fwdMsg));
    //                    buffer.flip();
                        // 写给其他客户
                        client.write(buffer, null, connectedHandler);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        private void start() {
    
            try {
                // 创建自定义Group
                ExecutorService executorService = Executors.newFixedThreadPool(THREADPOOL_SIZE); // 创建固定数量的线程池
                channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
                // 创建AsynchronousServerSocketChannel,绑定服务端监听端口
                serverChannel = AsynchronousServerSocketChannel.open(channelGroup); // 传入自定义的Group
                serverChannel.bind(new InetSocketAddress(LOCALHOST, port));
                System.out.println("启动服务器,监听端口:" + port + "...");
    
                // 持续监听客户端的连接请求,如果不加循环,第一次异步调用accept后,还没等到回调函数,系统就退出了
                while (true) {
                    serverChannel.accept(null, new AcceptHandler()); // 使用AcceptHandler()的回调函数异步处理accept()
                    // 因为不想浪费资源,过于频繁的调用accept
                    // 希望通过AcceptHandler的回调函数来进一步持续监听从客户端发来的请求
                    System.in.read(); // 阻塞,等待System.in流上的输入数据
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                close(serverChannel);
            }
        }
    
        // CompletionHandler接口泛型的类型,第一个表示异步调用函数应该返回的类型,即accept()返回结果的类型
        // 第二个泛型类型指的是,attachment对象的类型
        private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object> {
    
            @Override
            public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
                if (serverChannel.isOpen()) {
                    serverChannel.accept(null, this); // 让服务器接着等待下一个客户端的连接
                }
                // 接收本次连接的客户端所发送的消息
                if (clientChannel != null && clientChannel.isOpen()) {
                    ClientHandler handler = new ClientHandler(clientChannel); // handler处理特定的clientChannel
                    // 通过Buffer实现和Channel之间的数据交互
                    ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
                    // 将新用户添加到在线用户列表
                    addClient(handler);
    
                    // 第一个buffer参数:读取channel数据写入buffer里; 第二个buffer参数:把buffer当做attachment
                    // read()返回的数据类型是Integer,表示从channel中读到了多少数据。
                    // 如果在回调函数想要得到具体的数据内容是什么,把buffer当做attachment,因为attachment会作为参数传递到回调函数
                    clientChannel.read(buffer,buffer, handler);
                }
            }
    
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("连接失败: " + exc.getMessage());
            }
        }
    
        // 处理在clientChannel上的异步读写操作的结果
        private class ClientHandler implements CompletionHandler<Integer,Object>{
            private AsynchronousSocketChannel clientChannel;
            public ClientHandler(AsynchronousSocketChannel clientChannel) {
                this.clientChannel = clientChannel;
            }
    
            public AsynchronousSocketChannel getClientChannel() {
                return clientChannel;
            }
    
            @Override
            public void completed(Integer result, Object attachment) {
                // 判断完成的是read操作还是write操作,如果是read操作,attachment对象就是包含客户端数据的buffer
                ByteBuffer buffer = (ByteBuffer) attachment;
                // 处理read()的异步调用
                if (buffer != null) {
                    if (result <= 0) {
                        // 没有读到有效数据,客户端异常
                        // 将客户移除在线列表(列表里存放的是每个clientChannel对应的ClientHandler)
                        removeClient(this);
                    }else {
                        buffer.flip(); // 写模式变为读模式
                        // buffer中是编码后的数据,把Byte解码成字符串
                        String fwdMsg = receive(buffer);
                        System.out.println(getClientName(clientChannel) + ": " + fwdMsg);
    
                        // 转发消息
                        forwardMessage(clientChannel, fwdMsg);
                        buffer.clear();
    
                        if (readyToQuit(fwdMsg)) {
                            removeClient(this);
                        }else {
                            clientChannel.read(buffer, buffer, this);
                        }
                    }
                }
            }
    
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("读写操作失败:" + exc.getMessage());
            }
        }
    
        public static void main(String[] args) {
            ChatServer server = new ChatServer();
            server.start();
        }
    }

       3.2客户端

    public class ChatClient {
        private static final String LOCAL_HOST = "localhost";
        private static final int DEFAULT_PORT = 8888;
        private static final String QUIT = "quit";
        private static final int BUFFER = 1024;
    
        private AsynchronousSocketChannel clientChannel;
        private Charset charset = Charset.forName("UTF-8");
    
        public boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
    
        public void close(Closeable closeable) {
            if (closeable != null) {
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        // 向服务器发送消息
        public void send(String msg) throws ExecutionException, InterruptedException {
            if (msg.isEmpty()) {
                return;
            }
            ByteBuffer buffer = charset.encode(msg);
            Future<Integer> writeResult = clientChannel.write(buffer);
            writeResult.get();
        }
    
        private void start() {
            try {
                // 创建异步通道channel,并发起连接请求
                clientChannel = AsynchronousSocketChannel.open();
                Future<Void> future = clientChannel.connect(new InetSocketAddress(LOCAL_HOST, DEFAULT_PORT));
    
                future.get();  // 阻塞式调用,直到建立连接再进行下一步操作
                System.out.println("已连接到服务器");
    
                // 创建线程处理用户输入
                new Thread(new UserInputHandler(this)).start();
    
                // 主线程中循环读取服务器转发过来的其他客户的消息
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
                while (true) {
                    Future<Integer> readResult = clientChannel.read(buffer);
                    int result = readResult.get(); // 阻塞式读取数据
                    if (result <= 0) {
                        // 发生异常,没有读取到数据
                        close(clientChannel);
                        System.out.println("与服务器连接异常");
                        System.exit(1); // 停止程序,否则quit后,会出现报错
                    }else {
                        // 正常打印消息
                        buffer.flip();
                        String msg = String.valueOf(charset.decode(buffer));
                        buffer.clear();
                        System.out.println(msg);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                close(clientChannel);
            }
        }
    
        public static void main(String[] args) {
            ChatClient client = new ChatClient();
            client.start();
        }
    }
    public class UserInputHandler implements Runnable {
        private ChatClient client;
    
        public UserInputHandler(ChatClient chatClient) {
            this.client = chatClient;
        }
    
        @Override
        public void run() {
            BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                try {
                    String msg = consoleReader.readLine();
                    client.send(msg);
                    if (client.readyToQuit(msg)) {
                        System.out.println("成功推出聊天室");
                        break;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    参考

    一站式学习Java网络编程 全面理解BIO_NIO_AIO,学习手记(八)

  • 相关阅读:
    LA 4123 (计数 递推) Glenbow Museum
    UVa 1640 (计数) The Counting Problem
    UVa 11361 (计数 递推) Investigating Div-Sum Property
    LA 3516 (计数 DP) Exploring Pyramids
    CodeForces Round #295 Div.2
    CodeForces Round #294 Div.2
    CodeForces Round #293 Div.2
    UVa 1648 (推公式) Business Center
    UVa 10868 (物理) Bungee Jumping
    UVa 10837 (欧拉函数 搜索) A Research Problem
  • 原文地址:https://www.cnblogs.com/HuangYJ/p/14457179.html
Copyright © 2011-2022 走看看