zoukankan      html  css  js  c++  java
  • 007-核心技术-netty-基于netty的群聊系统

    一、基于netty群聊

    服务器端

    package com.github.bjlhx15.netty.demo.netty.groupchat;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    
    public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        //定义一个channel组。管理所有的channel
        //GlobalEventExecutor.INSTANCE是全局的事件执行器,是一个单例
        private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        //handlerAdded 表示连接建立,一旦连接,第一个被执行
    //    当前的channel加入到 channelgroup
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
    //        将该客户加入聊天的信息推送到其他所有客户端
            // 该方法会将 channelGroup中所有的channel遍历,并发送消息,不需要自己遍历
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天
    ");
            channelGroup.add(channel);
        }
    
        //断开连接
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了
    ");
    //        channelGroup.remove()//不需要执行这里,这个方法会触发channelGroup 删除这个channel
        }
    
        //    表示channel处于活动状态
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress() + " 上线了");
        }
    
        //    表示channel处于不活动状态
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress() + " 离线了");
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
    //        获取到当前的channel
            Channel channel = channelHandlerContext.channel();
            channelGroup.forEach(ch -> {
                if (channel != ch) {
                    ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送消息," + s + "
    ");
                } else {
                    ch.writeAndFlush("[自己]发送了消息" + s + "
    ");
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    服务器端Handler

    package com.github.bjlhx15.netty.demo.netty.groupchat;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    
    public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        //定义一个channel组。管理所有的channel
        //GlobalEventExecutor.INSTANCE是全局的事件执行器,是一个单例
        private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        //handlerAdded 表示连接建立,一旦连接,第一个被执行
    //    当前的channel加入到 channelgroup
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
    //        将该客户加入聊天的信息推送到其他所有客户端
            // 该方法会将 channelGroup中所有的channel遍历,并发送消息,不需要自己遍历
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天
    ");
            channelGroup.add(channel);
        }
    
        //断开连接
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了
    ");
    //        channelGroup.remove()//不需要执行这里,这个方法会触发channelGroup 删除这个channel
        }
    
        //    表示channel处于活动状态
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress() + " 上线了");
        }
    
        //    表示channel处于不活动状态
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress() + " 离线了");
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
    //        获取到当前的channel
            Channel channel = channelHandlerContext.channel();
            channelGroup.forEach(ch -> {
                if (channel != ch) {
                    ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送消息," + s + "
    ");
                } else {
                    ch.writeAndFlush("[自己]发送了消息" + s + "
    ");
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    客户端

    package com.github.bjlhx15.netty.demo.netty.groupchat;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.Scanner;
    
    public class GroupChatClient {
        private final String host;
        private final int port;
    
        public GroupChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public void run() throws InterruptedException {
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap()
                        .group(eventExecutors)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new GroupChatClientHandler());//加入自己业务的handler
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                Channel channel = channelFuture.channel();
                System.out.println("------------" + channel.localAddress() + "-------");
    
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String s = scanner.nextLine();
                    channel.writeAndFlush(s + "
    ");
                }
            } finally {
                eventExecutors.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new GroupChatClient("127.0.0.1", 7000).run();
        }
    }

    客户端Handler

    package com.github.bjlhx15.netty.demo.netty.groupchat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            System.out.println(s.trim());
        }
    }

    启动测试即可

    如果有私聊,需要将ChannelGroup ,替换为Map<String,Channel>,key是用户ID,然后就能找到对应的Channel,在ChannelRead0时查找发送即可。

    转载请注明出处,感谢。
    作者:李宏旭
    阅罢此文,如果您觉得本文不错并有所收获,请【打赏】或【推荐】,也可【评论】留下您的问题或建议与我交流。
    你的支持是我不断创作和分享的不竭动力!
  • 相关阅读:
    Maven+JSP+Servlet+JDBC+Redis+Mysql实现的黑马旅游网
    Thymeleaf+SpringBoot+SpringDataJPA实现的中小医院信息管理系统
    Thymeleaf+SpringBoot+Mybatis实现的家庭财务管理系统
    JSP+Struts2+JDBC+Mysql实现的校园宿舍管理系统
    JSP+SSH+Mysql+C3P0实现的传智播客网上商城
    JSP+SSH+Mysql实现的学生管理系统
    JSP+Servlet+C3P0+Mysql实现的简单新闻系统
    笔记:EF出现列名 'Discriminator' 无效、类没有加入数据库上下文也被数据迁移生成表
    判断字符串中是否存在的几种方案:string.indexof、string.contains、list.contains、list.any几种方式效率对比
    Sql server 中关闭ID自增字段(SQL取消ID自动增长)
  • 原文地址:https://www.cnblogs.com/bjlhx/p/15111943.html
Copyright © 2011-2022 走看看