zoukankan      html  css  js  c++  java
  • Netty 系列八(基于 WebSocket 的简单聊天室).

    一、前言

        之前写过一篇 Spring 集成 WebSocket 协议的文章 —— Spring消息之WebSocket ,所以对于 WebSocket 协议的介绍就不多说了,可以参考这篇文章。这里只做一些补充说明。另外,Netty 对 WebSocket 协议的支持要比 Spring 好太多了,用起来舒服的多。

        WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。

        由 IETF 发布的 WebSocket RFC,定义了 6 种帧, Netty 为它们每种都提供了一个 POJO 实现。下表列出了这些帧类型,并描述了它们的用法。

    二、聊天室功能说明

        1、A、B、C 等所有用户都可以加入同一个聊天室。

        2、A 发送的消息,B、C 可以同时收到,但是 A 收不到自己发送的消息。

        3、当用户长时间没有发送消息,系统将把他踢出聊天室。

       

    三、聊天室功能实现

        1、Netty 版本

    <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>5.0.0.Alpha2</version>
    </dependency>

        2、处理 HTTP 协议的 ChannelHandler —— 非 WebSocket 协议的请求,返回 index.html 页面

    public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    
        private final String wsUri;
        private static File INDEX;
    
        static {
            URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
            try {
                String path = location.toURI() + "index.html";
                path = !path.contains("file:") ? path : path.substring(5);
                INDEX = new File(path);
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }
    
        public HttpRequestHandler(String wsUri) {
            this.wsUri = wsUri;
        }
    
    
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
            // 如果请求了Websocket,协议升级,增加引用计数(调用retain()),并将他传递给下一个 ChannelHandler
            // 之所以需要调用 retain() 方法,是因为调用 channelRead() 之后,资源会被 release() 方法释放掉,需要调用 retain() 保留资源
            if (wsUri.equalsIgnoreCase(request.uri())) {
                ctx.fireChannelRead(request.retain());
            } else {
                //处理 100 Continue 请求以符合 HTTP 1.1 规范
                if (HttpHeaderUtil.is100ContinueExpected(request)) {
                    send100Continue(ctx);
                }
                // 读取 index.html
                RandomAccessFile randomAccessFile = new RandomAccessFile(INDEX, "r");
                HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
                HttpHeaders headers = response.headers();
                //在该 HTTP 头信息被设置以后,HttpRequestHandler 将会写回一个 HttpResponse 给客户端
                headers.set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
                boolean keepAlive = HttpHeaderUtil.isKeepAlive(request);
                if (keepAlive) {
                    headers.setLong(HttpHeaderNames.CONTENT_LENGTH, randomAccessFile.length());
                    headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
                }
                ctx.write(response);
                //将 index.html 写给客户端
                if (ctx.pipeline().get(SslHandler.class) == null) {
                    ctx.write(new DefaultFileRegion(randomAccessFile.getChannel(), 0, randomAccessFile.length()));
                } else {
                    ctx.write(new ChunkedNioFile(randomAccessFile.getChannel()));
                }
                //写 LastHttpContent 并冲刷至客户端,标记响应的结束
                ChannelFuture channelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
                if (!keepAlive) {
                    channelFuture.addListener(ChannelFutureListener.CLOSE);
                }
            }
    
        }
    
        private void send100Continue(ChannelHandlerContext ctx) {
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        3、处理 WebSocket 协议的 ChannelHandler —— 处理 TextWebSocketFrame 的消息帧

    /**
     * WebSocket 帧:WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧
     */
    public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
        private final ChannelGroup group;
    
        public TextWebSocketFrameHandler(ChannelGroup group) {
            this.group = group;
        }
    
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            //增加消息的引用计数(保留消息),并将他写到 ChannelGroup 中所有已经连接的客户端
            Channel channel = ctx.channel();
            //自己发送的消息不返回给自己
            group.remove(channel);
            group.writeAndFlush(msg.retain());
            group.add(channel);
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            //是否握手成功,升级为 Websocket 协议
            if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                // 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息
                // 并把握手成功的 Channel 加入到 ChannelGroup 中
                ctx.pipeline().remove(HttpRequestHandler.class);
                group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
                group.add(ctx.channel());
            } else if (evt instanceof IdleStateEvent) {
                IdleStateEvent stateEvent = (IdleStateEvent) evt;
                if (stateEvent.state() == IdleState.READER_IDLE) {
                    group.remove(ctx.channel());
                    ctx.writeAndFlush(new TextWebSocketFrame("由于您长时间不在线,系统已自动把你踢下线!")).addListener(ChannelFutureListener.CLOSE);
                }
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }

       WebSocket 协议升级完成之后, WebSocketServerProtocolHandler 将会把 HttpRequestDecoder 替换为 WebSocketFrameDecoder,把 HttpResponseEncoder 替换为WebSocketFrameEncoder。为了性能最大化,它将移除任何不再被 WebSocket 连接所需要的 ChannelHandler。这也包括了 HttpObjectAggregator 和 HttpRequestHandler 。

        4、ChatServerInitializer —— 多个 ChannelHandler 合并成 ChannelPipeline 链

    public class ChatServerInitializer extends ChannelInitializer<Channel> {
    
        private final ChannelGroup group;
        private static final int READ_IDLE_TIME_OUT = 60; // 读超时
        private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时
        private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时
    
        public ChatServerInitializer(ChannelGroup group) {
            this.group = group;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpServerCodec());
            pipeline.addLast(new ChunkedWriteHandler());
            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
            // 处理那些不发送到 /ws URI的请求
            pipeline.addLast(new HttpRequestHandler("/ws"));
            // 如果被请求的端点是 "/ws",则处理该升级握手
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            // //当连接在60秒内没有接收到消息时,进会触发一个 IdleStateEvent 事件,被 HeartbeatHandler 的 userEventTriggered 方法处理
            pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS));
            pipeline.addLast(new TextWebSocketFrameHandler(group));
    
        }
    }
    ChatServerInitializer.java

    tips:上面这些开箱即用 ChannelHandler 的作用,我就不一一介绍了,可以参考上一篇文章

        5、引导类 ChatServer

    public class ChatServer {
    
        private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        private final EventLoopGroup group = new NioEventLoopGroup();
        private Channel channel;
    
        public ChannelFuture start(InetSocketAddress address) {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChatServerInitializer(channelGroup));
            ChannelFuture channelFuture = bootstrap.bind(address);
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
            return channelFuture;
        }
    
        public void destroy() {
            if (channel != null) {
                channel.close();
            }
            channelGroup.close();
            group.shutdownGracefully();
        }
    
        public static void main(String[] args) {
            final ChatServer chatServer = new ChatServer();
            ChannelFuture channelFuture = chatServer.start(new InetSocketAddress(9999));
            // 返回与当前Java应用程序关联的运行时对象
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    chatServer.destroy();
                }
            });
            channelFuture.channel().closeFuture().syncUninterruptibly();
        }
    
    }
    ChatServer.java

    三、效果展示

        在浏览器中输入 http://127.0.0.1:9999 即可看到预先准备好的 index.html 页面;访问 ws://127.0.0.1:9999/ws (可随意找一个 WebSocket 测试工具测试)即可加入聊天室。

    有点 low 的聊天室总算是完成了,算是 Netty 对 HTTP 协议和 WebSocket 协议的一次实践吧!虽然功能欠缺,但千里之行,始于足下!不积硅步,无以至千里;不积小流,无以成江海!

     

    参考资料:《Netty IN ACTION》

    演示源代码:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/chatroom

  • 相关阅读:
    !function() {}()
    element.dataset API
    正则匹配 数字和英文状态下的逗号
    《vim实用技巧》读书笔记
    ajax分页
    smarty分页类
    数组排序
    数组大类
    自动刷新价格
    简单购物车
  • 原文地址:https://www.cnblogs.com/jmcui/p/9600533.html
Copyright © 2011-2022 走看看