zoukankan      html  css  js  c++  java
  • Netty实现简单群聊

    场景

    使用Netty实现简单群聊。服务端实现监控客户端上下线及通知、群聊消息转发。

    实现

    客户端与服务端使用String类型的消息进行发送与接收,因此客户端与服务端需要首先添加Netty封装的用于网络传输的编码解密处理器,否则将无法成功打印消息。

    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            //字符串解码器,用于将通过ByteBuf传输的数据转换成String
            pipeline.addLast("decoder", new StringDecoder());
            //字符串编码器,用于将String编码到ByteBuf中进行网络传输
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast(new ServerHandler());
        }
    });
    

    服务端

    package others.netty.groupChat;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
     * Netty群聊服务端实现
     * 根据不同事件驱动通知客户端 上下线状态,消息转发
     *
     * @author makeDoBetter
     * @version 1.0
     * @date 2021/4/25 10:36
     * @since JDK 1.8
     */
    public class Sever {
        private int port;
    
        public Sever(int port) {
            this.port = port;
        }
    
        public void run(){
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup();
            ServerBootstrap sever = new ServerBootstrap();
            try {
                sever.group(boss, worker)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE ,true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //字符串解码器,用于将通过ByteBuf传输的数据转换成String
                                pipeline.addLast("decoder", new StringDecoder());
                                //字符串编码器,用于将String编码到ByteBuf中进行网络传输
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new ServerHandler());
                            }
                        });
                ChannelFuture channelFuture = sever.bind(port).sync();
                //添加一个监听器
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()){
                            System.out.println("服务端启动完成");
                        } else {
                            System.out.println("服务端启动失败");
                        }
                    }
                });
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) {
            new Sever(1234).run();
        }
    }
    
    

    自定义服务处理器,用于处理客户端连接与断开、消息转发等。

    package others.netty.groupChat;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    /**
     * 实现客户端上下线状态、消息通知
     *
     * @author makeDoBetter
     * @version 1.0
     * @date 2021/4/25 10:47
     * @since JDK 1.8
     */
    public class ServerHandler extends SimpleChannelInboundHandler<String> {
        /**
         * 定义一个全局的单线程的静态变量,用于存储整个连接的客户端集合
         */
        private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            //不需要遍历,group可以直接处理
            group.writeAndFlush("[客户端]" + channel.remoteAddress() + "上线");
            group.add(channel);
            System.out.println(group.toString());
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            //不需要手动remove()当前channel,group会自动剔除离线channel
            group.writeAndFlush("[客户端]" + channel.remoteAddress() + "下线");
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            Channel self = ctx.channel();
            //实现消息的转发,忽略自身
            group.forEach(channel -> {
                if (self != channel) {
                    System.out.println(msg + "发送到客户端" + channel.remoteAddress());
                    channel.writeAndFlush("[客户端]" + self.remoteAddress() + "说:" + msg + "
    ");
                }
            });
        }
    }
    
    

    客户端

    package others.netty.groupChat;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.Scanner;
    
    /**
     * 客户端实现服务端连接,消息发送、接收打印
     *
     * @author makeDoBetter
     * @version 1.0
     * @date 2021/4/25 11:23
     * @since JDK 1.8
     */
    public class Client {
        private String host;
        private int port;
    
        public Client(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public void run(){
            NioEventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            try {
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //字符串解码器,用于将通过ByteBuf传输的数据转换成String
                                pipeline.addLast("decoder", new StringDecoder());
                                //字符串编码器,用于将String编码到ByteBuf中进行网络传输
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new ClientHandler());
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()){
                            System.out.println("====连接成功=====");
                        } else {
                            System.out.println("客户端连接失败");
                        }
                    }
                });
                Channel channel = channelFuture.channel();
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()){
                    String line = scanner.nextLine();
                    channel.writeAndFlush(line);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            new Client("127.0.0.1", 1234).run();
        }
    }
    
    

    客户端处理程序,打印转发到当前客户端的消息。

    package others.netty.groupChat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * 客户端读取数据事件
     *
     * @author makeDoBetter
     * @version 1.0
     * @date 2021/4/25 11:37
     * @since JDK 1.8
     */
    public class ClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(msg.trim());
        }
    }
    
    

  • 相关阅读:
    51nod数字1的数量
    bzoj3669: [Noi2014]魔法森林 lct版
    【NOI2014】起床困难综合症 位运算+贪心
    bzoj2631: tree lct
    bzoj2002 弹飞绵羊 lct版
    codevs1245最小的N个和 小根堆
    RTSP/GB28181/SDK协议视频融合平台EasyCVR接口获取协议平台接入参数的调用方法
    RTSP/GB28181/SDK协议视频融合平台EasyCVR上传通道数据库不显示怎么解决?
    基于视频协议融合平台EasyCVR开发的视频综合管理监控平台EasyCVS通道流检查功能的实现
    RTSP/GB28181/HIKSDK/大华SDK协议安防视频云平台EasyCVR新增告警功能介绍
  • 原文地址:https://www.cnblogs.com/fjrgg/p/14721325.html
Copyright © 2011-2022 走看看