zoukankan      html  css  js  c++  java
  • 03_netty实现聊天室功能

    【概述】

    聊天室主要由两块组成:聊天服务器端(ChatRoomServer)和聊天客户端(ChatClient)。

    [ 聊天服务器(ChatRoomServer)功能概述 ]  

    1.监听所有客户端的接入、断线

    2.有客户端A接入聊天室时,将接入消息发给除了客户端A的其他客户端

    3.当客户端A退出聊天室时,将退出消息发给除了客户端A的其他客户端

    4.当客户端A发送消息到聊天室时,将消息转发给除了客户端A的其他客户端

    [ 聊天客户端(ChatClient)功能概述 ]

    1.发送消息至聊天服务器

    2.接收聊天服务器发送过来的所有消息

    【聊天服务端 ChatRoomServer】

    /**
     * 聊天室服务端
     */
    public class ChatRoomServer {
    
        private final int port ;
    
    
        public ChatRoomServer(int port) {
            this.port = port;
        }
    
        public void start(){
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try{
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss,worker)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChatServerInitialize())
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .option(ChannelOption.SO_KEEPALIVE, true);
                ChannelFuture future = bootstrap.bind(port).sync();
                future.channel().closeFuture().sync();
    
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            new ChatRoomServer(9999).start();  //服务端监听本地的9999端口
        }
    }

    【ChatServerInitialize】

    public class ChatServerInitialize extends ChannelInitializer<SocketChannel>{
    
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            System.out.println("用户【"+channel.remoteAddress()+"】接入聊天室......");
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast("framer",new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast("decoder",new StringDecoder());
            pipeline.addLast("encoder",new StringEncoder());
            pipeline.addLast("handler",new ChatServerHandler());
        }
    }

    【ChatServerHandler】

    /**
     * 聊天服务器对各种情况的处理
     */
    public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        /**
         * 当从服务端收到新的客户端连接时
         * 客户端的 Channel 存入 channels 列表中,并通知列表中的其他客户端 Channel
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel clientChannel = ctx.channel();
            channels.add(clientChannel);
            for (Channel ch : channels) {
                if (ch != clientChannel) {  //通知除了自己以外的其他用户
                    ch.writeAndFlush("【提示】:用户【" + clientChannel.remoteAddress() + "】进入聊天室...
    ");
                }
            }
        }
    
    
        /**
         * 每当从服务端收到客户端断开时
         * 客户端的 Channel 自动从 channels 列表中移除了,并通知列表中的其他客户端 Channel
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel clientChannel = ctx.channel();
            channels.remove(clientChannel);
            for (Channel ch : channels) {
                if (ch != clientChannel) {  //通知除了自己以外的其他用户
                    ch.writeAndFlush("【提示】:用户【" + clientChannel.remoteAddress() + "】退出聊天室...
    ");
                }
            }
        }
    
        /**
         * 接受到客户端发出的消息
         * 判断channel是否是
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            Channel clientChannel = ctx.channel();
            for (Channel ch : channels) {
                if (ch != clientChannel) {
                    ch.writeAndFlush("用户【" + clientChannel.remoteAddress() + "】说:" + msg + "
    ");
                } else {
                    ch.writeAndFlush("【我】说:" + msg + "
    ");
                }
            }
        }
    
        /**
         * 服务端监听到客户端活动
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel clientChannel = ctx.channel();
            System.out.println("用户【"+clientChannel.remoteAddress()+"】在线中...");
        }
    
        /**
         * 服务端监听到客户端 不活动
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel clientChannel = ctx.channel();
            System.out.println("用户【 " +clientChannel.remoteAddress()+"】:离线了");
    
        }
    }

    【ChatClient 聊天客户端】

    /**
     * 聊天客户端
     */
    public class ChatClient {
    
        private final String host;
    
        private final int port;
    
        public ChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public void start() {
            EventLoopGroup worker = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
    
            try{
                bootstrap.group(worker)
                        .channel(NioSocketChannel.class)
                        .handler(new ClientInitializer());
                Channel channel  = bootstrap.connect(host,port).sync().channel();
                //客户端从键盘输入数据
                BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
                while(true){
                    channel.writeAndFlush(input.readLine()+"
    ");
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                worker.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            new ChatClient("127.0.0.1",9999).start(); //连接服务器端
        }
    
    }

    【ChatClientInitializer 】

    public class ChatClientInitializer extends ChannelInitializer<SocketChannel>{
    
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            //当有客户端连接服务器时,netty会调用这个初始化器的 initChannel方法
            System.out.println("客户端开始初始化......");
    
            ChannelPipeline pipeline = socketChannel.pipeline();
    
            pipeline.addLast("framer",new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast("decoder",new StringDecoder());
            pipeline.addLast("encoder",new StringEncoder());
            pipeline.addLast("handler",new ChatClientHandler());
        }
    }

    【ChatClientHandler】

    public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    
        /**
         * 打印服务端发送过来的数据
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            System.out.println(s);
        }
    }

    【运行结果】

     [1.启动聊天服务器]

    [2.启动一个客户端A]

     [3.再启动一个客户端B]

    [4.客户端A发送消息]

    [5.客户端A关闭]

     

  • 相关阅读:
    【专利自助申请指引 ● 第1章. 申请流程介绍 ● 1.2.12 答复审查意见】
    【专利自助申请指引 ● 第1章. 申请流程介绍 ● 1.2.11 主动提出修改或补正(可选)】
    【专利自助申请指引 ● 第1章. 申请流程介绍 ● 1.2.10 缴纳审查过程中的费用】
    【专利自助申请指引 ● 第1章. 申请流程介绍 ● 1.2.9 申请专利优先审查(可选)】
    【专利自助申请指引 ● 第1章. 申请流程介绍 ● 1.2.8 接收各种电子回执和通知书】
    【专利自助申请指引 ● 第1章. 申请流程介绍 ● 1.2.7 费用减缴请求(可选)】
    业务代码“五宗罪”:为什么业务代码看起来总是不够清晰直观
    【整理】互联网服务端技术体系:服务解耦之消息系统
    框架源码阅读的方法与技巧
    【整理】互联网服务端技术体系:熔断机制的设计及Hystrix实现解析
  • 原文地址:https://www.cnblogs.com/HigginCui/p/10327568.html
Copyright © 2011-2022 走看看