zoukankan      html  css  js  c++  java
  • Mina框架断包、粘包问题解决方式

    Mina框架断包、粘包问题解决方式

    Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然。也能够提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 能够帮助我们高速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。

    在mina中,一般的应用场景用TextLine的Decode和Encode就够用了(TextLine的默认切割符尽管是 ,但事实上分隔符是能够自己指定的,如:newTextLineDecoder(charset, decodingDelimiter);)

    但默认解码器每次读取缓冲的数据是有限制的,即ReadBufferSize的大小。默认是2048个字节。当数据包比較大时将被分成多次读取。造成断包。

    尽管能够通过acceptor.getSessionConfig().setReadBufferSize(newsize)这样的方式来添加默认容量,但毕竟不是王道(太大了浪费空间。肯定会减少数据的处理效率)。

    所以。当我们接收的数据的大小不是非常固定,且easy偏大的时候,默认的TextLine就不适合了。

    这时我们在解析之前就须要推断数据包是否完整,这样处理起来就会非常麻烦。那么Mina 中幸好提供了CumulativeProtocolDecoder

    类,从名字上能够看出累积性的协议解码器,也就是说仅仅要有数据发送过来。这个类就会去读取数据。然后累积到内部的IoBuffer 缓冲区,可是详细的拆包(把累积到缓冲区的数据解码为JAVA 对象)交由子类的doDecode()方法完毕,实际上CumulativeProtocolDecoder就是在decode()重复的调用暴漏给子类实现的doDecode()方法。

    详细运行步骤例如以下所看到的:

    A. 你的doDecode()方法返回true 时,CumulativeProtocolDecoder 的decode()方法会首先推断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据。假设没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完成),进一步说,也就是此时你必须已经消费过内部的IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。

    假设验证通过,那么CumulativeProtocolDecoder会检查缓冲区内是否还有数据未读取。假设有就继续调用doDecode()方法。没有就停止对doDecode()方法的调用。直到有新的数据被缓冲。

    B. 当你的doDecode()方法返回false 时。CumulativeProtocolDecoder 会停止对doDecode()方法的调用。但此时假设本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时能够从IoSession 中提取合并。

    假设发现本次数据全都读取完成,则清空IoBuffer 缓冲区。

    简而言之,当你觉得读取到的数据已经够解码了。那么就返回true,否则就返回false。这个CumulativeProtocolDecoder事实上最重要的工作就是帮你完毕了数据的累积,由于这个工作是非常烦琐的。

    一、    实现解码器

    CumulativeProtocolDecoder是一个抽象类,必须继承并实现其doDecode方法。用户自己定义协议的拆分就应该写在doDecode方法中,以下的MyDecoder类是一个其子类的实现:

    public class MyDecoder extends CumulativeProtocolDecoder {

        public static Logger log = Logger.getLogger(MyDecoder.class);

        /**

         * 包解码器组件

         */

        private PacketComponent packetComponent;

         /**

         * 这种方法的返回值是重点:

         * 1、当内容刚好时,返回false,告知父类接收下一批内容

         * 2、内容不够时须要下一批发过来的内容,此时返回false,这样父类 CumulativeProtocolDecoder

         *   会将内容放进IoSession中,等下次来数据后就自己主动拼装再交给本类的doDecode

         * 3、当内容多时,返回true,由于须要再将本批数据进行读取。父类会将剩余的数据再次推送本

         * 类的doDecode

         */ 

        public boolean doDecode(IoSession session,IoBuffer in, 

                ProtocolDecoderOutput out) throws Exception { 

        log.info("in.remaining : "+in.remaining());

            if(in.remaining() > 0){//有数据时。读取前8字节推断消息长度 

                byte [] sizeBytes = new byte[8]; 

                in.mark();//标记当前位置。以便reset

    //由于我的前数据包的长度是保存在第4-8字节中,

                in.get(sizeBytes,0,8);//读取4字节 

                            //DataTypeChangeHelper是自己写的一个byte[]int的一个工具类 

                int size = (int) DataTypeUtil.bytesToInt(sizeBytes,4);

                log.info("size : "+size);

                in.reset();

                if(size > in.remaining()){//假设消息内容不够,则重置。相当于不读取size 

                    return false;//父类接收新数据,以拼凑成完整数据 

                } else

                    byte[] bytes = new byte[size];  

                    in.get(bytes, 0, size);

                  //把字节转换为Java对象的工具类

                    PackageData pack = packetComponent.getDataFromBuffer(IoBuffer.wrap(bytes));

                    out.write(pack);

                    if(in.remaining() > 0){//假设读取内容后还粘了包,就让父类再重读  一次。进行下一次解析 

                        return true

                    } 

                } 

            } 

            return false;//处理成功,让父类进行接收下个包 

        }

        getter();

        Setter();

    }

    二、    实现编解码工厂和解码器

    我们还须要一个编解码工厂,用来为编解码过滤器提供编码器和解码器,解码器此处我们用不到。可是也必须提供,所以能够提供一个空的实现。

    /**

     *

     * 编解码工厂

     *

     */

    public class MyCodecFcatory implements ProtocolCodecFactory {

        private ProtocolEncoder encoder = null;

        private ProtocolDecoder decoder = null;

     

        public MyCodecFcatory(ProtocolEncoder encoder, ProtocolDecoderdecoder) {

           this.encoder = encoder;

           this.decoder = decoder;

        }

     

        @Override

        public ProtocolEncoder getEncoder(IoSession session) throws Exception {

           return this.encoder;

        }

     

        @Override

        public ProtocolDecoder getDecoder(IoSession session) throws Exception {

           return this.decoder;

        }

    }

    /**

     *

     * 编码器:不做不论什么操作,数据已是约定好的格式。按原格式编码

     *

     */

    public class MyEncoder extends ProtocolEncoderAdapter {

     

        @Override

        public void encode(IoSession session, Object message,

               ProtocolEncoderOutput out) throws Exception {

           // TODO Do nothing

        }

    }

    三、    配置编解码过滤器

    以下就能够配置编解码过滤器了:

    <!-- 累加数据包解码器:解断丢包、粘包问题 -->

        <bean id="codec" class="org.apache.mina.filter.codec.ProtocolCodecFilter">

           <constructor-arg>

               <bean class="com.mina.codec.MyCodecFcatory">

                  <constructor-arg index="0">

                      <bean class="com.mina.codec.MyEncoder"></bean>

                  </constructor-arg>

                  <constructor-arg index="1">

                      <bean class="com.mina.codec.MyDecoder">

                         <property name="packetComponent">

                             <bean class="com. mina.component.RootComponent">

                               

                             </bean>

                          </property>

                      </bean>

                  </constructor-arg>

                     

               </bean>

           </constructor-arg>

        </bean>

    <bean id="filterChainBuilder" class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder"> 

          <property name="filters"> 

            <map>

              <entry key="codec" value-ref="codec"/>

              <entry key="logger" value-ref="loggerFilter"/>

              <entry key="executors" value-ref="executors"/>

            </map> 

          </property> 

        </bean> 

    须要注意的是:在doDecode中通过out.write(pack) 把数据输出后,官方的说明文档中说接下来会继续运行后面的过滤器,然后是IoHandle。假设你是仅仅用了一个编解码过滤器的话,这可能全然没问题,可是假设使用了两个编解码过滤器(可能非常少有人会这样做,但本人因为前期使用了另外一个自己定义的编解码过滤器。后来想加上这个可累加的解码器。为了图省事就在原过滤器的前面新添加了一个编解码过滤器,后来数据流就不走我原来的编解码过滤器了,out.write()之后直接到了IoHandle里面,搞了我好久,无奈最后把两个编解码过滤器合二为一啦。当中原因我还没时间去搞个清楚。为防止大家和我犯同一个错误,特此提醒!)

  • 相关阅读:
    文件下载和进度显示
    响应
    log4j2-2.13.0版本安装
    maven私服nexus仓库3.24.0版本搭建
    window下MYSQL定时备份表库的BAT
    JBoss7.3.0EAP版本安装
    jetbrains-IDEA2020版本插件搜索以及官方汉化和其他插件安装介绍
    Jenkins迁移job插件Job Import Plugin
    Appium下出现Original error: pkg.... 解决办法
    IIS10下部署.NetCore站点出现出现 HTTP 错误 500.19,错误代码:0x8007000d及一些问题
  • 原文地址:https://www.cnblogs.com/mfrbuaa/p/5069487.html
Copyright © 2011-2022 走看看