zoukankan      html  css  js  c++  java
  • 三种TCP协议聊天室实现

    一 概述

    • 使用Java的IO实现聊天室
    • 使用Java的NIO实现聊天室
    • 使用Netty实现聊天室

    二 IO聊天室

    1 服务器

    public class IOServer {
    
        public static void main(String[] args) throws IOException {
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(8899));
    
            ExecutorService executor = Executors.newCachedThreadPool();
            Set<Socket> socketGroup = new HashSet<>();
            while (true) {
                Socket socket = serverSocket.accept();
                socketGroup.add(socket);
    
                executor.execute(() -> {
                    try (
                            InputStream in = socket.getInputStream();
                            InputStreamReader reader = new InputStreamReader(in, "UTF-8");
                            BufferedReader br = new BufferedReader(reader);
                    ) {
                        String line;
                        while ((line = br.readLine()) != null) {
                            int port = socket.getPort();
                            System.out.println("from client:{" + port + "}" + line);
                            String finalLine = line;
                            for (Socket client : socketGroup) {
                                if (client == socket) continue;
                                try {
                                    OutputStream output = client.getOutputStream();
                                    DataOutputStream out = new DataOutputStream(output);
                                    String s = "client{" + port + "}" + finalLine + "
    ";
                                    out.write(s.getBytes());
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
    
    

    2 客户端

    public class IOClient {
    
        public static void main(String[] args) throws IOException {
            Socket socket = new Socket();
            InetSocketAddress address = new InetSocketAddress("localhost", 8899);
            socket.connect(address);
            try (
                    OutputStream output = socket.getOutputStream();
                    DataOutputStream out = new DataOutputStream(output);
                    Reader rd = new InputStreamReader(socket.getInputStream());
                    BufferedReader bufferRd = new BufferedReader(rd);
            ) {
                // 子线程监听输入并发送
                new Thread(() -> {
                    InputStreamReader in = new InputStreamReader(System.in);
                    BufferedReader reader = new BufferedReader(in);
                    while (true) {
                        try {
                            out.write((reader.readLine() + '
    ').getBytes());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
                // 主线程循环监听接受到的数据并输出
                while (true) {
                    System.out.println(bufferRd.readLine());
                }
            }
        }
    }
    

    三 NIO聊天室

    1 服务器

    public class NIOServer {
        public static void main(String[] args) throws IOException {
            ServerSocketChannel srvSocketChannel = ServerSocketChannel.open();
            srvSocketChannel.configureBlocking(false);
            ServerSocket socket = srvSocketChannel.socket();
            socket.setReuseAddress(true);
            socket.bind(new InetSocketAddress(8899));
    
            Selector selector = Selector.open();
            srvSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            Set<SocketChannel> channelGroup = new HashSet<>();
    
            while (selector.select() > 0) {
                Set<SelectionKey> keys = selector.selectedKeys();
    
                for (SelectionKey key : keys) {
                    SocketChannel client;
                    if (key.isAcceptable()) {
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        client = channel.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                        channelGroup.add(client);
                        System.out.println(client.getRemoteAddress());
                    } else if (key.isReadable()) {
                        client = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        client.read(buffer);
                        buffer.flip();
                        System.out.print(new String(buffer.array()));
                        channelGroup.forEach(channel -> {
                            buffer.rewind();
                            if (channel != client) {
                                try {
                                    int port = client.socket().getPort();
                                    byte[] array = buffer.array();
                                    String s = "client{" + port + "}:" + new String(array);
                                    channel.write(ByteBuffer.wrap(s.getBytes()));
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }
                    keys.remove(key);
                }
            }
        }
    }
    

    2 客户端

    public class NIOClient {
    
        public static void main(String[] args) throws IOException {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            InetSocketAddress address = new InetSocketAddress("localhost", 8899);
            socketChannel.connect(address);
    
            Selector selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
            while (selector.select() > 0) {
                Set<SelectionKey> keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    SocketChannel client;
                    if (key.isConnectable()) {
                        client = (SocketChannel) key.channel();
                        if (client.isConnectionPending()) {
                            client.finishConnect();
                            client.register(selector, SelectionKey.OP_READ);
                            new Thread(() -> {
                                InputStreamReader in = new InputStreamReader(System.in);
                                BufferedReader reader = new BufferedReader(in);
                                while (true) {
                                    try {
                                        String line = reader.readLine() + '
    ';
                                        client.write(ByteBuffer.wrap(line.getBytes()));
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
                            }).start();
                        }
                    } else if (key.isReadable()) {
                        client = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        client.read(byteBuffer);
                        byteBuffer.flip();
                        while (byteBuffer.hasRemaining()) {
                            System.out.print((char)byteBuffer.get());
                        }
                    }
                    keys.remove(key);
                }
            }
        }
    }
    

    四 Netty聊天室

    1 服务器

    • TCPServer.java
    public class TCPServer {
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss,worker).channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ServerChannelInitializer());
                ChannelFuture channelFuture = bootstrap.bind(8899).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    
    • ServerChannelInitializer.java
    public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new DelimiterBasedFrameDecoder(4096,Delimiters.lineDelimiter()));
            pipeline.addLast(new StringDecoder(UTF_8));
            pipeline.addLast(new StringEncoder(UTF_8));
            pipeline.addLast(new ServerHandler());
        }
    
    }
    
    • ServerHandler.java
    public class ServerHandler extends SimpleChannelInboundHandler<String> {
    
        private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            group.forEach(ch -> {
                ch.writeAndFlush(channel.remoteAddress() + " 上线" + "
    ");
            });
            group.add(channel);
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            group.forEach(ch -> {
                ch.writeAndFlush(ctx.channel().remoteAddress() + " 下线" + "
    ");
            });
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            Channel channel = ctx.channel();
            group.forEach(ch -> {
                if (ch != channel) {
                    ch.writeAndFlush(channel.remoteAddress() + ":" + msg + "
    ");
                } else {
                    ch.writeAndFlush("自己:" + msg + "
    ");
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
    }
    

    2 客户端

    • TCPClient.java
    public class TCPClient {
        public static void main(String[] args) throws InterruptedException, IOException {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ClientChannelInitializer());
                Channel channel = bootstrap
                                        .connect("localhost", 8899)
                                        .sync()
                                        .channel();
                InputStreamReader in = new InputStreamReader(System.in);
                BufferedReader reader = new BufferedReader(in);
                while (true) {
                    channel.writeAndFlush(reader.readLine() + "
    ");
                }
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    • ClientChannelInitializer.java
    public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
    
            pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
            pipeline.addLast(new StringDecoder(UTF_8));
            pipeline.addLast(new StringEncoder(UTF_8));
            pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                    System.out.println(msg);
                }
            });
        }
    }
    

    五 总结

    • Netty实现简单,逻辑清晰,但是隐藏了很多复杂的细节,后续的学习再慢慢剖析吧。
    • IO的线程模型,实现比较通俗易懂。
    • NIO的实现相对比较难懂,需要大家对Selector、Channel和Buffer有比较深刻的理解,不然很容易出错。

    注:NIO是Netty的基础,学好NIO对于Netty的学习有重要作用。

  • 相关阅读:
    我的友情链接
    我的友情链接
    我的友情链接
    我的友情链接
    我的友情链接
    我的友情链接
    使用kubespray在国内安装Kubernetes(1)
    docker国内镜像拉取和镜像加速registry-mirrors配置修改
    docker pull很慢解决办法
    Docker 国内仓库和镜像
  • 原文地址:https://www.cnblogs.com/linzhanfly/p/10171105.html
Copyright © 2011-2022 走看看