zoukankan      html  css  js  c++  java
  • Redis源码分析(十二)--- redis-check-dump本地数据库检测

           这个文件我在今天分析学习的时候,一直有种似懂非懂的感觉,代码量700+的代码,最后开放给系统的就是一个process()方法。这里说的说的数据库检测,是针对key的检测,会用到,下面提到的结构体:

    /* Data type to hold opcode with optional key name an success status */
    /* 用于key的检测时使用,后续检测操作都用到了entry结构体 */
    typedef struct {
    	//key的名字
        char* key;
        //类型
        int type;
        //是否是成功状态
        char success;
    } entry;

    后续所涉及到的很多API都是与这个结构体相关,此代码最终检测的其实是一个叫dump.rdb的文件,在检测的后面还会加上循环冗余校验CRC64。下面亮出API:

    int checkType(unsigned char t) /* 每当添加一个新的obj类型时,都要检测这个类型是否合理 */
    int readBytes(void *target, long num) /* 在当前文件偏移量位置往后读取num个字节位置 */
    int processHeader(void) /* 读取快照文件的头部,检测头部名称或版本号是否正确 */
    int loadType(entry *e) /* 为entry赋上obj的Type */
    int peekType() /* 弹出版本号 */
    int processTime(int type) /* 去除用来表示时间的字节 */
    uint32_t loadLength(int *isencoded) /* 分type读取长度 */
    char *loadIntegerObject(int enctype) /* 根据当前整型的编码方式,获取数值,以字符形式返回 */
    char* loadLzfStringObject() /* 获得解压后的字符串 */
    char* loadStringObject() /* 获取当前文件信息字符串对象 */
    int processStringObject(char** store) /* 将字符串对象赋给所传入的参数 */
    double* loadDoubleValue() /* 文件中读取double类型值 */
    int processDoubleValue(double** store) /* 对double类型进行赋予给参数 */
    int loadPair(entry *e) /* 读取键值对 */
    entry loadEntry() /* 获取entry的key结构体 */
    void printCentered(int indent, int width, char* body) /* 输出界面对称的信息 */
    void printValid(uint64_t ops, uint64_t bytes) /* 输出有效信息 */
    void printSkipped(uint64_t bytes, uint64_t offset) /* 输出Skipped跳过bytes字节信息 */
    void printErrorStack(entry *e) /* 输出错误栈的信息 */
    void process(void) /* process方法是执行检测的主要方法 */
    

    方法里面好多loadXXX()方法,这几个load方法的确比较有用,在这个检测文件中,编写者又很人性化的构造了error的结构体,用于模拟错误信息栈的输出。

    /* Hold a stack of errors */
    /* 错误信息结构体 */
    typedef struct {
    	//具体的错误信息字符串
        char error[16][1024];
        //内部偏移量
        size_t offset[16];
        //错误信息等级
        size_t level;
    } errors_t;
    static errors_t errors;

    不同的level等级对应不同的出错信息。在API里有个比较关键的方法,loadEntry,获取key相关的结构体;

    /* 获取entry的key结构体 */
    entry loadEntry() {
        entry e = { NULL, -1, 0 };
        uint32_t length, offset[4];
    
        /* reset error container */
        errors.level = 0;
    
        offset[0] = CURR_OFFSET;
        //此处赋值type
        if (!loadType(&e)) {
            return e;
        }
    
        offset[1] = CURR_OFFSET;
        if (e.type == REDIS_SELECTDB) {
            if ((length = loadLength(NULL)) == REDIS_RDB_LENERR) {
                SHIFT_ERROR(offset[1], "Error reading database number");
                return e;
            }
            if (length > 63) {
                SHIFT_ERROR(offset[1], "Database number out of range (%d)", length);
                return e;
            }
        } else if (e.type == REDIS_EOF) {
            if (positions[level].offset < positions[level].size) {
                SHIFT_ERROR(offset[0], "Unexpected EOF");
            } else {
                e.success = 1;
            }
            return e;
        } else {
            /* optionally consume expire */
            if (e.type == REDIS_EXPIRETIME ||
                e.type == REDIS_EXPIRETIME_MS) {
                if (!processTime(e.type)) return e;
                if (!loadType(&e)) return e;
            }
    
            offset[1] = CURR_OFFSET;
            //调用loadPair为Entry赋值key
            if (!loadPair(&e)) {
                SHIFT_ERROR(offset[1], "Error for type %s", types[e.type]);
                return e;
            }
        }
    
        /* all entries are followed by a valid type:
         * e.g. a new entry, SELECTDB, EXPIRE, EOF */
        offset[2] = CURR_OFFSET;
        if (peekType() == -1) {
            SHIFT_ERROR(offset[2], "Followed by invalid type");
            SHIFT_ERROR(offset[0], "Error for type %s", types[e.type]);
            e.success = 0;
        } else {
            e.success = 1;
        }
    
        return e;
    }
    

    其中里面的关键的赋值key,value在loadPair()方法:

    /* 读取键值对 */
    int loadPair(entry *e) {
        uint32_t offset = CURR_OFFSET;
        uint32_t i;
    
        /* read key first */
        //首先从文件中读取key值
        char *key;
        if (processStringObject(&key)) {
            e->key = key;
        } else {
            SHIFT_ERROR(offset, "Error reading entry key");
            return 0;
        }
    
        uint32_t length = 0;
        if (e->type == REDIS_LIST ||
            e->type == REDIS_SET  ||
            e->type == REDIS_ZSET ||
            e->type == REDIS_HASH) {
            if ((length = loadLength(NULL)) == REDIS_RDB_LENERR) {
                SHIFT_ERROR(offset, "Error reading %s length", types[e->type]);
                return 0;
            }
        }
    
    	//读取key值后面跟着的value值
        switch(e->type) {
        case REDIS_STRING:
        case REDIS_HASH_ZIPMAP:
        case REDIS_LIST_ZIPLIST:
        case REDIS_SET_INTSET:
        case REDIS_ZSET_ZIPLIST:
        case REDIS_HASH_ZIPLIST:
        	//因为类似ziplist,zipmap等结构体其实是一个个结点连接而成的超级字符串,所以是直接读取
            if (!processStringObject(NULL)) {
                SHIFT_ERROR(offset, "Error reading entry value");
                return 0;
            }
        break;
        case REDIS_LIST:
        case REDIS_SET:
            //而上面这2种是传统的结构,要分结点读取
            for (i = 0; i < length; i++) {
                offset = CURR_OFFSET;
                if (!processStringObject(NULL)) {
                    SHIFT_ERROR(offset, "Error reading element at index %d (length: %d)", i, length);
                    return 0;
                }
            }
        break;
        case REDIS_ZSET:
            for (i = 0; i < length; i++) {
                offset = CURR_OFFSET;
                if (!processStringObject(NULL)) {
                    SHIFT_ERROR(offset, "Error reading element key at index %d (length: %d)", i, length);
                    return 0;
                }
                offset = CURR_OFFSET;
                if (!processDoubleValue(NULL)) {
                    SHIFT_ERROR(offset, "Error reading element value at index %d (length: %d)", i, length);
                    return 0;
                }
            }
        break;
        case REDIS_HASH:
            for (i = 0; i < length; i++) {
                offset = CURR_OFFSET;
                if (!processStringObject(NULL)) {
                    SHIFT_ERROR(offset, "Error reading element key at index %d (length: %d)", i, length);
                    return 0;
                }
                offset = CURR_OFFSET;
                if (!processStringObject(NULL)) {
                    SHIFT_ERROR(offset, "Error reading element value at index %d (length: %d)", i, length);
                    return 0;
                }
            }
        break;
        default:
            SHIFT_ERROR(offset, "Type not implemented");
            return 0;
        }
        /* because we're done, we assume success */
        //只要执行过了,我们就认定为成功
        e->success = 1;
        return 1;
    }
    

    如果e-success=1则说明这个key的检测就过关了。为什么这么说呢,我们来看主检测方法process()方法:

    /* process方法是执行检测的主要方法 */
    void process(void) {
        uint64_t num_errors = 0, num_valid_ops = 0, num_valid_bytes = 0;
        entry entry;
        //读取文件头部获取快照文件版本号
        int dump_version = processHeader();
    
        /* Exclude the final checksum for RDB >= 5. Will be checked at the end. */
        if (dump_version >= 5) {
            if (positions[0].size < 8) {
                printf("RDB version >= 5 but no room for checksum.
    ");
                exit(1);
            }
            positions[0].size -= 8;
        }
    
        level = 1;
        while(positions[0].offset < positions[0].size) {
            positions[1] = positions[0];
    
            entry = loadEntry();
            if (!entry.success) {
            	//如果Entry不为成功状态
                printValid(num_valid_ops, num_valid_bytes);
                printErrorStack(&entry);
                num_errors++;
                num_valid_ops = 0;
                num_valid_bytes = 0;
    
                /* search for next valid entry */
                uint64_t offset = positions[0].offset + 1;
                int i = 0;
                
                //接着寻找后面3个有效entries
                while (!entry.success && offset < positions[0].size) {
                    positions[1].offset = offset;
    
                    /* find 3 consecutive valid entries */
                    //寻找3个有效的entries
                    for (i = 0; i < 3; i++) {
                        entry = loadEntry();
                        if (!entry.success) break;
                    }
                    /* check if we found 3 consecutive valid entries */
                    if (i < 3) {
                        offset++;
                    }
                }
    
                /* print how many bytes we have skipped to find a new valid opcode */
                if (offset < positions[0].size) {
                    printSkipped(offset - positions[0].offset, offset);
                }
    
                positions[0].offset = offset;
            } else {
                num_valid_ops++;
                num_valid_bytes += positions[1].offset - positions[0].offset;
    
                /* advance position */
                positions[0] = positions[1];
            }
            free(entry.key);
        }
    
        /* because there is another potential error,
         * print how many valid ops we have processed */
        printValid(num_valid_ops, num_valid_bytes);
    
        /* expect an eof */
        if (entry.type != REDIS_EOF) {
            /* last byte should be EOF, add error */
            errors.level = 0;
            SHIFT_ERROR(positions[0].offset, "Expected EOF, got %s", types[entry.type]);
    
            /* this is an EOF error so reset type */
            entry.type = -1;
            printErrorStack(&entry);
    
            num_errors++;
        }
    
        /* Verify checksum */
        //版本号>=5的时候,需要检验校验和
        if (dump_version >= 5) {
            uint64_t crc = crc64(0,positions[0].data,positions[0].size);
            uint64_t crc2;
            unsigned char *p = (unsigned char*)positions[0].data+positions[0].size;
            crc2 = ((uint64_t)p[0] << 0) |
                   ((uint64_t)p[1] << 8) |
                   ((uint64_t)p[2] << 16) |
                   ((uint64_t)p[3] << 24) |
                   ((uint64_t)p[4] << 32) |
                   ((uint64_t)p[5] << 40) |
                   ((uint64_t)p[6] << 48) |
                   ((uint64_t)p[7] << 56);
            if (crc != crc2) {
                SHIFT_ERROR(positions[0].offset, "RDB CRC64 does not match.");
            } else {
                printf("CRC64 checksum is OK
    ");
            }
        }
    
        /* print summary on errors */
        if (num_errors) {
            printf("
    ");
            printf("Total unprocessable opcodes: %llu
    ",
                (unsigned long long) num_errors);
        }
    }
    

    如果想了解检测的详细原理,事先了解dump.rdb的文件内容结构也许会对我们又很大帮助。


  • 相关阅读:
    SharePoint 2013 安装.NET Framework 3.5 报错
    SharePoint 2016 配置工作流环境
    SharePoint 2016 站点注册工作流服务报错
    Work Management Service application in SharePoint 2016
    SharePoint 2016 安装 Cumulative Update for Service Bus 1.0 (KB2799752)报错
    SharePoint 2016 工作流报错“没有适用于此应用程序的地址”
    SharePoint 2016 工作流报错“未安装应用程序管理共享服务代理”
    SharePoint JavaScript API in application pages
    SharePoint 2016 每天预热脚本介绍
    SharePoint 无法删除搜索服务应用程序
  • 原文地址:https://www.cnblogs.com/bianqi/p/12184263.html
Copyright © 2011-2022 走看看