zoukankan      html  css  js  c++  java
  • java nio消息半包、粘包解决方案

    问题背景

    NIO是面向缓冲区进行通信的,不是面向流的。我们都知道,既然是缓冲区,那它一定存在一个固定大小。这样一来通常会遇到两个问题:

    • 消息粘包:当缓冲区足够大,由于网络不稳定种种原因,可能会有多条消息从通道读入缓冲区,此时如果无法分清数据包之间的界限,就会导致粘包问题;
    • 消息不完整:若消息没有接收完,缓冲区就被填满了,会导致从缓冲区取出的消息不完整,即半包的现象。

    介绍这个问题之前,务必要提一下我代码整体架构。
    代码参见GitHub仓库

    https://github.com/CuriousLei/smyl-im

    在这个项目中,我的NIO核心库设计思路流程图如下所示

    image

    介绍:

    • 服务端为每一个连接上的客户端建立一个Connector对象,为其提供IO服务;
    • ioArgs对象内部实例域引用了缓冲区buffer,作为直接与channel进行数据交互的缓冲区;
    • 两个线程池,分别操控ioArgs进行读和写操作;
    • connector与ioArgs关系:(1)输入,线程池处理读事件,数据写入ioArgs,并回调给connector;(2)输出,connector将数据写入ioArgs,将ioArgs传入Runnable对象,供线程池处理;
    • 两个selector线程,分别监听channel的读和写事件。事件就绪,则触发线程池工作。

    思路

    光这样实现,必然会有粘包、半包问题。要重现这两个问题也很简单。

    • ioArgs中把缓冲区设置小一点,发送一条大于该长度的数据,服务端会当成两条消息读取,即消息不完整;
    • 在线程代码中,加一个Thread.sleep()延时等待,客户端连续发几条消息(总长度小于缓冲区大小),也可以重现粘包现象。

    这个问题实质上是消息体与缓冲区数据不一一对应导致的。那么,如何解决呢?

    固定头部方案

    可以采用固定头部方案来解决,头部设置四个字节,存储一个int值,记录后面数据的长度。以此来标记一个消息体。

    • 读取数据时,根据头部的长度信息,按序读取ioArgs缓冲区中的数据,若没有达到长度要求,继续读下一个ioArgs。这样自然不会出现粘包、半包问题。
    • 输出数据时,也采用同样的机制封装数据,首部四个字节记录长度。

    我的工程项目中,客户端和服务端共用一个nio核心包,即niohdl,可保证收发数据格式一致。

    设计方案

    要实现以上设想,必须在connector和ioArgs之间加一层Dispatcher类,用于处理消息体缓冲区之间的转化关系(消息体取个名字:Packet)。根据输入和输出的不同,分别叫ReceiveDispatcher和SendDispatcher。即通过它们来操作Packet与ioArgs之间的转化。

    Packet

    定义这个消息体,继承关系如下图所示:

    image

    Packet是基类,代码如下:

    package cn.buptleida.niohdl.core;
    import java.io.Closeable;
    import java.io.IOException;
    /**
     * 公共的数据封装
     * 提供了类型以及基本的长度的定义
     */
    public class Packet implements Closeable {
        protected byte type;
        protected int length;
    
        public byte type(){
            return type;
        }
    
        public int length(){
            return length;
        }
    
        @Override
        public void close() throws IOException {
    
        }
    }
    

    SendPacket和ReceivePacket分别代表发送消息体和接收消息体。StringReceivePacket和StringSendPacket代表字符串类的消息,因为本次实践只限于字符串消息的收发,今后可能有文件之类的,有待扩展。

    代码中必然会涉及到字节数组的操作,所以,以StringSendPacket为例,需要提供将String转化为byte[]的方法。代码如下所示:

    package cn.buptleida.niohdl.box;
    import cn.buptleida.niohdl.core.SendPacket;
    
    public class StringSendPacket extends SendPacket {
    
        private final byte[] bytes;
    
        public StringSendPacket(String msg) {
            this.bytes = msg.getBytes();
            this.length = bytes.length;//父类中的实例域
        }
    
        @Override
        public byte[] bytes() {
            return bytes;
        }
    }
    

    SendDispatcher

    在connector对象的实例域中会引用一个SendDispatcher对象。发送数据时,会通过SendDispatcher中的方法对数据进行封装和处理。其大致的关系图如下所示:

    image

    SendDispatcher中设置任务队列Queue queue,需要发送消息时,connector将消息写入sendPacket,并存入队列queue,执行出队。用packetTemp变量引用出队的元素,将四字节的长度信息和packetTemp写入ioArgs的缓冲区中,发送完毕之后,再判断packetTemp是否完整写出(使用position和total指针标记、判断),决定继续输出packetTemp的内容,还是开始下一轮出队。
    这个过程的程序框图如下所示:

    image

    在代码中,SendDispatcher实际上是一个接口,我用AsyncSendDispatcher实现此接口,代码如下:

    package cn.buptleida.niohdl.impl.async;
    
    import cn.buptleida.niohdl.core.*;
    import cn.buptleida.utils.CloseUtil;
    
    import java.io.IOException;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedDeque;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class AsyncSendDispatcher implements SendDispatcher {
        private final AtomicBoolean isClosed = new AtomicBoolean(false);
        private Sender sender;
        private Queue<SendPacket> queue = new ConcurrentLinkedDeque<>();
        private AtomicBoolean isSending = new AtomicBoolean();
        private ioArgs ioArgs = new ioArgs();
        private SendPacket packetTemp;
        //当前发送的packet大小以及进度
        private int total;
        private int position;
    
        public AsyncSendDispatcher(Sender sender) {
            this.sender = sender;
        }
    
        /**
         * connector将数据封装进packet后,调用这个方法
         * @param packet
         */
        @Override
        public void send(SendPacket packet) {
            queue.offer(packet);//将数据放进队列中
            if (isSending.compareAndSet(false, true)) {
                sendNextPacket();
            }
        }
        
        @Override
        public void cancel(SendPacket packet) {
    
        }
    
        /**
         * 从队列中取数据
         * @return
         */
        private SendPacket takePacket() {
            SendPacket packet = queue.poll();
            if (packet != null && packet.isCanceled()) {
                //已经取消不用发送
                return takePacket();
            }
            return packet;
        }
    
        private void sendNextPacket() {
            SendPacket temp = packetTemp;
            if (temp != null) {
                CloseUtil.close(temp);
            }
            SendPacket packet = packetTemp = takePacket();
            if (packet == null) {
                //队列为空,取消发送状态
                isSending.set(false);
                return;
            }
    
            total = packet.length();
            position = 0;
    
            sendCurrentPacket();
        }
    
        private void sendCurrentPacket() {
            ioArgs args = ioArgs;
    
            args.startWriting();//将ioArgs缓冲区中的指针设置好
    
            if (position >= total) {
                sendNextPacket();
                return;
            } else if (position == 0) {
                //首包,需要携带长度信息
                args.writeLength(total);
            }
    
            byte[] bytes = packetTemp.bytes();
            //把bytes的数据写入到IoArgs中
            int count = args.readFrom(bytes, position);
            position += count;
    
            //完成封装
            args.finishWriting();//flip()操作
            //向通道注册OP_write,将Args附加到runnable中;selector线程监听到就绪即可触发线程池进行消息发送
            try {
                sender.sendAsync(args, ioArgsEventListener);
            } catch (IOException e) {
                closeAndNotify();
            }
        }
    
        private void closeAndNotify() {
            CloseUtil.close(this);
        }
    
        @Override
        public void close(){
            if (isClosed.compareAndSet(false, true)) {
                isSending.set(false);
                SendPacket packet = packetTemp;
                if (packet != null) {
                    packetTemp = null;
                    CloseUtil.close(packet);
                }
            }
        }
    
        /**
         * 接收回调,来自writeHandler输出线程
         */
        private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() {
            @Override
            public void onStarted(ioArgs args) {
    
            }
    
            @Override
            public void onCompleted(ioArgs args) {
                //继续发送当前包packetTemp,因为可能一个包没发完
                sendCurrentPacket();
            }
        };
    
    
    }
    
    

    ReceiveDispatcher

    同样,ReceiveDispatcher也是一个接口,代码中用AsyncReceiveDispatcher实现。在connector对象的实例域中会引用一个AsyncReceiveDispatcher对象。接收数据时,会通过ReceiveDispatcher中的方法对接收到的数据进行拆包处理。其大致的关系图如下所示:
    image

    每一个消息体的首部会有一个四字节的int字段,代表消息的长度值,按照这个长度来进行读取。如若一个ioArgs未满足这个长度,就读取下一个ioArgs,保证数据包的完整性。这个流程就不画程序框图了,偷个懒hhhh。其实看下面代码注释已经很清晰了,容易理解。

    AsyncReceiveDispatcher的代码如下所示:

    package cn.buptleida.niohdl.impl.async;
    
    import cn.buptleida.niohdl.box.StringReceivePacket;
    import cn.buptleida.niohdl.core.ReceiveDispatcher;
    import cn.buptleida.niohdl.core.ReceivePacket;
    import cn.buptleida.niohdl.core.Receiver;
    import cn.buptleida.niohdl.core.ioArgs;
    import cn.buptleida.utils.CloseUtil;
    
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class AsyncReceiveDispatcher implements ReceiveDispatcher {
        private final AtomicBoolean isClosed = new AtomicBoolean(false);
        private final Receiver receiver;
        private final ReceivePacketCallback callback;
        private ioArgs args = new ioArgs();
        private ReceivePacket packetTemp;
        private byte[] buffer;
        private int total;
        private int position;
    
        public AsyncReceiveDispatcher(Receiver receiver, ReceivePacketCallback callback) {
            this.receiver = receiver;
            this.receiver.setReceiveListener(ioArgsEventListener);
            this.callback = callback;
        }
    
        /**
         * connector中调用该方法进行
         */
        @Override
        public void start() {
            registerReceive();
        }
    
        private void registerReceive() {
    
            try {
                receiver.receiveAsync(args);
            } catch (IOException e) {
                closeAndNotify();
            }
        }
    
        private void closeAndNotify() {
            CloseUtil.close(this);
        }
    
        @Override
        public void stop() {
    
        }
    
        @Override
        public void close() throws IOException {
            if(isClosed.compareAndSet(false,true)){
                ReceivePacket packet = packetTemp;
                if(packet!=null){
                    packetTemp = null;
                    CloseUtil.close(packet);
                }
            }
        }
    
        /**
         * 回调方法,从readHandler输入线程中回调
         */
        private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() {
            @Override
            public void onStarted(ioArgs args) {
                int receiveSize;
                if (packetTemp == null) {
                    receiveSize = 4;
                } else {
                    receiveSize = Math.min(total - position, args.capacity());
                }
                //设置接受数据大小
                args.setLimit(receiveSize);
            }
    
            @Override
            public void onCompleted(ioArgs args) {
                assemblePacket(args);
                //继续接受下一条数据,因为可能同一个消息可能分隔在两份IoArgs中
                registerReceive();
            }
        };
    
        /**
         * 解析数据到packet
         * @param args
         */
        private void assemblePacket(ioArgs args) {
            if (packetTemp == null) {
                int length = args.readLength();
                packetTemp = new StringReceivePacket(length);
                buffer = new byte[length];
                total = length;
                position = 0;
            }
            //将args中的数据写进外面buffer中
            int count = args.writeTo(buffer,0);
            if(count>0){
                //将数据存进StringReceivePacket的buffer当中
                packetTemp.save(buffer,count);
                position+=count;
                
                if(position == total){
                    completePacket();
                    packetTemp = null;
                }
            }
            
        }
    
        private void completePacket() {
            ReceivePacket packet = this.packetTemp;
            CloseUtil.close(packet);
            callback.onReceivePacketCompleted(packet);
        }
    
    }
    

    总结

    其实粘包、半包的解决方案并没有什么奥秘,单纯地复杂而已。方法核心就是自定义一个消息体Packet,完成Packet中的byte数组与缓冲区数组之间的复制转化即可。当然,position、limit等等指针的辅助很重要。

    总结这个博客,也是将目前为止的工作进行梳理和记录。我将通过smyl-im这个项目来持续学习+实践。因为之前学习过程中有很多零碎的知识点,都躺在我的有道云笔记里,感觉没必要总结成博客。本次博客讲的内容刚好是一个成体系的东西,正好可以将这个项目背景带出来,后续的博客就可以在这基础上衍生拓展了。

  • 相关阅读:
    forEach 终止循环
    js 解决引用赋值修改新数组导致原数组跟着改变的问题
    es6判断数组是否包含某个元素
    tab css 切换效果
    js对象赋值影响原对象
    小程序返回上个页面 修改上个页面的数据
    小程序 scroll-view scroll-x 不生效
    proxy跨域处理
    常用Dos操作指令
    django全文检索
  • 原文地址:https://www.cnblogs.com/buptleida/p/12732288.html
Copyright © 2011-2022 走看看