zoukankan      html  css  js  c++  java
  • Netty 应用实例-群聊系统,心跳检测机制案例 ,WebSocket 编程实现服务器和客户端长连接

    实例要求:
    1) 编写一个 Netty 群聊系统, 实现服务器端和客户端之间的数据简单通讯(非阻塞)
    2) 实现多人群聊
    3) 服务器端: 可以监测用户上线, 离线, 并实现消息转发功能
    4) 客户端: 通过 channel 可以无阻塞发送消息给其它所有用户, 同时可以接受其它用户发送的消息(有服务器转发得到)
    5) 目的: 进一步理解 Netty

     代码:

    GroupChatServer
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class GroupChatServer {
    
        private int port; //监听端口
    
    
        public GroupChatServer(int port) {
            this.port = port;
        }
    
        //编写run方法,处理客户端的请求
        public void run() throws  Exception{
    
            //创建两个线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
    
            try {
                ServerBootstrap b = new ServerBootstrap();
    
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
    
                                //获取到pipeline
                                ChannelPipeline pipeline = ch.pipeline();
                                //向pipeline加入解码器
                                pipeline.addLast("decoder", new StringDecoder());
                                //向pipeline加入编码器
                                pipeline.addLast("encoder", new StringEncoder());
                                //加入自己的业务处理handler
                                pipeline.addLast(new GroupChatServerHandler());
    
                            }
                        });
    
                System.out.println("netty 服务器启动");
                ChannelFuture channelFuture = b.bind(port).sync();
    
                //监听关闭
                channelFuture.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            new GroupChatServer(7000).run();
        }
    }
    View Code
    GroupChatServerHandler
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        //public static List<Channel> channels = new ArrayList<Channel>();
    
        //使用一个hashmap 管理
        //public static Map<String, Channel> channels = new HashMap<String,Channel>();
    
        //定义一个channle 组,管理所有的channel
        //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
        private static ChannelGroup  channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    
        //handlerAdded 表示连接建立,一旦连接,第一个被执行
        //将当前channel 加入到  channelGroup
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            //将该客户加入聊天的信息推送给其它在线的客户端
            /*
            该方法会将 channelGroup 中所有的channel 遍历,并发送 消息,
            我们不需要自己遍历
             */
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " 
    ");
            channelGroup.add(channel);
    
    
    
    
        }
    
        //断开连接, 将xx客户离开信息推送给当前在线的客户
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了
    ");
            System.out.println("channelGroup size" + channelGroup.size());
    
        }
    
        //表示channel 处于活动状态, 提示 xx上线
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
            System.out.println(ctx.channel().remoteAddress() + " 上线了~");
        }
    
        //表示channel 处于不活动状态, 提示 xx离线了
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    
            System.out.println(ctx.channel().remoteAddress() + " 离线了~");
        }
    
        //读取数据
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    
            //获取到当前channel
            Channel channel = ctx.channel();
            //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
    
            channelGroup.forEach(ch -> {
                if(channel != ch) { //不是当前的channel,转发消息
                    ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "
    ");
                }else {//回显自己发送的消息给自己
                    ch.writeAndFlush("[自己]发送了消息" + msg + "
    ");
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //关闭通道
            ctx.close();
        }
    }
    View Code
    GroupChatClient
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.Scanner;
    
    
    public class GroupChatClient {
    
        //属性
        private final String host;
        private final int port;
    
        public GroupChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public void run() throws Exception{
            EventLoopGroup group = new NioEventLoopGroup();
    
            try {
    
    
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
    
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
    
                            //得到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入相关handler
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自定义的handler
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
    
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //得到channel
                Channel channel = channelFuture.channel();
                System.out.println("-------" + channel.localAddress()+ "--------");
                //客户端需要输入信息,创建一个扫描器
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String msg = scanner.nextLine();
                    //通过channel 发送到服务器端
                    channel.writeAndFlush(msg + "
    ");
                }
            }finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new GroupChatClient("127.0.0.1", 7000).run();
        }
    }
    View Code
    GroupChatClientHandler
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(msg.trim());
        }
    }
    View Code
    User
    public class User {
        private int id;
        private String pwd;
    }
    View Code

    Netty 心跳检测机制案例 
    实例要求:
    1) 编写一个 Netty 心跳检测机制案例, 当服务器超过 3 秒没有读时, 就提示读空闲
    2) 当服务器超过 5 秒没有写操作时, 就提示写空闲
    3) 实现当服务器超过 7 秒没有读或者写操作时, 就提示读写空闲
    4)
    代码如下: MyServer
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.concurrent.TimeUnit;
    
    public class MyServer {
        public static void main(String[] args) throws Exception{
    
    
            //创建两个线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //加入一个netty 提供 IdleStateHandler
                        /*
                        说明
                        1. IdleStateHandler 是netty 提供的处理空闲状态的处理器
                        2. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
                        3. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
                        4. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
    
                        5. 文档说明
                        triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
     * read, write, or both operation for a while.
     *                  6. 当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理
     *                  通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
                         */
                        pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));
                        //加入一个对空闲检测进一步处理的handler(自定义)
                        pipeline.addLast(new MyServerHandler());
                    }
                });
    
                //启动服务器
                ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
                channelFuture.channel().closeFuture().sync();
    
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    View Code
    MyServerHandler
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleStateEvent;
    
    public class MyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         *
         * @param ctx 上下文
         * @param evt 事件
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
            if(evt instanceof IdleStateEvent) {
    
                //将  evt 向下转型 IdleStateEvent
                IdleStateEvent event = (IdleStateEvent) evt;
                String eventType = null;
                switch (event.state()) {
                    case READER_IDLE:
                      eventType = "读空闲";
                      break;
                    case WRITER_IDLE:
                        eventType = "写空闲";
                        break;
                    case ALL_IDLE:
                        eventType = "读写空闲";
                        break;
                }
                System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
                System.out.println("服务器做相应处理..");
    
                //如果发生空闲,我们关闭通道
               // ctx.channel().close();
            }
        }
    }
    View Code

    Netty 通过 WebSocket 编程实现服务器和客户端长连接 

    实例要求:
    1) Http 协议是无状态的, 浏览器和服务器间的请求响应一次, 下一次会重新创建连接.
    2) 要求: 实现基于 webSocket 的长连接的全双工的交互
    3) 改变 Http 协议多次请求的约束, 实现长连接了, 服务器可以发送消息给浏览器
    4) 客户端浏览器和服务器端会相互感知, 比如服务器关闭了, 浏览器会感知, 同样浏览器关闭了, 服务器会感知
    5) 运行界面
    MyServer:
    import com.atguigu.netty.heartbeat.MyServerHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.concurrent.TimeUnit;
    
    public class MyServer {
        public static void main(String[] args) throws Exception{
    
    
            //创建两个线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
    
                        //因为基于http协议,使用http的编码和解码器
                        pipeline.addLast(new HttpServerCodec());
                        //是以块方式写,添加ChunkedWriteHandler处理器
                        pipeline.addLast(new ChunkedWriteHandler());
    
                        /*
                        说明
                        1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
                        2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
                         */
                        pipeline.addLast(new HttpObjectAggregator(8192));
                        /*
                        说明
                        1. 对应websocket ,它的数据是以 帧(frame) 形式传递
                        2. 可以看到WebSocketFrame 下面有六个子类
                        3. 浏览器请求时 ws://localhost:7000/hello 表示请求的uri
                        4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
                        5. 是通过一个 状态码 101
                         */
                        pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));
    
                        //自定义的handler ,处理业务逻辑
                        pipeline.addLast(new MyTextWebSocketFrameHandler());
                    }
                });
    
                //启动服务器
                ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
                channelFuture.channel().closeFuture().sync();
    
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    View Code
    MyTextWebSocketFrameHandler
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    
    import java.time.LocalDateTime;
    
    //这里 TextWebSocketFrame 类型,表示一个文本帧(frame)
    public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    
            System.out.println("服务器收到消息 " + msg.text());
    
            //回复消息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg.text()));
        }
    
        //当web客户端连接后, 触发方法
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
            System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText());
            System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText());
        }
    
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
            System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("异常发生 " + cause.getMessage());
            ctx.close(); //关闭连接
        }
    }
    View Code
    hello.html
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <script>
        var socket;
        //判断当前浏览器是否支持websocket
        if(window.WebSocket) {
            //go on
            socket = new WebSocket("ws://localhost:7000/hello2");
            //相当于channelReado, ev 收到服务器端回送的消息
            socket.onmessage = function (ev) {
                var rt = document.getElementById("responseText");
                rt.value = rt.value + "
    " + ev.data;
            }
    
            //相当于连接开启(感知到连接开启)
            socket.onopen = function (ev) {
                var rt = document.getElementById("responseText");
                rt.value = "连接开启了.."
            }
    
            //相当于连接关闭(感知到连接关闭)
            socket.onclose = function (ev) {
    
                var rt = document.getElementById("responseText");
                rt.value = rt.value + "
    " + "连接关闭了.."
            }
        } else {
            alert("当前浏览器不支持websocket")
        }
    
        //发送消息到服务器
        function send(message) {
            if(!window.socket) { //先判断socket是否创建好
                return;
            }
            if(socket.readyState == WebSocket.OPEN) {
                //通过socket 发送消息
                socket.send(message)
            } else {
                alert("连接没有开启");
            }
        }
    </script>
        <form onsubmit="return false">
            <textarea name="message" style="height: 300px;  300px"></textarea>
            <input type="button" value="发生消息" onclick="send(this.form.message.value)">
            <textarea id="responseText" style="height: 300px;  300px"></textarea>
            <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
        </form>
    </body>
    </html>
    View Code
     

     

  • 相关阅读:
    052 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 14 Eclipse下程序调试——debug2 多断点调试程序
    051 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 13 Eclipse下程序调试——debug入门1
    050 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 12 continue语句
    049 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 11 break语句
    048 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 10 案例——阶乘的累加和
    047 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 09 嵌套while循环应用
    046 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 08 for循环的注意事项
    045 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 07 for循环应用及局部变量作用范围
    剑指OFFER----面试题04.二维数组中的查找
    剑指OFFER----面试题03. 数组中重复的数字
  • 原文地址:https://www.cnblogs.com/cb1186512739/p/12774401.html
Copyright © 2011-2022 走看看