zoukankan      html  css  js  c++  java
  • Mina传输大数组,多路解码,粘包问题的处理

    我的实际情况:

       1,传递的业务数据种类很多,这就决定了我们要用多路解码器,MINA的中文手册提供的是DemuxingProtocolCodecFactory;

      2,,有的数据长度达到8K,网上有资料说Mina在传输数据超过2K的情况下,会分片传输,因此要考虑如何来接收;

      3,若数据发送很快,或者网络状况不佳,很容易出现粘包的情况,这也是要解决的问题。

    1)针对多路解码:

    编码器:

       将编码器继承MessageEncoder<T>,T是你编码的对象的类,此中我是要编码Requstwork类;其中GetBytes()是我自己定义的将对象的数据组成字节数组的函数;

    public class RequstNetworkEncoder implements MessageEncoder<RequstNetwork>{
        @Override
        public void encode(IoSession ioSession, RequstNetwork requstNetwork, ProtocolEncoderOutput out)
                throws Exception {
            if (requstNetwork != null) {
                byte[] bytes1 = GetBytes(requstNetwork);
                int capacity = bytes1.length;
                IoBuffer buffer = IoBuffer.allocate(capacity, false);
                buffer.setAutoExpand(true);
                buffer.put(bytes1);           
                buffer.flip();
                out.write(buffer);
            }
        }
    }

    对应的解码器:

    public class RequstNetworkDecoder implements MessageDecoder {
        @Override
        public MessageDecoderResult decodable(IoSession ioSession, IoBuffer ioBuffer) {
            if(ioBuffer.remaining()<2){
                //还没有达到不同数据的标志位的地方
                return MessageDecoderResult.NEED_DATA;
            }
            else{
                ioBuffer.position(1);
                byte b=ioBuffer.get();
                if (b==(此处为区分不同数据的标志)){  
                    return  MessageDecoderResult.OK;
    
                }
                else{
                    return MessageDecoderResult.NOT_OK;
                }
            }
        }
    
        @Override
        public MessageDecoderResult decode(IoSession ioSession, IoBuffer in, ProtocolDecoderOutput out)
                throws Exception {
            RequstNetworkReply reply=new RequstNetworkReply();
           //自己解码的过程
            out.write(reply);
            return MessageDecoderResult.OK;
        }
    
        @Override
        public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
    
        }
    }

    编解码工厂:

    public class MyProtocolCodecFactory extends DemuxingProtocolCodecFactory {
    
        public MyProtocolCodecFactory(){
            super.addMessageEncoder(RequstNetwork.class,RequstNetworkEncoder.class);
            super.addMessageDecoder(RequstNetworkDecoder.class);
    
        }
    }

    针对大数组的传输和粘包,修改了一下网上的做法:

    public class RequestPlanDecoder extends CumulativeProtocolDecoder {
    
        private final AttributeKey CONTEXT = new AttributeKey(getClass(),
                "context");
    
        @Override
        protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
                throws Exception {
          
            Context ctx =getContext(session);//获取session 的context
    
            long matchCount=ctx.getMatchLength();//目前已获取的数据
            long length=ctx.getLength();//数据总长度
            IoBuffer buffer=ctx.getBuffer();//数据存入buffer
    
            //第一次取数据
            if(length==0){
                length=in.getLong();
                //保存第一次获取的长度
                ctx.setLength(length);
                matchCount=in.remaining();
               ctx.setMatchLength(matchCount);
            }
            else{
                matchCount+=in.remaining();
                ctx.setMatchLength(matchCount);
            } 
            ctx.setMatchLength(matchCount); 
            if (in.hasRemaining()) {
               // 如果buff中还有数据 
               buffer.put(in);
               // 添加到保存数据的buffer中 
               if (matchCount >= length) { 
                 ////自己解码的部分/////// 
                  if(buffer.remaining() > 0) {
                     //如果读取一个完整包内容后还粘了包,就让父类再调用一次,进行下一次解析 
                     IoBuffer temp = IoBuffer.allocate(1024).setAutoExpand(true);
                     temp.put(buffer);
                     temp.flip();
                     in.sweep();
                     //清空数据 
                     in.put(temp);
                  } 
                 ctx.reset();//清空
                 return true;  
               } else { 
                 ctx.setBuffer(buffer); 
                 return false; 
               } 
          } 
       return false; 
    } 
    
    //获取session的context 
    public Context getContext(IoSession session) { 
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) { 
          ctx = new Context();
          session.setAttribute(CONTEXT, ctx);
        } 
        return ctx; 
    } 
    /** * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析 **/
     private class Context {
    
       public IoBuffer buffer; 
    
       public long length = 0; 
    
       public long matchLength = 0; 
    
       public Context() { 
           buffer = IoBuffer.allocate(1024).setAutoExpand(true);
       }
       public void setBuffer(IoBuffer buffer) {
           this.buffer = buffer; 
       } 
       public void setLength(long length) {
           this.length = length; 
       }
       public void setMatchLength(long matchLength) { 
           this.matchLength = matchLength;
       } 
       public IoBuffer getBuffer() { 
           return buffer;
       }
       public long getLength() { 
           return length; 
       } 
       public long getMatchLength() {
           return matchLength;
        }
       public void reset(){ 
         this.buffer.clear();
         this.length=0; 
         this.matchLength=0; 
       }
     
      } 
    } 

    我想让传大数组的解码器能和其他解码器一起共用,通过查看官方的MINA API直到MessageDecoder就是继承了CumulativeProtocolDecoder,于是就做了如下结合:

    public class RequestPlanDecode implements MessageDecoder  {
        private final AttributeKey CONTEXT = new AttributeKey(getClass(),
                "context");
        @Override
        public MessageDecoderResult decodable(IoSession ioSession, IoBuffer in) {
            if(in.remaining()<2){
                return MessageDecoderResult.NEED_DATA;
            }
            else{
                byte b1=in.get();
                byte b2=in.get();
                if(b2==<span style="font-family: Arial, Helvetica, sans-serif;">(此处为区分不同数据的标志)</span>){
                    return MessageDecoderResult.OK;
                }
                else {
                    return  MessageDecoderResult.NOT_OK;
                }
            }
        }
    
        @Override
        public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
                throws Exception {
            //=================结合CumulativeProtocolDecoder================//
            Context ctx =getContext(session);//获取session  的context
    
            long matchCount=ctx.getMatchLength();//目前已获取的数据
            long length=ctx.getLength();//数据总长度
            IoBuffer buffer=ctx.getBuffer();//数据存入buffer
    
            //第一次取数据
            if(length==0){
                length=in.getLong();
                //保存第一次获取的长度
                ctx.setLength(length);
                matchCount=in.remaining();
                ctx.setMatchLength(matchCount);
            }
            else{
                matchCount+=in.remaining();
                ctx.setMatchLength(matchCount);
            }
            if (in.hasRemaining()) {// 如果buff中还有数据
                buffer.put(in);// 添加到保存数据的buffer中
                if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
                   ////自己解码的部分///////
    
                    if(buffer.remaining() > 0) {////解决粘包
                        IoBuffer temp = IoBuffer.allocate(1024).setAutoExpand(true);
                        temp.put(buffer);
                        temp.flip();
                        in.sweep();
                        in.put(temp);
                    }
                    ctx.reset();
                    return MessageDecoderResult.OK;
    
                } else {
                    ctx.setBuffer(buffer);
                    return MessageDecoderResult.NEED_DATA;
                }
            }
            return MessageDecoderResult.NEED_DATA;
        }
    
        @Override
        public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput)
                throws Exception {
    
        }
        /////////////////////////////////////结合CumulativeProtocolDecoder/////////////////////////////////////////////////
        //获取session的context
        public Context getContext(IoSession session) {
            Context ctx = (Context) session.getAttribute(CONTEXT);
            if (ctx == null) {
                ctx = new Context();
                session.setAttribute(CONTEXT, ctx);
            }
            return ctx;
        }
        /**
         * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析
         *
         */
        private class Context {
            public IoBuffer buffer;
            public long length = 0;
            public long matchLength = 0;
    
            public Context() {
                buffer = IoBuffer.allocate(1024).setAutoExpand(true);
            }
    
            public void setBuffer(IoBuffer buffer) {
                this.buffer = buffer;
            }
    
            public void setLength(long length) {
                this.length = length;
            }
            public void setMatchLength(long matchLength) {
                this.matchLength = matchLength;
            }
    
            public IoBuffer getBuffer() {
    
                return buffer;
            }
    
            public long getLength() {
                return length;
            }
    
            public long getMatchLength() {
                return matchLength;
            }
    
            public  void reset(){
                this.buffer.clear();
                this.length=0;
                this.matchLength=0;
            }
        }
    
    }
    
  • 相关阅读:
    win7服务器从本地粘贴的文件,粘贴卡死
    AOP面向切面编程
    静态代理和动态代理
    查询数据库中第n行数据
    记录我的成长
    路径总和,双重递归
    java Queue 常用方法(持续更新)
    对称二叉树
    Linux学习之五——Linux虚拟机文件系统
    Linux学习之四——命令运行机制及查看命令帮助
  • 原文地址:https://www.cnblogs.com/bkyliufeng/p/6249317.html
Copyright © 2011-2022 走看看