zoukankan      html  css  js  c++  java
  • fast协议解读

    背景

    股票行情一般传输的数据类型为: int / long / float /double / string 来表示行情价格成交量之类的数据。
    正常传输过程中,都是使用tag=value的方式。 如1=date(标号1代表日期) 2=openPrice(2表示开盘价格) 等等, 在解析每个字段之前需要先解析这个字段标号,然后通过这个标号能够从提前约定的字段(一般编码端和解码端都有一个xml模板类似的约定配置文件)对应类型来解析这个字段。

    前提约定:
    tag : 1->日期 2->时间 3->开盘价 4->最高价 5->最低价 6->当前价。 其中tag为short类型,即2个字节
    日期、时间为int; 开高低收为float ,保留3位小数

    样例数据:
    1=20190310 , 2=142900 , 3=13.4 , 4=15.0 ,5=13.0 ,6=13.5

    不使用fast协议来传输需要的字节数: tag占用字节(6*2) + value(4 + 4 + 4 +4 + 4 + 4)=36字节

    同样的数据,如果使用fast协议传输需要字节数: tag(6*1) +value(4 + 3 + 3+ 3+ 3)=29字节

    fast协议特征

    基本特征:

    1. 每个字段中所有的byte的最高位用0表示当前字节属于该字段,用1表示这是该字段的最后一个字节(停止位特征),byte流和unicode字符串流数据部分不使用停止位特征
    2. fast协议传输过程中不会传输float/double类型的数据,而是将其根据小数位【具体每个字段小数位数在模板配置文件中约定】扩展成数字类型。
    3. 数字类型在传输过程中,可以为1,2,3,4,5,6,7,8,9,10个字节,具体要根据是否为有符号、无符号、以及数字的范围来具体确定占用几个字节
    4. 在传输数字时,如果涉及到有符号数的时候,第一个字节的第2位用来表示符号,0表示正数,1表示负数。
    5. 在传输ascii编码类型的时候,占用1个字节。ascii编码本身第一位为0,,所以一个字节是符合fast协议规定的。
    6. 在传输unicode编码类型的时候,使用(size,真正数据)来传输,size代表数据真正占用的字节数。
    7. 在传输byte流的时候,和unicode编码一样,也使用(size,真正数据)来传输。
    8. 在传输unicode 和byte流的时候不使用停止位特征,即每个字节的最高位为真实数据。数据长度字段仍然使用停止位特征

    下面代码出自:openfast-1.1.1

    fast协议解读

    停止位

    org.openfast.template.type.codec

        /**
         * 数据编码成功后,将最后一个字节的首位设置成1,即为停止位
         * 
         * */
        public byte[] encode(ScalarValue value) {
            byte[] encoding = encodeValue(value);
            encoding[encoding.length - 1] |= 0x80;
            return encoding;
        }
    
    

    有符号数编码类

    类:org.openfast.template.type.codec.SignedInteger

        /**
         * 编码方法
         * */
        public byte[] encodeValue(ScalarValue value) {
            long longValue = ((NumericValue) value).toLong();
            int size = getSignedIntegerSize(longValue);
            byte[] encoding = new byte[size];
            //组装数据,即每个字节第一位不表示数据;组装完成后仍然是大端序列(低字节位为值得高有效位)
            for (int factor = 0; factor < size; factor++) {
                //0x3f = 0011 1111
                //0x7f = 0111 1111
                int bitMask = (factor == (size - 1)) ? 0x3f : 0x7f;
                encoding[size - factor - 1] = (byte) ((longValue >> (factor * 7)) & bitMask);
            }
            // Get the sign bit from the long value and set it on the first byte
            // 01000000 00000000 ... 00000000
            // ^----SIGN BIT
            //将第一个字节的第二位设置为符号位, 0表示正数;1表示负数
            encoding[0] |= (0x40 & (longValue >> 57));
            return encoding;
        }
    
        /**
         * 解码方法
         * */
        public ScalarValue decode(InputStream in) {
            long value = 0;
            try {
                // IO read方法如果返回小于-1的时候,表示结束;正常范围0-255
                int byt = in.read();
                if (byt < 0) {
                    Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                    return null; // short circuit if global error handler does not throw exception
                }
                //通过首字节的第二位与运算,确认该数据的符号
                if ((byt & 0x40) > 0) { 
                    value = -1;
                }
                //到此,value的符号已经确定,
                //value=0 则该数为负数, value= -1该数为正数
                // int value = -1   16进制为 0xFF FF FF FF
                // int value = 0    16进制为 0x00 00 00 00
                //下面的只是通过位操作来复原真实的数据
                value = (value << 7) | (byt & 0x7f);  //(value << 7)确保最后7位为0;     (byt & 0x7f) 还是byt
                while ((byt & 0x80) == 0) {  //根据第一位来判断当前byte是否属于这个字段
                    byt = in.read();
                    if (byt < 0) {
                        Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                        return null; // short circuit if global error handler does not throw exception
                    }
                    value = (value << 7) | (byt & 0x7f); //先把有效位往左移7位,然后再处理当前的七位
                }
            } catch (IOException e) {
                Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e);
                return null; // short circuit if global error handler does not throw exception
            }
            return createValue(value);
        }
    
        /**
         * 判断无符号数所要占用的字节数
         * */
        public static int getUnsignedIntegerSize(long value) {
            if (value < 128) {
                return 1; // 2 ^ 7
            }
            if (value <= 16384) {
                return 2; // 2 ^ 14
            }
            if (value <= 2097152) {
                return 3; // 2 ^ 21
            }
            if (value <= 268435456) {
                return 4; // 2 ^ 28
            }
            if (value <= 34359738368L) {
                return 5; // 2 ^ 35
            }
            if (value <= 4398046511104L) {
                return 6; // 2 ^ 42
            }
            if (value <= 562949953421312L) {
                return 7; // 2 ^ 49
            }
            if (value <= 72057594037927936L) {
                return 8; // 2 ^ 56
            }
            return 9;
        }
    
        /**
         * 判断有符号数需要占用的字节
         * */
        public static int getSignedIntegerSize(long value) {
            if ((value >= -64) && (value <= 63)) {
                return 1; // - 2 ^ 6 ... 2 ^ 6 -1
            }
            if ((value >= -8192) && (value <= 8191)) {
                return 2; // - 2 ^ 13 ... 2 ^ 13 -1
            }
            if ((value >= -1048576) && (value <= 1048575)) {
                return 3; // - 2 ^ 20 ... 2 ^ 20 -1
            }
            if ((value >= -134217728) && (value <= 134217727)) {
                return 4; // - 2 ^ 27 ... 2 ^ 27 -1
            }
            if ((value >= -17179869184L) && (value <= 17179869183L)) {
                return 5; // - 2 ^ 34 ... 2 ^ 34 -1
            }
            if ((value >= -2199023255552L) && (value <= 2199023255551L)) {
                return 6; // - 2 ^ 41 ... 2 ^ 41 -1
            }
            if ((value >= -281474976710656L) && (value <= 281474976710655L)) {
                return 7; // - 2 ^ 48 ... 2 ^ 48 -1
            }
            if ((value >= -36028797018963968L) && (value <= 36028797018963967L)) {
                return 8; // - 2 ^ 55 ... 2 ^ 55 -1
            }
            if ((value >= -4611686018427387904L && value <= 4611686018427387903L)) {
                return 9;
            }
            return 10;
        }
    
    
    
    

    无符号数编码类

    org.openfast.template.type.codec.UnsignedInteger

    
        /**
         * 编码方法
         * */
        public byte[] encodeValue(ScalarValue scalarValue) {
            long value = scalarValue.toLong();
            int size = getUnsignedIntegerSize(value);
            byte[] encoded = new byte[size];
            for (int factor = 0; factor < size; factor++) {
                encoded[size - factor - 1] = (byte) ((value >> (factor * 7)) & 0x7f);
            }
            return encoded;
        }
    
        /**
         * 
         * 解码方法
         * */
        public ScalarValue decode(InputStream in) {
            long value = 0;
            int byt;
            try {
                do {
                    byt = in.read();
                    if (byt < 0) {
                        Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                        return null; // short circuit if global error handler does not throw exception
                    }
                    value = (value << 7) | (byt & 0x7f);
                } while ((byt & 0x80) == 0);
            } catch (IOException e) {
                Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e);
                return null; // short circuit if global error handler does not throw exception
            }
            return createValue(value);
        }
    
    

    AsciiString编码类

    org.openfast.template.type.codec.AsciiString

    
        public byte[] encodeValue(ScalarValue value) {
            if ((value == null) || value.isNull()) {
                throw new IllegalStateException("Only nullable strings can represent null values.");
            }
            String string = value.toString();
            if ((string != null) && (string.length() == 0)) {
                return TypeCodec.NULL_VALUE_ENCODING;
            }
            if (string.startsWith(ZERO_TERMINATOR)) {
                return ZERO_PREAMBLE;
            }
            return string.getBytes();
        }
    
        public ScalarValue decode(InputStream in) {
            int byt;
            ByteArrayOutputStream buffer = Global.getBuffer();
            try {
                do {
                    byt = in.read();
                    if (byt < 0) {
                        Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                        return null; // short circuit if global error handler does not throw exception
                    }
                    buffer.write(byt);
                } while ((byt & 0x80) == 0);
            } catch (IOException e) {
                Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e);
                return null; // short circuit if global error handler does not throw exception
            }
            byte[] bytes = buffer.toByteArray();
            //复原最后一个字节为真实数据
            bytes[bytes.length - 1] &= 0x7f;
            if (bytes[0] == 0) {
                if (!ByteUtil.isEmpty(bytes))
                    Global.handleError(FastConstants.R9_STRING_OVERLONG, null);
                if (bytes.length > 1 && bytes[1] == 0)
                    return new StringValue("u0000");
                return new StringValue("");
            }
            return new StringValue(new String(bytes));
        }
    
    

    字节流编码类

    org.openfast.template.type.codec.ByteVectorType

    注意:字节流类型不使用停止位

    
        public byte[] encode(ScalarValue value) {
            byte[] bytes = value.getBytes();
            int lengthSize = IntegerCodec.getUnsignedIntegerSize(bytes.length);
            byte[] encoding = new byte[bytes.length + lengthSize];
            byte[] length = TypeCodec.UINT.encode(new IntegerValue(bytes.length));
            //数据流所占长度
            System.arraycopy(length, 0, encoding, 0, lengthSize);
            //数据
            System.arraycopy(bytes, 0, encoding, lengthSize, bytes.length);
            return encoding;
        }
    
    
        public ScalarValue decode(InputStream in) {
            //解析字节流的长度
            int length = ((IntegerValue) TypeCodec.UINT.decode(in)).value;
            byte[] encoding = new byte[length];
            //读取字节流
            for (int i = 0; i < length; i++)
                try {
                    int nextByte = in.read();
                    if (nextByte < 0) {
                        Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                        return null; // short circuit if global error handler does not throw exception
                    }
                    encoding[i] = (byte) nextByte;
                } catch (IOException e) {
                    Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e);
                    return null; // short circuit if global error handler does not throw exception
                }
            return new ByteVectorValue(encoding);
        }
    
    

    Unicode字符串类型编码类

    org.openfast.template.type.codec.UnicodeString

    这个类型和上面的字节流编码类逻辑一样。

  • 相关阅读:
    python 对比学习
    支付宝
    springboot logback
    ngnix学习视频
    node学习
    puppeteer 相关知识
    Dota2App--第三天
    Dota2APP--第二天
    Dota2APP--第一天
    iOS ---进阶之摇一摇
  • 原文地址:https://www.cnblogs.com/henuzyy/p/10506093.html
Copyright © 2011-2022 走看看