zoukankan      html  css  js  c++  java
  • 三种TCP协议聊天室实现

    一 概述

    • 使用Java的IO实现聊天室
    • 使用Java的NIO实现聊天室
    • 使用Netty实现聊天室

    二 IO聊天室

    1 服务器

    public class IOServer {
    
        public static void main(String[] args) throws IOException {
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(8899));
    
            ExecutorService executor = Executors.newCachedThreadPool();
            Set<Socket> socketGroup = new HashSet<>();
            while (true) {
                Socket socket = serverSocket.accept();
                socketGroup.add(socket);
    
                executor.execute(() -> {
                    try (
                            InputStream in = socket.getInputStream();
                            InputStreamReader reader = new InputStreamReader(in, "UTF-8");
                            BufferedReader br = new BufferedReader(reader);
                    ) {
                        String line;
                        while ((line = br.readLine()) != null) {
                            int port = socket.getPort();
                            System.out.println("from client:{" + port + "}" + line);
                            String finalLine = line;
                            for (Socket client : socketGroup) {
                                if (client == socket) continue;
                                try {
                                    OutputStream output = client.getOutputStream();
                                    DataOutputStream out = new DataOutputStream(output);
                                    String s = "client{" + port + "}" + finalLine + "
    ";
                                    out.write(s.getBytes());
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
    
    

    2 客户端

    public class IOClient {
    
        public static void main(String[] args) throws IOException {
            Socket socket = new Socket();
            InetSocketAddress address = new InetSocketAddress("localhost", 8899);
            socket.connect(address);
            try (
                    OutputStream output = socket.getOutputStream();
                    DataOutputStream out = new DataOutputStream(output);
                    Reader rd = new InputStreamReader(socket.getInputStream());
                    BufferedReader bufferRd = new BufferedReader(rd);
            ) {
                // 子线程监听输入并发送
                new Thread(() -> {
                    InputStreamReader in = new InputStreamReader(System.in);
                    BufferedReader reader = new BufferedReader(in);
                    while (true) {
                        try {
                            out.write((reader.readLine() + '
    ').getBytes());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
                // 主线程循环监听接受到的数据并输出
                while (true) {
                    System.out.println(bufferRd.readLine());
                }
            }
        }
    }
    

    三 NIO聊天室

    1 服务器

    public class NIOServer {
        public static void main(String[] args) throws IOException {
            ServerSocketChannel srvSocketChannel = ServerSocketChannel.open();
            srvSocketChannel.configureBlocking(false);
            ServerSocket socket = srvSocketChannel.socket();
            socket.setReuseAddress(true);
            socket.bind(new InetSocketAddress(8899));
    
            Selector selector = Selector.open();
            srvSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            Set<SocketChannel> channelGroup = new HashSet<>();
    
            while (selector.select() > 0) {
                Set<SelectionKey> keys = selector.selectedKeys();
    
                for (SelectionKey key : keys) {
                    SocketChannel client;
                    if (key.isAcceptable()) {
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        client = channel.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                        channelGroup.add(client);
                        System.out.println(client.getRemoteAddress());
                    } else if (key.isReadable()) {
                        client = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        client.read(buffer);
                        buffer.flip();
                        System.out.print(new String(buffer.array()));
                        channelGroup.forEach(channel -> {
                            buffer.rewind();
                            if (channel != client) {
                                try {
                                    int port = client.socket().getPort();
                                    byte[] array = buffer.array();
                                    String s = "client{" + port + "}:" + new String(array);
                                    channel.write(ByteBuffer.wrap(s.getBytes()));
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }
                    keys.remove(key);
                }
            }
        }
    }
    

    2 客户端

    public class NIOClient {
    
        public static void main(String[] args) throws IOException {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            InetSocketAddress address = new InetSocketAddress("localhost", 8899);
            socketChannel.connect(address);
    
            Selector selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
            while (selector.select() > 0) {
                Set<SelectionKey> keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    SocketChannel client;
                    if (key.isConnectable()) {
                        client = (SocketChannel) key.channel();
                        if (client.isConnectionPending()) {
                            client.finishConnect();
                            client.register(selector, SelectionKey.OP_READ);
                            new Thread(() -> {
                                InputStreamReader in = new InputStreamReader(System.in);
                                BufferedReader reader = new BufferedReader(in);
                                while (true) {
                                    try {
                                        String line = reader.readLine() + '
    ';
                                        client.write(ByteBuffer.wrap(line.getBytes()));
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
                            }).start();
                        }
                    } else if (key.isReadable()) {
                        client = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        client.read(byteBuffer);
                        byteBuffer.flip();
                        while (byteBuffer.hasRemaining()) {
                            System.out.print((char)byteBuffer.get());
                        }
                    }
                    keys.remove(key);
                }
            }
        }
    }
    

    四 Netty聊天室

    1 服务器

    • TCPServer.java
    public class TCPServer {
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss,worker).channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ServerChannelInitializer());
                ChannelFuture channelFuture = bootstrap.bind(8899).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    
    • ServerChannelInitializer.java
    public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new DelimiterBasedFrameDecoder(4096,Delimiters.lineDelimiter()));
            pipeline.addLast(new StringDecoder(UTF_8));
            pipeline.addLast(new StringEncoder(UTF_8));
            pipeline.addLast(new ServerHandler());
        }
    
    }
    
    • ServerHandler.java
    public class ServerHandler extends SimpleChannelInboundHandler<String> {
    
        private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            group.forEach(ch -> {
                ch.writeAndFlush(channel.remoteAddress() + " 上线" + "
    ");
            });
            group.add(channel);
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            group.forEach(ch -> {
                ch.writeAndFlush(ctx.channel().remoteAddress() + " 下线" + "
    ");
            });
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            Channel channel = ctx.channel();
            group.forEach(ch -> {
                if (ch != channel) {
                    ch.writeAndFlush(channel.remoteAddress() + ":" + msg + "
    ");
                } else {
                    ch.writeAndFlush("自己:" + msg + "
    ");
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
    }
    

    2 客户端

    • TCPClient.java
    public class TCPClient {
        public static void main(String[] args) throws InterruptedException, IOException {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ClientChannelInitializer());
                Channel channel = bootstrap
                                        .connect("localhost", 8899)
                                        .sync()
                                        .channel();
                InputStreamReader in = new InputStreamReader(System.in);
                BufferedReader reader = new BufferedReader(in);
                while (true) {
                    channel.writeAndFlush(reader.readLine() + "
    ");
                }
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    • ClientChannelInitializer.java
    public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
    
            pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
            pipeline.addLast(new StringDecoder(UTF_8));
            pipeline.addLast(new StringEncoder(UTF_8));
            pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                    System.out.println(msg);
                }
            });
        }
    }
    

    五 总结

    • Netty实现简单,逻辑清晰,但是隐藏了很多复杂的细节,后续的学习再慢慢剖析吧。
    • IO的线程模型,实现比较通俗易懂。
    • NIO的实现相对比较难懂,需要大家对Selector、Channel和Buffer有比较深刻的理解,不然很容易出错。

    注:NIO是Netty的基础,学好NIO对于Netty的学习有重要作用。

  • 相关阅读:
    5.19 省选模拟赛 T1 小B的棋盘 双指针 性质
    5.15 省选模拟赛 容斥 生成函数 dp
    5.15 省选模拟赛 T1 点分治 FFT
    5.15 牛客挑战赛40 B 小V的序列 关于随机均摊分析 二进制
    luogu P4929 【模板】舞蹈链 DLX
    CF 878E Numbers on the blackboard 并查集 离线 贪心
    5.10 省选模拟赛 拍卖 博弈 dp
    5.12 省选模拟赛 T2 贪心 dp 搜索 差分
    5.10 省选模拟赛 tree 树形dp 逆元
    luogu P6088 [JSOI2015]字符串树 可持久化trie 线段树合并 树链剖分 trie树
  • 原文地址:https://www.cnblogs.com/linzhanfly/p/10171105.html
Copyright © 2011-2022 走看看