zoukankan      html  css  js  c++  java
  • WebSocket编解码器

    WebSocket编解码器

    客户端代码

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <script type="text/javascript">
        var socket;
        if (window.WebSocket) {
    
            socket = new WebSocket("ws://localhost:7000/hello");
    
            //收到服务端回送的消息
            socket.onmessage = function (ev) {
                var rt = document.getElementById('responseText');
                rt.value = rt.value + "\n" + ev.data;
            }
            //连接开启
            socket.onopen = function (ev) {
                var rt = document.getElementById('responseText');
                rt.value = '连接开启了....';
    
            }
            //连接关闭
            socket.onclose = function (ev) {
                var rt = document.getElementById('responseText');
                rt.value = rt.value + "\n" + "连接关闭了.......";
    
            }
        } else {
            alert("当前浏览器不支持websocket");
        }
    
        function send(message) {
            if (window.WebSocket) {
                if (socket.readyState == WebSocket.OPEN) {
                    socket.send(message);
                } else {
                    alert("未连接")
                }
            }
    
        }
    </script>
    <form onsubmit="return false">
    
        <textarea name="message" style="height: 300px; 300px"></textarea>
        <input type="button" value="发送消息" onclick="send(this.form.message.value)"/>
        <textarea id="responseText" style="height: 300px; 300px"></textarea>
        <input type="button" value="清空" onclick="document.getElementById('responseText').value = ''"/>
    </form>
    </body>
    </html>

    服务端代码

    public class WebSocketServer {
        private final int PORT = 7000;
    
    
        public static void main(String[] args) {
            new WebSocketServer().run();
        }
    
        public void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .handler(new LoggingHandler())//在bossGroup增加一个日志处理器
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel channel) throws Exception {
                                //基于http协议,使用http的编码和解码器
                                channel.pipeline().addLast(new HttpServerCodec());
                                //是以块方式写,添加ChunkedWriteHandler处理器
                                channel.pipeline().addLast(new ChunkedWriteHandler());
                                //http数据在传输过程中是分段,HttpObjectAggregator,就是可以将多个段聚合
                                channel.pipeline().addLast(new HttpObjectAggregator(8192));
                                //websocket数据是以帧(frame)形式传递
                                //浏览器请求时 ws://localhost:7000/hello 表示请求的uri
                                //WebSocketServerProtocolHandler 核心功能将http协议升级为ws协议,保持长连接
                                channel.pipeline().addLast(new WebSocketServerProtocolHandler("/hello"));
                                //处理业务的handler
                                channel.pipeline().addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {
    
                                    /**
                                     *
                                     * @param ctx
                                     * @param msg 文本帧
                                     * @throws Exception
                                     */
                                    @Override
                                    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
                                        System.out.println("服务器收到的信息:" + msg.text());
                                        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器当前时间" + LocalDateTime.now() + msg.text()));
                                    }
    
                                    //当web客户端连接后,触发该方法
                                    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                                        //id表示唯一值,LongText是唯一的,ShortText不是唯一的
                                        System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText());
                                        System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText());
                                    }
    
                                    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                                        System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
                                        System.out.println("handlerRemoved 被调用" + ctx.channel().id().asShortText());
                                    }
    
                                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                        System.out.println("异常信息:" + cause.getMessage());
                                        ctx.close();
                                    }
                                });
                            }
                        });
    
                ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
    
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
    
    }

    WebSocketServerProtocolHandler的handlerAdded

    初始化添加的时候,会调用handlerAdded进行处理器的添加,分别添加握手处理器WebSocketServerProtocolHandshakeHandler,UTF8文本帧验证器Utf8FrameValidator,关闭帧处理器WebSocketCloseFrameHandler:

    public void handlerAdded(ChannelHandlerContext ctx) {
            ChannelPipeline cp = ctx.pipeline();
            if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
                // Add the WebSocketHandshakeHandler before this one.在前面添加一个握手处理器
                cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
                        new WebSocketServerProtocolHandshakeHandler(serverConfig));
            }
            if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
                // Add the UFT8 checking before this one.在前面添加帧验证器
                cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                        new Utf8FrameValidator());
            }
        }

    添加完之后,管道中的处理器:

    head ------->HttpServerCodec------>ChunkedWriteHandler----->HttpObjectAggregator---->WebSocketServerProtocolHandshakeHandler------>Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail

    WebSocketServerProtocolHandshakeHandler#channelRead

    之后就是客户端发来HTTP请求websocket握手。HTTP解码出完整消息后就传递到WebSocketServerProtocolHandshakeHandler了,我们来看看他做了什么。

    • 验证协议url
    • 验证GET的请求升级。
    • 设置握手处理器。
    • 移除处理器WebSocketServerProtocolHandshakeHandler.
    • 创建握手WebSocketServerHandshaker 对象,进行握手。
    • 启动一个定义任务进行超时回调。
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            final FullHttpRequest req = (FullHttpRequest) msg;
            if (!isWebSocketPath(req)) {//不是websocket路径就不管
                ctx.fireChannelRead(msg);
                return;
            }
    
            try {
                if (!GET.equals(req.method())) {//只有GET支持的升级的
                    sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
                    return;
                }
           //创建握手工厂
                final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                        getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
                        serverConfig.subprotocols(), serverConfig.decoderConfig());
                final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);//创建一个握手处理器
                final ChannelPromise localHandshakePromise = handshakePromise;//握手回调
                if (handshaker == null) {//不支持的版本
                    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                } else {
                    // Ensure we set the handshaker and replace this handler before we
                    // trigger the actual handshake. Otherwise we may receive websocket bytes in this handler
                    // before we had a chance to replace it.
                    //
                    // See https://github.com/netty/netty/issues/9471.
                    WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);//设置处理器
                    ctx.pipeline().remove(this);//移除当前处理器
    
                    final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
                    handshakeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) {
                            if (!future.isSuccess()) {//发送不成功
                                localHandshakePromise.tryFailure(future.cause());
                                ctx.fireExceptionCaught(future.cause());
                            } else {//发送成功
                                localHandshakePromise.trySuccess();
                                // Kept for compatibility  保持兼容性 触发事件
                                ctx.fireUserEventTriggered(//这个HANDSHAKE_COMPLETE是过时的
                                        WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                                ctx.fireUserEventTriggered(//这个是新的
                                        new WebSocketServerProtocolHandler.HandshakeComplete(
                                                req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                            }
                        }
                    });
                    applyHandshakeTimeout();
                }
            } finally {
                req.release();
            }
        }

    isWebSocketPath验证URL

    这个主要就是验证URL是否是WebSockeURL,主要就是判断创建时候传进去的这个"/hello",默认是比较整个字符串,不是比较开头。

    private boolean isWebSocketPath(FullHttpRequest req) {
            String websocketPath = serverConfig.websocketPath();
            String uri = req.uri();
            boolean checkStartUri = uri.startsWith(websocketPath);
            boolean checkNextUri = checkNextUri(uri, websocketPath);
            return serverConfig.checkStartsWith() ? (checkStartUri && checkNextUri) : uri.equals(websocketPath);
        }

    sendHttpResponse发送消息

    如果响应的状态码不是200或者请求不是设置长连接,就关闭通道了。

        private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!isKeepAlive(req) || res.status().code() != 200) {//req不支持KeepAlive,或者res状态码不是200就等写完成了关闭通道
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }

    WebSocketServerHandshakerFactory的newHandshaker创建握手对象

    根据请求头信息的sec-websocket-version来决定要哪个版本的握手对象,一般都是13,如果都不支持就会返回null

        public WebSocketServerHandshaker newHandshaker(HttpRequest req) {
    
            CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);//从请求头获取WEBSOCKET版本,根据不同版本,返回不同握手对象
            if (version != null) {
                if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
                    // Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).
                    return new WebSocketServerHandshaker13(
                            webSocketURL, subprotocols, decoderConfig);
                } else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
                    // Version 8 of the wire protocol - version 10 of the draft hybi specification.
                    return new WebSocketServerHandshaker08(
                            webSocketURL, subprotocols, decoderConfig);
                } else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) {
                    // Version 8 of the wire protocol - version 07 of the draft hybi specification.
                    return new WebSocketServerHandshaker07(
                            webSocketURL, subprotocols, decoderConfig);
                } else {
                    return null;
                }
            } else {
                // Assume version 00 where version header was not specified
                return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
            }
        }

    ctx.pipeline().remove(this);

    只要握手对象创建好了,就不需要响应HTTP了,直接就把当前处理器WebSocketServerProtocolHandler给移除了。

    head ------->HttpServerCodec------>ChunkedWriteHandler----->HttpObjectAggregator---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail

    WebSocketServerHandshaker的handshake

    握手对象进行握手,其实就是发送响应数据。先会创建一个FullHttpResponse 响应,然后把跟HTTP相关的聚合,压缩处理器删除,如果有HttpServerCodec,那就在前面添加websocket的编解码器,等发送响应成功了把HttpServerCodec删了。如果是HTTP编解码器,就把解码器先替换成websocket的解码器,等发送响应成功了,再把编码器替换成websocket的编码器。

        public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                                HttpHeaders responseHeaders, final ChannelPromise promise) {
    
            if (logger.isDebugEnabled()) {
                logger.debug("{} WebSocket version {} server handshake", channel, version());
            }
            FullHttpResponse response = newHandshakeResponse(req, responseHeaders);//创建响应
            ChannelPipeline p = channel.pipeline();
            if (p.get(HttpObjectAggregator.class) != null) {
                p.remove(HttpObjectAggregator.class);//删除聚合
            }
            if (p.get(HttpContentCompressor.class) != null) {//删除压缩
                p.remove(HttpContentCompressor.class);
            }
            ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);//请求解码器
            final String encoderName;
            if (ctx == null) {//不存在
                // this means the user use an HttpServerCodec
                ctx = p.context(HttpServerCodec.class);//HttpServerCodec是否存在
                if (ctx == null) {//也不存在,就没办法解码http了,失败了
                    promise.setFailure(
                            new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
                    return promise;
                }//在之前添加WebSocket编解码
                p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
                p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
                encoderName = ctx.name();
            } else {
                p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());//替换HttpRequestDecoder
    
                encoderName = p.context(HttpResponseEncoder.class).name();
                p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());//在HttpResponseEncoder之前添加编码器
            }//监听发出事件
            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        ChannelPipeline p = future.channel().pipeline();
                        p.remove(encoderName);//成功了就把http的编码器删除了,HttpServerCodec或者HttpResponseEncoder
                        promise.setSuccess();
                    } else {
                        promise.setFailure(future.cause());
                    }
                }
            });
            return promise;
        }

    发送回调前是这样:

    head ------->WebSocketFrameDecoder------>WebSocketFrameEncoder----->HttpResponseDecoder---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail

    回调后:

    head ------->WebSocketFrameDecoder------>WebSocketFrameEncoder---->Utf8FrameValidator----->WebSocketServerProtocolHandler--------------->自定义处理器--------------->tail

    applyHandshakeTimeout

    发送可能会等好久,所以就给了个超时的定时任务,默认设置是10秒,超时了就触发超时事件,然后关闭通道,如果发送回调了,就把定时任务取消。

        private void applyHandshakeTimeout() {
            final ChannelPromise localHandshakePromise = handshakePromise;
            final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis();
            if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
                return;//完成了就不管了
            }
         //起一个定时任务
            final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
                @Override
                public void run() {
                    if (!localHandshakePromise.isDone() &&
                        localHandshakePromise.tryFailure(new WebSocketServerHandshakeException("handshake timed out"))) {
                        ctx.flush()//没完成就刷出去,触发超时事件,然后关闭
                           .fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT)
                           .close();
                    }
                }
            }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
         //如果成功了,就把超时任务取消
            // Cancel the handshake timeout when handshake is finished.
            localHandshakePromise.addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> f) {
                    timeoutFuture.cancel(false);
                }
            });
        }

    至此WebSocketServerProtocolHandshakeHandler做的事就完成了其实就是握手的时候用HTTP,然后就转到WebSocket了,所以为什么会看到会有替换和删除处理器了。

    WebSocket编解码器

    p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
    p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());

    父类WebSocketServerHandshaker 只提供了抽象方法,由子类WebSocketServerHandshaker13 实现。因为 handshaker 由上文 WebSocketServerHandshakerFactory#newHandshaker 方法创建,返回结果 WebSocketServerHandshaker13。

    WebSocketServerHandshaker13#newWebsocketDecoder

        protected WebSocketFrameDecoder newWebsocketDecoder() {
            return new WebSocket13FrameDecoder(decoderConfig());
        }

    WebSocket13FrameDecoder只有构造方法,关键方法还是在WebSocket08FrameDecoder里面。

    WebSocket08FrameDecoder解码器

    属性

        //读取状态
        enum State {
            READING_FIRST,//第一次读一个字节 FIN, RSV, OPCODE
            READING_SECOND,//解析出MASK, PAYLOAD LEN描述
            READING_SIZE,//解析具体长度PAYLOAD LEN
            MASKING_KEY,//解析掩码
            PAYLOAD,//解析数据
            CORRUPT//帧损坏了
        }
    
    
        private static final byte OPCODE_CONT = 0x0;//连续的frame
        private static final byte OPCODE_TEXT = 0x1;//文本frame
        private static final byte OPCODE_BINARY = 0x2;//二进制frame
        private static final byte OPCODE_CLOSE = 0x8;//关闭帧
        private static final byte OPCODE_PING = 0x9;//ping
        private static final byte OPCODE_PONG = 0xA;//pong

    decode解码

    和http类似,根据状态处理

    1. READING_FIRST:解析第一个字节,是不是最后一帧,扩展位怎么样,是什么帧类型。
    2. READING_SECOND:解析第二个字节,是否有掩码,数据长度是多少。
    3. READING_SIZE:处理长度,如果是0-125,那好办,如果是126,就要读取后面2个字节的数据,如果是127,就要读取后面8个字节的数据。
    4. MASKING_KEY:如果有掩码就解析出4字节掩码。
    5. PAYLOAD:解析出最后的数据。
    6. CORRUPT:帧数据可能损坏了,可能要关闭连接。

    READING_FIRST

    用了位操作去解析第一个字节,这里的Opcode实际上就是帧类型,比如0表示持续的帧,1表示文本帧,2表示二进制帧等等。

    case READING_FIRST:
                if (!in.isReadable()) {
                    return;
                }
    
                framePayloadLength = 0;
    
                // FIN, RSV, OPCODE
                byte b = in.readByte();
                frameFinalFlag = (b & 0x80) != 0;//取出FIN,表示是不是一帧的最后一段
                frameRsv = (b & 0x70) >> 4;//取出RSV
                frameOpcode = b & 0x0F;//取出Opcode
    
                if (logger.isTraceEnabled()) {
                    logger.trace("Decoding WebSocket Frame opCode={}", frameOpcode);
                }
    
                state = State.READING_SECOND;

    READING_SECOND

    然后读取掩码位,读取长度,进行一些合法性的检查,如果违反协议了,就直接发送关闭帧。

            case READING_SECOND:
                if (!in.isReadable()) {
                    return;
                }
                // MASK, PAYLOAD LEN 1
                b = in.readByte();//再读一个字节
                frameMasked = (b & 0x80) != 0;//读取掩码,1表示存在,4字节,0不存在
                framePayloadLen1 = b & 0x7F;//获取内容长度
    
                if (frameRsv != 0 && !config.allowExtensions()) {//有扩展标志位,但是不允许扩展
                    protocolViolation(ctx, in, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
                    return;
                }
    
                if (!config.allowMaskMismatch() && config.expectMaskedFrames() != frameMasked) {//需要掩码加密,但是发来的没进行掩码加密
                    protocolViolation(ctx, in, "received a frame that is not masked as expected");
                    return;
                }
            //控制操作,关闭,ping,pong
                if (frameOpcode > 7) { // control frame (have MSB in opcode set)
    
                    // control frames MUST NOT be fragmented
                    if (!frameFinalFlag) {//控制帧不用分段了
                        protocolViolation(ctx, in, "fragmented control frame");
                        return;
                    }
    
                    // control frames MUST have payload 125 octets or less
                    if (framePayloadLen1 > 125) {//长度超过125
                        protocolViolation(ctx, in, "control frame with payload length > 125 octets");
                        return;
                    }  
              //不为控制帧
                    // check for reserved control frame opcodes
                    if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING
                          || frameOpcode == OPCODE_PONG)) {
                        protocolViolation(ctx, in, "control frame using reserved opcode " + frameOpcode);
                        return;
                    }
              //关闭帧有内容的话,必须是2个字节的无符号整形表示状态码
                    // close frame : if there is a body, the first two bytes of the
                    // body MUST be a 2-byte unsigned integer (in network byte
                    // order) representing a getStatus code
                    if (frameOpcode == 8 && framePayloadLen1 == 1) {
                        protocolViolation(ctx, in, "received close control frame with payload len 1");
                        return;
                    }
                } else { // data frame  数据帧,不是持续,文本,二进制帧的话也违反协议了
                    // check for reserved data frame opcodes
                    if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT
                          || frameOpcode == OPCODE_BINARY)) {
                        protocolViolation(ctx, in, "data frame using reserved opcode " + frameOpcode);
                        return;
                    }
    
                    // check opcode vs message fragmentation state 1/2
                    if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {//是持续帧,帧个数为0
                        protocolViolation(ctx, in, "received continuation data frame outside fragmented message");
                        return;
                    }
              //帧的端数不为0,但是不是持续帧,也不是ping
                    // check opcode vs message fragmentation state 2/2
                    if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
                        protocolViolation(ctx, in,
                                          "received non-continuation data frame while inside fragmented message");
                        return;
                    }
                }
    
                state = State.READING_SIZE;

    protocolViolation违反协议

    如果发现有违反协议的,直接把数据丢弃,如果通道没关闭,且设置了违反协议就关闭通道的话就发送关闭帧,抛出异常。

        private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, String reason) {
            protocolViolation(ctx, in, WebSocketCloseStatus.PROTOCOL_ERROR, reason);
        }
    
        private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, WebSocketCloseStatus status, String reason) {
            protocolViolation(ctx, in, new CorruptedWebSocketFrameException(status, reason));
        }
    
        private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, CorruptedWebSocketFrameException ex) {
            state = State.CORRUPT;//帧损坏的状态
            int readableBytes = in.readableBytes();
            if (readableBytes > 0) {
                // Fix for memory leak, caused by ByteToMessageDecoder#channelRead:
                // buffer 'cumulation' is released ONLY when no more readable bytes available.
                in.skipBytes(readableBytes);//略过,能帮助释放内存
            }
            if (ctx.channel().isActive() && config.closeOnProtocolViolation()) {//帧坏了就关闭通道
                Object closeMessage;
                if (receivedClosingHandshake) {
                    closeMessage = Unpooled.EMPTY_BUFFER;//空帧
                } else {
                    WebSocketCloseStatus closeStatus = ex.closeStatus();
                    String reasonText = ex.getMessage();
                    if (reasonText == null) {
                        reasonText = closeStatus.reasonText();
                    }
                    closeMessage = new CloseWebSocketFrame(closeStatus, reasonText);//封装成关闭帧
                }
                ctx.writeAndFlush(closeMessage).addListener(ChannelFutureListener.CLOSE);//发出去,成功后关闭通道
            }
            throw ex;//抛出异常
        }

    READING_SIZE

    处理长度的几种情况。

            case READING_SIZE:
    
                // Read frame payload length
                if (framePayloadLen1 == 126) {//如果是126的话,紧跟着后面需要有两个字节的长度
                    if (in.readableBytes() < 2) {
                        return;
                    }
                    framePayloadLength = in.readUnsignedShort();//读取2次节长度
                    if (framePayloadLength < 126) {//长度无效
                        protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)");
                        return;
                    }
                } else if (framePayloadLen1 == 127) {//如果是127,后面需要8个字节
                    if (in.readableBytes() < 8) {
                        return;
                    }
                    framePayloadLength = in.readLong();
                    // TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe
                    // just check if it's negative?
    
                    if (framePayloadLength < 65536) {//小于等于2字节的
                        protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)");
                        return;
                    }
                } else {
                    framePayloadLength = framePayloadLen1;//0-125的情况
                }
            //大于最大长度默认65536
                if (framePayloadLength > config.maxFramePayloadLength()) {
                    protocolViolation(ctx, in, WebSocketCloseStatus.MESSAGE_TOO_BIG,
                        "Max frame length of " + config.maxFramePayloadLength() + " has been exceeded.");
                    return;
                }
    
                if (logger.isTraceEnabled()) {
                    logger.trace("Decoding WebSocket Frame length={}", framePayloadLength);
                }
    
                state = State.MASKING_KEY;

    MASKING_KEY

    解析出掩码,其实这个掩码加密解密只是用了异或^

            case MASKING_KEY://解析掩码
                if (frameMasked) {//有掩码 4字节的
                    if (in.readableBytes() < 4) {
                        return;
                    }
                    if (maskingKey == null) {
                        maskingKey = new byte[4];
                    }
                    in.readBytes(maskingKey);
                }
                state = State.PAYLOAD;

    PAYLOAD

    有掩码先解码,然后根据不同的Opcode类型封装成对应的帧数据。

            case PAYLOAD://解析数据
                if (in.readableBytes() < framePayloadLength) {
                    return;
                }
    
                ByteBuf payloadBuffer = null;
                try {
                    payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength));
    
                    // Now we have all the data, the next checkpoint must be the next
                    // frame
                    state = State.READING_FIRST;//回到初始要解析的状态
    
                    // Unmask data if needed
                    if (frameMasked) {//如果有掩码,要解码
                        unmask(payloadBuffer);
                    }
    
                    // Processing ping/pong/close frames because they cannot be
                    // fragmented
                    if (frameOpcode == OPCODE_PING) {//如果是ping
                        out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    }
                    if (frameOpcode == OPCODE_PONG) {//如果是pong
                        out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    }
                    if (frameOpcode == OPCODE_CLOSE) {//收到关闭帧,也要回一个关闭帧
                        receivedClosingHandshake = true;
                        checkCloseFrameBody(ctx, payloadBuffer);
                        out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    }
    
                    // Processing for possible fragmented messages for text and binary
                    // frames
                    if (frameFinalFlag) {//是最后一帧
                        // Final frame of the sequence. Apparently ping frames are
                        // allowed in the middle of a fragmented message
                        if (frameOpcode != OPCODE_PING) {//允许中间发心跳帧,心跳帧不算,不是心跳帧才要清零
                            fragmentedFramesCount = 0;
                        }
                    } else {
                        // Increment counter
                        fragmentedFramesCount++;//帧个数+1,为持续帧
                    }
    
                    // Return the frame
                    if (frameOpcode == OPCODE_TEXT) {//文本类型
                        out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    } else if (frameOpcode == OPCODE_BINARY) {//二进制
                        out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    } else if (frameOpcode == OPCODE_CONT) {//持续帧
                        out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv,
                                                               payloadBuffer));
                        payloadBuffer = null;
                        return;
                    } else {
                        throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: "
                                                                + frameOpcode);
                    }
                } finally {
                    if (payloadBuffer != null) {//没有解析出来要释放
                        payloadBuffer.release();
                    }
                }

    unmask解码

    其实就是取出4字节掩码,封装成一个整数,然后跟数据进行每次8位的轮询的异或运算解码。

        private void unmask(ByteBuf frame) {
            int i = frame.readerIndex();
            int end = frame.writerIndex();
    
            ByteOrder order = frame.order();
    
            // Remark: & 0xFF is necessary because Java will do signed expansion from
            // byte to int which we don't want.
            int intMask = ((maskingKey[0] & 0xFF) << 24)
                        | ((maskingKey[1] & 0xFF) << 16)
                        | ((maskingKey[2] & 0xFF) << 8)
                        | (maskingKey[3] & 0xFF);
    
            // If the byte order of our buffers it little endian we have to bring our mask
            // into the same format, because getInt() and writeInt() will use a reversed byte order
            if (order == ByteOrder.LITTLE_ENDIAN) {
                intMask = Integer.reverseBytes(intMask);
            }
    
            for (; i + 3 < end; i += 4) {
                int unmasked = frame.getInt(i) ^ intMask;
                frame.setInt(i, unmasked);
            }
            for (; i < end; i++) {
                frame.setByte(i, frame.getByte(i) ^ maskingKey[i % 4]);
            }
        }

    CORRUPT

    一般是有违反协议了,就丢弃了,但是就怕其他问题要读一帧,不然父类处理会出问题。

            case CORRUPT://帧坏了
                if (in.isReadable()) {
                    // If we don't keep reading Netty will throw an exception saying
                    // we can't return null if no bytes read and state not changed.
                    in.readByte();;//要读一下,否则父类会报错
                }
                return;

    问题在于父类ByteToMessageDecoder的callDecode中有:

                    if (oldInputLength == in.readableBytes()) {
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                        ".decode() did not read anything but decoded a message.");
                    }

    Utf8FrameValidator

    这个是验证文本帧是否是UTF8编码的。其实他就是检查是否是最后一帧,如果是文本帧的话就检测内容,不是UTF8的就抛异常。如果是持续帧,只有第一帧是文本的才会开始检测,所以后续来的肯定是文本帧,就不用判断是不是文本帧了,只要判断是不是在检测就好了。

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof WebSocketFrame) {
                WebSocketFrame frame = (WebSocketFrame) msg;
    
                try {
                    // Processing for possible fragmented messages for text and binary
                    // frames
                    if (((WebSocketFrame) msg).isFinalFragment()) {//是最后帧
                        // Final frame of the sequence. Apparently ping frames are
                        // allowed in the middle of a fragmented message
                        if (!(frame instanceof PingWebSocketFrame)) {
                            fragmentedFramesCount = 0;
    
                            // Check text for UTF8 correctness 监测文本帧
                            if ((frame instanceof TextWebSocketFrame) ||
                                    (utf8Validator != null && utf8Validator.isChecking())) {
                                // Check UTF-8 correctness for this payload
                                checkUTF8String(frame.content());
    
                                // This does a second check to make sure UTF-8
                                // correctness for entire text message
                                utf8Validator.finish();//如果不是就报异常
                            }
                        }
                    } else {//不是最后帧
                        // Not final frame so we can expect more frames in the
                        // fragmented sequence
                        if (fragmentedFramesCount == 0) {//是第一帧,只检测文本
                            // First text or binary frame for a fragmented set
                            if (frame instanceof TextWebSocketFrame) {
                                checkUTF8String(frame.content());//检测内容
                            }
                        } else {//不是第一帧,继续检测,因为前面是文本的,所以持续帧也肯定是
                            // Subsequent frames - only check if init frame is text
                            if (utf8Validator != null && utf8Validator.isChecking()) {
                                checkUTF8String(frame.content());
                            }
                        }
    
                        // Increment counter
                        fragmentedFramesCount++;//帧数累加
                    }
                } catch (CorruptedWebSocketFrameException e) {
                    frame.release();
                    throw e;
                }
            }
    
            super.channelRead(ctx, msg);
        }

    WebSocketServerProtocolHandler#decode

    主要是判断是不是关闭帧,是的话就拿出开始创建的握手对象,然后实现关闭,其实就是发送关闭帧。否则的话就让父类WebSocketProtocolHandler处理。

        protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
            if (serverConfig.handleCloseFrames() && frame instanceof CloseWebSocketFrame) {//如果要处理关闭帧
                WebSocketServerHandshaker handshaker = getHandshaker(ctx.channel());
                if (handshaker != null) {
                    frame.retain();
                    handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);//握手处理器来处理关闭
                } else {
                    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);//直接处理
                }
                return;
            }
            super.decode(ctx, frame, out);
        }

    WebSocketProtocolHandler#decode

    如果是心跳ping,pong帧的就响应,然后继续监听读消息,否则就将数据帧加进消息列表中。

        protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
            if (frame instanceof PingWebSocketFrame) {//ping帧,写回pong,继续监听读事件,直接返回
                frame.content().retain();
                ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content()));
                readIfNeeded(ctx);
                return;
            }
            if (frame instanceof PongWebSocketFrame && dropPongFrames) {//丢弃pong帧
                readIfNeeded(ctx);
                return;
            }
    
            out.add(frame.retain());
        }

    WebSocket08FrameEncoder解码器

    解码器就是编码器反的来,其实就根据情况吧数据封装成协议的格式,比如封装成帧。

        protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
            final ByteBuf data = msg.content();
            byte[] mask;
    
            byte opcode;
            if (msg instanceof TextWebSocketFrame) {
                opcode = OPCODE_TEXT;
            } else if (msg instanceof PingWebSocketFrame) {
                opcode = OPCODE_PING;
            } else if (msg instanceof PongWebSocketFrame) {
                opcode = OPCODE_PONG;
            } else if (msg instanceof CloseWebSocketFrame) {
                opcode = OPCODE_CLOSE;
            } else if (msg instanceof BinaryWebSocketFrame) {
                opcode = OPCODE_BINARY;
            } else if (msg instanceof ContinuationWebSocketFrame) {
                opcode = OPCODE_CONT;
            } else {
                throw new UnsupportedOperationException("Cannot encode frame of type: " + msg.getClass().getName());
            }
    
            int length = data.readableBytes();
    
            if (logger.isTraceEnabled()) {
                logger.trace("Encoding WebSocket Frame opCode={} length={}", opcode, length);
            }
         //封装第一个字节:
            int b0 = 0;
            if (msg.isFinalFragment()) {
                b0 |= 1 << 7;
            }
            b0 |= msg.rsv() % 8 << 4;
            b0 |= opcode % 128;
    
            if (opcode == OPCODE_PING && length > 125) {
                throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was "
                        + length);
            }
         //处理长度:
            boolean release = true;
            ByteBuf buf = null;
            try {
                int maskLength = maskPayload ? 4 : 0;
                if (length <= 125) {//长度0-125
                    int size = 2 + maskLength;
                    if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                        size += length;
                    }
                    buf = ctx.alloc().buffer(size);//前面2个字节+掩码长度(4字节)+内容长度
                    buf.writeByte(b0);
                    byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length);
                    buf.writeByte(b);
                } else if (length <= 0xFFFF) {//内容2字节长度
                    int size = 4 + maskLength;
                    if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                        size += length;
                    }
                    buf = ctx.alloc().buffer(size);
                    buf.writeByte(b0);
                    buf.writeByte(maskPayload ? 0xFE : 126);
                    buf.writeByte(length >>> 8 & 0xFF);
                    buf.writeByte(length & 0xFF);
                } else {  //内容8字节长度
                    int size = 10 + maskLength;
                    if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                        size += length;
                    }
                    buf = ctx.alloc().buffer(size);
                    buf.writeByte(b0);
                    buf.writeByte(maskPayload ? 0xFF : 127);
                    buf.writeLong(length);
                }
           //处理掩码,这里默认服务器返回一般不用掩码,而且这里有一种优化,数据不过不太大的话,就合并成一个缓冲区一起发送:
                // Write payload
                if (maskPayload) {//掩码编码
                    int random = (int) (Math.random() * Integer.MAX_VALUE);
                    mask = ByteBuffer.allocate(4).putInt(random).array();
                    buf.writeBytes(mask);
    
                    ByteOrder srcOrder = data.order();
                    ByteOrder dstOrder = buf.order();
    
                    int counter = 0;
                    int i = data.readerIndex();
                    int end = data.writerIndex();
    
                    if (srcOrder == dstOrder) {
                        // Use the optimized path only when byte orders match
                        // Remark: & 0xFF is necessary because Java will do signed expansion from
                        // byte to int which we don't want.
                        int intMask = ((mask[0] & 0xFF) << 24)
                                    | ((mask[1] & 0xFF) << 16)
                                    | ((mask[2] & 0xFF) << 8)
                                    | (mask[3] & 0xFF);
    
                        // If the byte order of our buffers it little endian we have to bring our mask
                        // into the same format, because getInt() and writeInt() will use a reversed byte order
                        if (srcOrder == ByteOrder.LITTLE_ENDIAN) {
                            intMask = Integer.reverseBytes(intMask);
                        }
    
                        for (; i + 3 < end; i += 4) {
                            int intData = data.getInt(i);
                            buf.writeInt(intData ^ intMask);
                        }
                    }
                    for (; i < end; i++) {
                        byte byteData = data.getByte(i);
                        buf.writeByte(byteData ^ mask[counter++ % 4]);
                    }
                    out.add(buf);
                } else {
                    if (buf.writableBytes() >= data.readableBytes()) {//可写长度如果大于等于内容大度,就合并成一个就发一次
                        // merge buffers as this is cheaper then a gathering write if the payload is small enough
                        buf.writeBytes(data);
                        out.add(buf);
                    } else {
                        out.add(buf);
                        out.add(data.retain());
                    }
                }
                release = false;
            } finally {
                if (release && buf != null) {
                    buf.release();
                }
            }
        }
  • 相关阅读:
    kafka管理器kafka-manager部署安装
    kafka消息监控-KafkaOffsetMonitor
    在Kafka中修改Topic的preferred replica
    kafka中对一个topic增加replicas
    在kafka上对topic新增partition
    kafka log4j配置
    kafka安装与使用
    kafka 消息服务
    软件工程期末考试复习(一)
    PM2自动发布本地项目到服务器
  • 原文地址:https://www.cnblogs.com/xiaojiesir/p/15522341.html
Copyright © 2011-2022 走看看