zoukankan      html  css  js  c++  java
  • MINA 网络黏包处理代码

    本文完整代码,可以浏览:

    https://github.com/hjj2017/xgame-code_server/blob/master/game_server/src/com/game/gameServer/framework/mina/MsgCumulativeFilter.java

    我在网上查阅过的 MINA 黏包处理,一般都是放在 Decoder 中做的。也就是黏包处理和消息解码放在一起做,显得比较混乱不好打理。而以下这段代码,我是把黏包处理放在 Filter 中了。在具体使用时可以这样:

     1 // 创建 IO 接收器
     2 NioSocketAcceptor acceptor = new NioSocketAcceptor();
     3 
     4 // 获取责任链
     5 DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
     6 // 处理网络粘包
     7 chain.addLast("msgCumulative", new MsgCumulativeFilter());
     8 
     9 // 添加自定义编解码器
    10 chain.addLast("msgCodec", new ProtocolCodecFilter(
    11     new XxxEncoder(),
    12     new XxxDecoder()
    13 ));
    14 
    15 // 获取会话配置
    16 IoSessionConfig cfg = acceptor.getSessionConfig();
    17 
    18 // 设置缓冲区大小
    19 cfg.setReadBufferSize(4096);
    20 // 设置 session 空闲时间
    21 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 10);
    22 
    23 // 设置 IO 句柄
    24 acceptor.setHandler(new XxxHandler());
    25 acceptor.setReuseAddress(true);
    26 
    27 try {
    28     // 绑定端口
    29     acceptor.bind(new InetSocketAddress("127.0.0.1", 4400));
    30 } catch (Exception ex) {
    31     // 输出错误日志
    32     System.error.println(ex);
    33 }

    目前 Netty 框架要比 MINA 流行的多,而且 Netty 对网络黏包处理也做了很好的处理,不用开发者自己费那么大劲。我也考虑过迁移到 Netty 框架上,不过目前还没有找到特别充分的理由。闲话不多说了,以下就是黏包处理代码:

      1 package com.game.gameServer.framework.mina;
      2 
      3 import java.util.concurrent.ConcurrentHashMap;
      4 
      5 import org.apache.mina.core.buffer.IoBuffer;
      6 import org.apache.mina.core.filterchain.IoFilterAdapter;
      7 import org.apache.mina.core.session.IoSession;
      8 
      9 import com.game.gameServer.framework.FrameworkLog;
     10 import com.game.gameServer.msg.SpecialMsgSerialUId;
     11 import com.game.part.msg.IoBuffUtil;
     12 
     13 /**
     14  * 消息粘包处理
     15  * 
     16  * @author hjj2017
     17  * @since 2014/3/17
     18  * 
     19  */
     20 class MsgCumulativeFilter extends IoFilterAdapter {
     21     /** 
     22      * 从客户端接收的消息估计长度,
     23      * {@value} 字节, 
     24      * 对于从客户端接收的数据来说, 都是简单的命令! 
     25      * 很少超过 {@value}B
     26      * 
     27      */
     28     private static final int DECODE_MSG_LEN = 64;
     29     /** 容器 Buff 字典 */
     30     private static final ConcurrentHashMap<Long, IoBuffer> _containerBuffMap = new ConcurrentHashMap<>();
     31 
     32     @Override
     33     public void sessionClosed(NextFilter nextFilter, IoSession sessionObj) throws Exception {
     34         if (nextFilter == null || 
     35             sessionObj == null) {
     36             // 如果参数对象为空, 
     37             // 则直接退出!
     38             FrameworkLog.LOG.error("null nextFilter or sessionObj");
     39             return;
     40         }
     41 
     42         // 移除容器 Buff
     43         removeContainerBuff(sessionObj);
     44         // 向下传递
     45         super.sessionClosed(nextFilter, sessionObj);
     46     }
     47 
     48     @Override
     49     public void messageReceived(
     50         NextFilter nextFilter, IoSession sessionObj, Object msgObj) throws Exception {
     51         if (nextFilter == null || 
     52             sessionObj == null) {
     53             // 如果参数对象为空, 
     54             // 则直接退出!
     55             FrameworkLog.LOG.error("null nextFilter or sessionObj");
     56             return;
     57         }
     58 
     59         // 获取会话 UId
     60         long sessionUId = sessionObj.getId();
     61 
     62         if (!(msgObj instanceof IoBuffer)) {
     63             // 如果消息对象不是 ByteBuff, 
     64             // 则直接向下传递!
     65             FrameworkLog.LOG.warn("msgObj is not a IoBuff, sessionUId = " + sessionUId);
     66             super.messageReceived(nextFilter, sessionObj, msgObj);
     67         }
     68 
     69         // 获取输入 Buff
     70         IoBuffer inBuff = (IoBuffer)msgObj;
     71 
     72         if (!inBuff.hasRemaining()) {
     73             // 如果没有剩余内容, 
     74             // 则直接退出!
     75             FrameworkLog.LOG.error("inBuff has not remaining, sessionUId = " + sessionUId);
     76             return;
     77         } else if (inBuff.remaining() <= 8) {
     78             // 如果 <= 8 字节, 
     79             // 那还是执行粘包处理过程吧 ...
     80             // 8 字节 = 消息长度 ( Short ) + 消息类型 ( Short ) + 时间戳 ( Int )
     81             // 如果比这个长度都小, 
     82             // 那肯定不是一条完整消息 ...
     83             this.msgRecv_0(nextFilter, sessionObj, inBuff);
     84             return;
     85         }
     86 
     87         // 获取消息长度
     88         final int msgSize = inBuff.getShort();
     89         inBuff.position(0);
     90 
     91         if (msgSize == inBuff.limit() && 
     92             containerBuffIsEmpty(sessionObj)) {
     93             // 
     94             // 如果消息长度和极限值刚好相同, 
     95             // 并且容器 Buff 中没有任何内容 ( 即, 上一次消息没有粘包 ),
     96             // 那么直接向下传递!
     97             // 
     98             super.messageReceived(
     99                 nextFilter, sessionObj, inBuff
    100             );
    101         } else {
    102             // 
    103             // 如果消息长度和极限值不同, 
    104             // 则说明是网络粘包!
    105             // 这时候跳转到粘包处理过程 ...
    106             // 
    107             this.msgRecv_0(nextFilter, sessionObj, inBuff);
    108         }
    109     }
    110 
    111     /**
    112      * 接收连包消息
    113      * 
    114      * @param nextFilter
    115      * @param sessionObj
    116      * @param inBuff
    117      * @throws Exception 
    118      * 
    119      */
    120     private void msgRecv_0(
    121         NextFilter nextFilter, IoSession sessionObj, IoBuffer inBuff) throws Exception {
    122         if (nextFilter == null || 
    123             sessionObj == null) {
    124             // 如果参数对象为空, 
    125             // 则直接退出!
    126             FrameworkLog.LOG.error("null nextFilter or sessionObj");
    127             return;
    128         }
    129 
    130         // 获取会话 UId
    131         long sessionUId = sessionObj.getId();
    132         // 获取容器 Buff
    133         IoBuffer containerBuff = getContainerBuff(sessionObj);
    134 
    135         // 添加新 Buff 到容器 Buff 的末尾
    136         IoBuffUtil.append(containerBuff, inBuff);
    137         // 令 position = 0
    138         containerBuff.position(0);
    139 
    140 //        // 记录调试信息
    141 //        FrameworkLog.LOG.debug("
    in = [ " + inBuff.getHexDump() + " ]");
    142 
    143         for (int i = 0; ; i++) {
    144 //            // 记录调试信息
    145 //            FrameworkLog.LOG.debug(
    146 //                "i = " + i 
    147 //                + "
    co = [ " + containerBuff.getHexDump() + " ]"
    148 //                + "
    co.pos = " + containerBuff.position() 
    149 //                + "
    co.lim = " + containerBuff.limit()
    150 //            );
    151 
    152             if (containerBuff.remaining() < 4) {
    153                 // 
    154                 // 如果剩余字节数 < 4, 
    155                 // 这样根本无法识别出消息类型 msgSerialUId ...
    156                 // 直接退出!
    157                 // 在退出前, 
    158                 // 准备好接收下一次消息!
    159                 // 
    160                 IoBuffUtil.readyToNext(containerBuff);
    161                 return;
    162             }
    163 
    164             // 获取原始位置
    165             final int oldPos = containerBuff.position();
    166             // 获取消息长度和类型
    167             final int msgSize = containerBuff.getShort();
    168             final int msgSerialUId = containerBuff.getShort();
    169 
    170 //            // 记录调试信息
    171 //            FrameworkLog.LOG.debug(
    172 //                "i = " + i 
    173 //                + "
    msgSize = " + msgSize
    174 //                + "
    msgSerialUId = " + msgSerialUId
    175 //            );
    176 
    177             // 还原原始位置
    178             containerBuff.position(oldPos);
    179 
    180             if (msgSerialUId == SpecialMsgSerialUId.CG_FLASH_POLICY || 
    181                 msgSerialUId == SpecialMsgSerialUId.CG_QQ_TGW) {
    182                 // 
    183                 // 如果是 Flash 安全策略消息, 
    184                 // 或者是腾讯网关消息, 
    185                 // 则尝试找一下 0 字节的位置 ...
    186                 // 
    187                 int pos0 = IoBuffUtil.indexOf(containerBuff, (byte)0);
    188 
    189                 if (pos0 <= -1) {
    190                     // 如果找不到 0 字节的位置, 
    191                     // 则说明消息还没接收完, 
    192                     // 准备接受下次消息并直接退出!
    193                     IoBuffUtil.readyToNext(containerBuff);
    194                     return;
    195                 }
    196 
    197                 // 复制 Buff 内容
    198                 containerBuff.position(0);
    199                 IoBuffer realBuff = IoBuffUtil.copy(containerBuff, pos0);
    200 
    201                 // 更新 Buff 位置
    202                 final int newPos = containerBuff.position() + pos0;
    203                 containerBuff.position(newPos);
    204                 // 压缩容器 Buff
    205                 IoBuffUtil.compact(containerBuff);
    206 
    207                 // 向下传递
    208                 super.messageReceived(
    209                     nextFilter, sessionObj, realBuff
    210                 );
    211                 continue;
    212             }
    213 
    214             if (msgSize <= 0) {
    215                 // 
    216                 // 如果消息长度 <= 0, 
    217                 // 则直接退出!
    218                 // 这种情况可能是消息已经乱套了 ...
    219                 // 还是重新来过吧!
    220                 // 
    221                 FrameworkLog.LOG.error("i = " + i + ", msgSize = " + msgSize + ", sessionUId = " + sessionUId);
    222                 // 将容器 Buff 内容清空
    223                 containerBuff.position(0);
    224                 containerBuff.flip();
    225                 // 压缩容器 Buff
    226                 IoBuffUtil.compact(containerBuff);
    227                 return;
    228             }
    229 
    230             if (containerBuff.remaining() < msgSize) {
    231                 // 
    232                 // 如果消息长度不够, 
    233                 // 则可能是出现网络粘包情况了 ...
    234                 // 直接退出就可以了!
    235                 // 
    236                 FrameworkLog.LOG.warn(
    237                     "i = " + i
    238                     + ", msgSize = " + msgSize 
    239                     + ", containerBuff.remaining = " + containerBuff.remaining()
    240                     + ", sessionUId = " + sessionUId
    241                 );
    242 
    243                 // 准备接受下一次消息
    244                 IoBuffUtil.readyToNext(containerBuff);
    245                 return;
    246             }
    247 
    248             // 创建新 Buff 并复制字节内容
    249             IoBuffer realBuff = IoBuffUtil.copy(containerBuff, msgSize);
    250 
    251             if (realBuff == null) {
    252                 // 
    253                 // 如果真实的 Buff 为空, 
    254                 // 则直接退出!
    255                 // 这种情况可能也是消息乱套了 ...
    256                 // 记录一下错误信息
    257                 // 
    258                 FrameworkLog.LOG.error("i = " + i + ", null realBuff, sessionUId = " + sessionUId);
    259             } else {
    260 //                // 记录调试信息
    261 //                FrameworkLog.LOG.debug(
    262 //                    "i = " + i
    263 //                    + "
    real = [ " + realBuff.getHexDump() + " ]"
    264 //                    + "
    real.pos = " + realBuff.position()
    265 //                    + "
    real.lim = " + realBuff.limit()
    266 //                );
    267 
    268                 // 向下传递
    269                 super.messageReceived(
    270                     nextFilter, sessionObj, realBuff
    271                 );
    272             }
    273 
    274             // 更新位置
    275             containerBuff.position(containerBuff.position() + msgSize);
    276             // 压缩容器 Buff
    277             IoBuffUtil.compact(containerBuff);
    278         }
    279     }
    280     
    281     /**
    282      * 获取玩家的 Buff, 如果为空则新建一个!
    283      * 
    284      * @param sessionObj
    285      * @return 
    286      * 
    287      */
    288     private static IoBuffer getContainerBuff(IoSession sessionObj) {
    289         if (sessionObj == null) {
    290             // 如果参数对象为空, 
    291             // 则直接退出!
    292             return null;
    293         }
    294 
    295         // 获取会话 UId
    296         long sessionUId = sessionObj.getId();
    297         // 获取容器 Buff
    298         IoBuffer containerBuff = _containerBuffMap.get(sessionUId);
    299 
    300         if (containerBuff == null) {
    301             // 创建缓存 Buff
    302             containerBuff = IoBuffer.allocate(DECODE_MSG_LEN);
    303             containerBuff.setAutoExpand(true);
    304             containerBuff.setAutoShrink(true);
    305             containerBuff.position(0);
    306             containerBuff.flip();
    307             // 缓存  Buff 对象
    308             Object oldVal = _containerBuffMap.putIfAbsent(sessionUId, containerBuff);
    309 
    310             if (oldVal != null) {
    311                 FrameworkLog.LOG.warn("exists oldVal");
    312             }
    313         }
    314 
    315         return containerBuff;
    316     }
    317 
    318     /**
    319      * 移除容器 Buff
    320      * 
    321      * @param sessionObj
    322      * 
    323      */
    324     private static void removeContainerBuff(IoSession sessionObj) {
    325         if (sessionObj == null) {
    326             // 如果参数对象为空, 
    327             // 则直接退出!
    328             return;
    329         }
    330 
    331         // 获取会话 UId
    332         long sessionUId = sessionObj.getId();
    333         // 获取容器 Buff
    334         IoBuffer containerBuff = _containerBuffMap.get(sessionUId);
    335 
    336         if (containerBuff != null) {
    337             // 是否所占资源
    338             containerBuff.clear();
    339         }
    340 
    341         // 移除玩家的 Buff 对象
    342         _containerBuffMap.remove(sessionUId);
    343     }
    344 
    345     /**
    346      * 容器 Buff 为空 ?
    347      * 
    348      * @param sessionObj
    349      * @return 
    350      * 
    351      */
    352     private static boolean containerBuffIsEmpty(IoSession sessionObj) {
    353         if (sessionObj == null) {
    354             // 如果参数对象为空, 
    355             // 则直接退出!
    356             return false;
    357         }
    358 
    359         // 获取容器 Buff
    360         IoBuffer containerBuff = getContainerBuff(sessionObj);
    361 
    362         if (containerBuff == null) {
    363             // 如果容器为空, 
    364             // 则直接退出!
    365             FrameworkLog.LOG.error("null containerBuff, sessionUId = " + sessionObj.getId());
    366             return false;
    367         } else {
    368             // 如果当前位置和极限值都为 0, 
    369             // 则判定为空!
    370             return (containerBuff.position() == 0 
    371                  && containerBuff.limit() == 0);
    372         }
    373     }
    374 }
  • 相关阅读:
    浅谈Chrome V8引擎中的垃圾回收机制
    selenium反爬机制
    03 HTTP协议与HTTPS协议
    HTTP缓存机制和原理
    python 自动发送邮件
    02 Anaconda的介绍,安装记以及使用
    01 关于jupyter的环境安装
    SQLAlchemy
    django-debug-toolbar
    flask 第十篇 after_request before_request
  • 原文地址:https://www.cnblogs.com/afritxia2008/p/4620423.html
Copyright © 2011-2022 走看看