zoukankan      html  css  js  c++  java
  • [编织消息框架][分层模型设计]系统与应用

    上篇有介绍过,对象设计按开发者角度划分设计

    为什么不直全部实现不走QRpc?

    开始设计时没考虑到rpc功能,后面才补上,加上rpc有二级业务模型,要进行二次解释,有性能损耗,所以干脆不改了

    用两幅图来解释

    QPacket是个吸血模型(相对贫血模型来讲 如 java bean对象只有getter setter 方法),大多数逻辑实现写在该对象上,如转换成应用层模型,包编解码

     1 /**
     2  * @author solq
     3  **/
     4 @Target(TYPE)
     5 @Retention(RUNTIME)
     6 public @interface QOpCode {
     7     short value();
     8 
     9     public static final short QPRODUCE = 1;
    10     public static final short QCONSUME = 2;
    11     public static final short QSUBSCIBE = 3;
    12     public static final short QCODE = 4;
    13     public static final short QRPC = 5;
    14 
    15 }
    QOpCode
      1 /***
      2  * 包 格式 [sn] + [c] + [b] +[sid]
      3  * 
      4  * c [0000 1111] 16个协议
      5  * 
      6  * @author solq
      7  */
      8 public class QPacket implements IRecycle, IByte {
      9 
     10     /**
     11      * 包固定长度
     12      */
     13     public final static int PACK_FIXED_LENG = Long.BYTES + Short.BYTES + Long.BYTES;
     14     /**
     15      * 响应掩码 [0001 0000]
     16      */
     17     public final static short MASK_RESPONSE = 0x10;
     18 
     19     /**
     20      * 压缩掩码[0010 0000]
     21      */
     22     public final static short MASK_COMPRESS = 0x20;
     23 
     24     public final static int MASK_OPCODE = MASK_RESPONSE | MASK_COMPRESS;
     25 
     26     /** 序号 用于包ID,解决冥等 **/
     27     private long sn;
     28     /** sessionId **/
     29     private long sid;
     30     /** opCode **/
     31     private short c;
     32     /** 内容 **/
     33     private byte[] b;
     34     /** 临时数据 **/
     35     private Object tmpData;
     36 
     37     @Override
     38     public void recycle() {
     39     b = null;
     40     tmpData = null;
     41     }
     42 
     43     @Override
     44     public int toSize() {
     45     return QPacket.PACK_FIXED_LENG + b.length;
     46     }
     47 
     48     public int writeBytes(ByteBuf byteBuf) {
     49     byteBuf.writeLong(sn);
     50     byteBuf.writeShort(c);
     51     byteBuf.writeBytes(b);
     52     byteBuf.writeLong(sid);
     53     return toSize();
     54     }
     55 
     56     public void writeToByteBuf(ByteBuf byteBuf) {
     57     final int packetLen = toSize();
     58     byteBuf.writeShort(QMConfig.getInstance().getPacketHeadFlag(packetLen));
     59     byteBuf.writeInt(packetLen);
     60     writeBytes(byteBuf);
     61     byteBuf.writeByte(QMConfig.getInstance().getPacketEndFlag(packetLen));
     62     }
     63 
     64     /////////////////// toObject///////////////////////
     65     @Override
     66     public byte[] toBytes() {
     67     final int len = toSize();
     68     byte[] ret = new byte[len];
     69     int offset = 0;
     70     PacketUtil.writeLong(offset, sn, ret);
     71     PacketUtil.writeShort(offset += Long.BYTES, c, ret);
     72     PacketUtil.writeBytes(offset += Short.BYTES, b, ret);
     73     PacketUtil.writeLong(offset += b.length, sid, ret);
     74     return ret;
     75     }
     76 
     77     public QProduce toProduce() {
     78     QProduce ret = toObject();
     79     if (ret == null) {
     80         ret = QProduce.of(getBytes());
     81         tmpData = ret;
     82     }
     83     return ret;
     84     }
     85 
     86     public QConsume toConsume() {
     87     QConsume ret = toObject();
     88     if (ret == null) {
     89         ret = QConsume.byte2Object(getBytes());
     90         tmpData = ret;
     91     }
     92     return ret;
     93     }
     94 
     95     public Collection<QSubscribe> toSubscribe() {
     96     Collection<QSubscribe> ret = toObject();
     97     if (ret == null) {
     98         ret = SerialUtil.readValue(getBytes(), SerialUtil.subType);
     99         tmpData = ret;
    100     }
    101     return ret;
    102     }
    103 
    104     public QRpc toRpc() {
    105     QRpc ret = toObject();
    106     if (ret == null) {
    107         ret = QRpc.toObject(getBytes());
    108         tmpData = ret;
    109     }
    110     return ret;
    111     }
    112 
    113     public short toCode() {
    114     Short ret = toObject();
    115     if (ret == null) {
    116         ret = PacketUtil.readShort(0, b);
    117         tmpData = ret;
    118     }
    119     return ret;
    120     }
    121 
    122     @SuppressWarnings("unchecked")
    123     <T> T toObject() {
    124     return (T) tmpData;
    125     }
    126 
    127     byte[] getBytes() {
    128     byte[] bytes = null;
    129     if (hasStatus(MASK_COMPRESS)) {
    130         bytes = SerialUtil.unZip(b);
    131     } else {
    132         bytes = b;
    133     }
    134     return bytes;
    135     }
    136 
    137     public void responseCode(short code) {
    138     c = QOpCode.QCODE;
    139     b = new byte[2];
    140     tmpData = code;
    141     PacketUtil.writeShort(0, code, b);
    142     }
    143 
    144     public void setStatus(short value) {
    145     c |= value;
    146     }
    147 
    148     public boolean hasStatus(short value) {
    149     return (c & value) == value;
    150     }
    151 
    152     // static
    153     @SuppressWarnings("unchecked")
    154     public static QPacket of(Object data) {
    155     if (data instanceof QProduce) {
    156         return of((QProduce) data);
    157     }
    158     if (data instanceof QRpc) {
    159         return of((QRpc) data);
    160     }
    161     if (data instanceof QConsume) {
    162         return of((QConsume) data);
    163     }
    164     if (data instanceof byte[]) {
    165         return of((byte[]) data);
    166     }
    167     if (data instanceof Integer) {
    168         return of((Integer) data);
    169     }
    170     if (data instanceof QPacket) {
    171         return (QPacket) data;
    172     }
    173     if (TypeUtils.isAssignable(data.getClass(), SerialUtil.subType.getType())) {
    174         return of((Collection<QSubscribe>) data);
    175     }
    176     throw new RuntimeException("未支持类型 :" + data.getClass());
    177     }
    178 
    179     public static QPacket of(ByteBuf byteBuf, int packetLen) {
    180     long sn = byteBuf.readLong();
    181     short c = byteBuf.readShort();
    182     byte[] b = new byte[packetLen - QPacket.PACK_FIXED_LENG];
    183     byteBuf.readBytes(b);
    184     long sid = byteBuf.readLong();
    185     return of(c, sn, sid, null, b);
    186     }
    187 
    188     public static QPacket of(byte[] bytes) {
    189     int offset = 0;
    190     long sn = PacketUtil.readLong(offset, bytes);
    191     short c = PacketUtil.readShort(offset += Long.BYTES, bytes);
    192     byte[] b = PacketUtil.readBytes(offset += Short.BYTES, bytes.length - QPacket.PACK_FIXED_LENG, bytes);
    193     long sid = PacketUtil.readLong(offset += b.length, bytes);
    194     return of(c, sn, sid, null, b);
    195     }
    196 
    197     public static QPacket of(short code) {
    198     byte[] b = new byte[2];
    199     PacketUtil.writeShort(0, code, b);
    200     long sn = PacketUtil.getSn();
    201     return of(QOpCode.QCODE, sn, -1, null, b);
    202     }
    203 
    204     public static QPacket of(QRpc obj) {
    205     byte[] b = obj.toBytes();
    206     long sn = PacketUtil.getSn();
    207     return of(QOpCode.QRPC, sn, -1, null, b);
    208     }
    209 
    210     public static QPacket of(QProduce obj) {
    211     byte[] b = obj.toBytes();
    212     long sn = PacketUtil.getSn();
    213     return of(QOpCode.QPRODUCE, sn, -1, null, b);
    214     }
    215 
    216     public static QPacket of(QConsume obj) {
    217     byte[] b = obj.toBytes();
    218     long sn = PacketUtil.getSn();
    219     return of(QOpCode.QCONSUME, sn, -1, null, b);
    220     }
    221 
    222     public static QPacket of(Collection<QSubscribe> obj) {
    223     byte[] b = SerialUtil.writeValueAsBytes(obj);
    224     long sn = PacketUtil.getSn();
    225     return of(QOpCode.QSUBSCIBE, sn, -1, null, b);
    226     }
    227 
    228     public static QPacket of(short c, long sn, long sid, Object value, byte[] body) {
    229     QPacket ret = new QPacket();
    230     ret.c = c;
    231     ret.sn = sn;
    232     ret.sid = sid;
    233     // 未压缩,处理压缩
    234     if (!ret.hasStatus(MASK_COMPRESS) && body.length >= QMConfig.getInstance().COMPRESS_SIZE) {
    235         body = SerialUtil.zip(body);
    236         ret.setStatus(MASK_COMPRESS);
    237     }
    238     ret.b = body;
    239     ret.tmpData = value;
    240     return ret;
    241     }
    242 
    243     // getter
    244 
    245     public short getC() {
    246     return (short) (c & ~QPacket.MASK_OPCODE);
    247     }
    248 
    249 
    250     public long getSid() {
    251     return sid;
    252     }
    253 
    254     public long getSn() {
    255     return sn;
    256     }
    257 
    258     public byte[] getB() {
    259     return b;
    260     }
    261 
    262     public void setSid(long sid) {
    263     this.sid = sid;
    264     }
    265 
    266 }
    QPacket
     1 public class QConsume implements IRecycle {
     2     /**
     3      * 最后读取指针记录
     4      */
     5     private long o;
     6     /**
     7      * 数据
     8      */
     9     private Object[] b;
    10     /**
    11      * topic
    12      */
    13     private String t;
    14     /**
    15      * raw数据
    16      */
    17     private byte[] r;
    18 }
    QConsume
     1 /***
     2  * 属性名采取最少字母命名,减少通信跟存储 生产消息对象
     3  * 临时对象,负责 业务与qpacket数据交互
     4  * @author solq
     5  */
     6 @QOpCode(QOpCode.QPRODUCE)
     7 public class QProduce implements IRecycle {
     8     /** 订阅 **/
     9     private String t;
    10     /** 内容信息 **/
    11     private Object[] b;
    12     /** 作用本地查询 **/
    13     @JsonIgnore
    14     private long offset;
    15 }
    QProduce
     1 public class QSubscribe {
     2     /**
     3      * topic
     4      */
     5     private String topic;
     6     /**
     7      * groudId
     8      */
     9     private String groupId;
    10 }
    QSubscribe
     1 /**
     2  * <p>
     3  * 2byte model 1byte command + 4byte indexs len + indexs + params + 1byte
     4  * compress
     5  * </p>
     6  * 
     7  * @author solq
     8  */
     9 public class QRpc implements IRecycle, IByte {
    10     /**模块号**/
    11     private short model;
    12     /**方法命令编号**/
    13     private byte command;
    14     /**参数索引**/
    15     private byte[] indexs;
    16     /**参数**/
    17     private byte[] params;
    18 }
    QRpc
      public static QPacket of(short c, long sn, long sid, Object value, byte[] body) {
        QPacket ret = new QPacket();
        ret.c = c;
        ret.sn = sn;
        ret.sid = sid;
        // 未压缩,处理压缩 当内容长度大于配置就启动压缩处理
        if (!ret.hasStatus(MASK_COMPRESS) && body.length >= QMConfig.getInstance().COMPRESS_SIZE) {
            body = SerialUtil.zip(body);
            ret.setStatus(MASK_COMPRESS);
        }
        ret.b = body;
        ret.tmpData = value;
        return ret;
      }
  • 相关阅读:
    01-初学总结之《谭浩强C程序设计》
    00-计算机经典参考书籍
    (转)android图片压缩总结
    am等adb命令小总结
    (原创)在service中定时执行网络操作的几点说明
    (转)访问者模式
    (原创)用Receiver和SystemService监听网络状态,注册Receiver的两种方式
    (原创)Activity启动模式之singleTask
    (原创)开发微信公众平台遇到的乱码等问题的解决
    (转载)XML解析之-XStream解析
  • 原文地址:https://www.cnblogs.com/solq111/p/6550100.html
Copyright © 2011-2022 走看看