zoukankan      html  css  js  c++  java
  • netty 详解(二)netty 实现群聊

     

      服务端 Server

    package com.oy.groupchat;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * Entry point of the group-chat server.
     *
     * <p>Binds a Netty {@link ServerBootstrap} to the configured port and installs
     * {@link GroupChatServerChannelInitializer} on every accepted connection.
     */
    public class Server {
        private int port;

        /**
         * @param port TCP port the server will bind to
         */
        public Server(int port) {
            this.port = port;
        }

        /** Starts a server on port 8000 and blocks until it is closed. */
        public static void main(String[] args) throws Exception {
            Server server = new Server(8000);
            server.run();
        }

        /**
         * Binds the port and blocks until the server channel is closed.
         *
         * <p>Any exception raised while binding or waiting is printed to stderr
         * rather than propagated; both event loop groups are always shut down.
         */
        public void run() throws Exception {
            // First group is created with a single thread and passed as the parent
            // (acceptor) group; the second uses Netty's default thread count.
            EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
            EventLoopGroup ioGroup = new NioEventLoopGroup();

            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(acceptorGroup, ioGroup);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
                serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                serverBootstrap.childHandler(new GroupChatServerChannelInitializer());

                // Bind the port and wait until the bind has completed.
                ChannelFuture bindFuture = serverBootstrap.bind(port).sync();
                System.out.println("server started and listen " + port);

                // Block here until the server channel is closed.
                bindFuture.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                acceptorGroup.shutdownGracefully();
                ioGroup.shutdownGracefully();
            }
        }
    }

      服务器端 Channel 初始化器 GroupChatServerChannelInitializer

    package com.oy.groupchat;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class GroupChatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            /* 向管道加入处理器 */
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast("decoder", new StringDecoder()); // 解码器
            pipeline.addLast("encoder", new StringEncoder()); // 编码器
            // 添加自定义的处理器
            pipeline.addLast("GroupChatServerHandler", new GroupChatServerHandler());
        }
    
    }

      

      服务器端 自定义的 handler---GroupChatServerHandler

    package com.oy.groupchat;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        // 定义一个 channel 组,管理所有的 channel
        // GlobalEventExecutor.INSTANCE 是全局的事件执行器,单例
        private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        /**
         * 一旦连接,第一个执行
         * 将当前 channel 加入到 channelGroup
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            // 将该客户加入聊天的信息推送给其他在线的客户端
            channelGroup.writeAndFlush(sdf.format(new Date()) + " [客户端]" + channel.remoteAddress() + " 加入聊天
    ");
            channelGroup.add(channel);
        }
    
        /**
         * 断开连接
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            // 将该客户离开聊天的信息推送给其他在线的客户端
            channelGroup.writeAndFlush(sdf.format(new Date()) + " [客户端]" + channel.remoteAddress() + " 离开
    ");
            System.out.println("当前 channelGroup 大小:" + channelGroup.size());
        }
    
        /**
         * 表示 channel 处于活动状态, 提示上线
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress() + "上线了。" + sdf.format(new Date()));
        }
    
        /**
         * 表示 channel 处于不活动状态, 提示下线
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress() + "下线了。" + sdf.format(new Date()));
        }
    
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            // 获取当前 channel
            final Channel channel = ctx.channel();
    
            // 遍历 channelGroup,根据不同的情况,返回不同的消息
            channelGroup.forEach(ch -> {
                if (channel != ch) { // 不是当前 channel, 转发消息
                    ch.writeAndFlush("[客户端]" + channel.remoteAddress() + "发送了消息:" + msg + "
    ");
                } else {
                    ch.writeAndFlush("[自己]" + channel.remoteAddress() + "发送了消息:" + msg + "
    ");
                }
            });
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

      客户端 Client

    package com.oy.groupchat;
    
    import com.oy.helloworld.NettyClient;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.util.Scanner;
    
    public class Client {
        private static final String HOST = "127.0.0.1";
        private static final int PORT = 8000;
    
        public static void main(String[] args) {
            new Client().run(HOST, PORT);
        }
    
        public void run(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap client = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new GroupChatClientChannelInitializer());
                            }
                        });
    
                ChannelFuture future = client.connect(host, port).sync();
                System.out.println("--------------" + future.channel().localAddress() + "--------------");
    
                Channel channel = future.channel();
                // 客户端输入信息
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNext()) {
                    String msg = scanner.nextLine();
                    channel.writeAndFlush(msg + "
    ");
                }
    
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    }

      

      GroupChatClientChannelInitializer

    package com.oy.groupchat;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class GroupChatClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            /* 向管道加入处理器 */
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast("decoder", new StringDecoder()); // 解码器
            pipeline.addLast("encoder", new StringEncoder()); // 编码器
            // 添加自定义的处理器
            pipeline.addLast("GroupChatClientHandler", new GroupChatClientHandler());
        }
    
    }

      

      GroupChatClientHandler

    package com.oy.groupchat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * Client-side inbound handler that prints every received text message to
     * standard output.
     */
    public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

        /**
         * Prints the received message followed by a line separator.
         *
         * @param channelHandlerContext context of the receiving channel (unused)
         * @param msg                   the decoded text message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
            final String received = msg;
            System.out.println(received);
        }
    }

      启动服务端程序 和 3 个 客户端程序

       

      客户端发送消息

      ---

  • 相关阅读:
    【Spring】注解的循环依赖问题
    【网络】计算机网络自顶向下读书笔记
    【JUC】并发编程的艺术笔记
    【JUC】多线程手撕代码面试题
    【操作系统】王道操作系统全盘汇总
    【Spring】IoC源码分析以及实现
    【Spring】用例子来初次理解IoC
    拼音工具类
    chr(10)与chr(13)的区别
    List 集合转String 逗号拼接
  • 原文地址:https://www.cnblogs.com/xy-ouyang/p/12824998.html
Copyright © 2011-2022 走看看