zoukankan      html  css  js  c++  java
  • Redis源码分析(十四)--- rdb.c本地数据库操作

                 过去2,3天内把redis内部的测试相关包分析了一遍,总体感觉还是比较容易的,总共5个文件,也让我们涨了一下见识,什么叫内置的测试函数。今天,我把目标进行了转移,下面我准备继续学习与代码逻辑稍稍无关的模块,数据层,在我的分类中,就是在Data的文件包。在这个里面,首当其冲,我研究了rdb.c,直接与数据库操作相关。什么叫数据库操作相关呢,最直接的意思就是,数据库的相关操作到最后到会直接映射到这个文件中的函数操作。所以在理解这些操作之前,我得先介绍一下里面的一些东西,免得会比较乱。我们知道,redis内部支持很多中类型,

    1.list列表

    2.hash类型

    3.set类型

    4.string类型

    其中如果是list列表类型,其实内部的编码方式又可分为2种,linkedList普通链表模式,ziplist压缩列表模式,所以说里面的代码里的类型是非常多的,所以建议读者阅读学习源码的时候,不要搞混了。rdb中的数据存储的基本格式为[len][data],前面使用字节表示的长度,后面是真实的数据,当然我这说的是普通的字符串类型的key:value的值,如果是纯数字,直接用字节表示值,根据值的大小分配不同的字节表示,不得不说,redis在数据存储方面上,把数据存储的内存消耗降到了极致。比如只要是在数据库中保存的长度等数字的,必须经过计算判断,然后再分配相应的字节保存(跟前面压缩列表等的原理类型):

    /* Load an encoded length. The "isencoded" argument is set to 1 if the length
     * is not actually a length but an "encoding type". See the REDIS_RDB_ENC_*
     * definitions in rdb.h for more information. */
    /* 加载长度,也需要根据编码方式,读取不同的buf获取长度 */
    uint32_t rdbLoadLen(rio *rdb, int *isencoded) {
        unsigned char buf[2];
        uint32_t len;
        int type;
    
        if (isencoded) *isencoded = 0;
        if (rioRead(rdb,buf,1) == 0) return REDIS_RDB_LENERR;
        type = (buf[0]&0xC0)>>6;
        if (type == REDIS_RDB_ENCVAL) {
            /* Read a 6 bit encoding type. */
            if (isencoded) *isencoded = 1;
            return buf[0]&0x3F;
        } else if (type == REDIS_RDB_6BITLEN) {
            /* Read a 6 bit len. */
            return buf[0]&0x3F;
        } else if (type == REDIS_RDB_14BITLEN) {
            /* Read a 14 bit len. */
            if (rioRead(rdb,buf+1,1) == 0) return REDIS_RDB_LENERR;
            return ((buf[0]&0x3F)<<8)|buf[1];
        } else {
            /* Read a 32 bit len. */
            if (rioRead(rdb,&len,4) == 0) return REDIS_RDB_LENERR;
            return ntohl(len);
        }
    }
    

    只要通过编码方式存储的字符串,普通字符串都要先经过压缩再存入,取出的时候先做解压操作:

    /* rdb加载字符串对象的泛型方法 */
    robj *rdbGenericLoadStringObject(rio *rdb, int encode) {
        int isencoded;
        uint32_t len;
        sds val;
    
        len = rdbLoadLen(rdb,&isencoded);
        if (isencoded) {
        	//返回值主要为加载数值对象,和获取解压后的字符串对象
            switch(len) {
            case REDIS_RDB_ENC_INT8:
            case REDIS_RDB_ENC_INT16:
            case REDIS_RDB_ENC_INT32:
                return rdbLoadIntegerObject(rdb,len,encode);
            case REDIS_RDB_ENC_LZF:
                return rdbLoadLzfStringObject(rdb);
            default:
                redisPanic("Unknown RDB encoding type");
            }
        }
    
    	//无编码方式,直接读取rdb
        if (len == REDIS_RDB_LENERR) return NULL;
        val = sdsnewlen(NULL,len);
        if (len && rioRead(rdb,val,len) == 0) {
            sdsfree(val);
            return NULL;
        }
        return createObject(REDIS_STRING,val);
    }
    
    综上,我总结了几点,redis数据量在存储数据上的做的调优

    1.长度等数值数据存储,根据数值大小的不同,分配不同的字节存储,1个字节,2个字节,后面直接到5个字节,避免直接像int32,int64一样,直接占去4,8个字节。一般字符串的长度都是比较小的,如果每个字符串的长度是10,你用4,8个字节去存的话,大大的浪费空间了。
    2.字符串等非数值存储,redis在这里采用了lzf压缩算法,当然取出的时候,你要进行解压,或者你从最开始的时候不选择的压缩存储,而是直接存储。

    所以,这样的设计非常棒,数据库的任何操作结果都会最终赋值到robj->ptr上:

    if (o->encoding == REDIS_ENCODING_INTSET) {
                    /* Fetch integer value from element */
                    if (isObjectRepresentableAsLongLong(ele,&llval) == REDIS_OK) {
                    	//最后都会通过吧值赋在obj->ptr上
                        o->ptr = intsetAdd(o->ptr,llval,NULL);
                    } else {
                        setTypeConvert(o,REDIS_ENCODING_HT);
                        dictExpand(o->ptr,len);
                    }
                }

    在这些个方法里面,还有一个比较特殊的后台保存到数据库的方法,为什么会有这样的操作呢,因为redis其实和mencached一样,是内存数据库,如果对数据的操作都直接是写入磁盘,I/O开销肯定很大,所以一般内存数据库都是先把操作结构都存放在内存中,等到了内存的数据满了,再持久化到磁盘中,就是保存数据库操作到文件中了。redis在这里还很人性化的提供了backgroundSave()的方式:,如果这个问题出现在java里面,我的直接做法肯定开个线程让他直接运行Save的方法就行了,但是想在C语言中实现这种类似多线程的操作,我还真想不出来,最终他的答案是fork(),在Linux编程中,肯定接触过了这个方法,在C语言的应用编程中基本没看到过,我也是头次领略到fork方法还能这么用,先看看原方法调用细节:

    /* 后台进行rbd保存操作 */
    int rdbSaveBackground(char *filename) {
        pid_t childpid;
        long long start;
    
        if (server.rdb_child_pid != -1) return REDIS_ERR;
    
        server.dirty_before_bgsave = server.dirty;
        server.lastbgsave_try = time(NULL);
    
        start = ustime();
        //利用fork()创建子进程用来实现rdb的保存操作
        //此时有2个进程在执行这段函数的代码,在子进行程返回的pid为0,
        //所以会执行下面的代码,在父进程中返回的代码为孩子的pid,不为0,所以执行else分支的代码
        //在父进程中放返回-1代表创建子进程失败
        if ((childpid = fork()) == 0) {
        	//在这个if判断的代码就是在子线程中后执行的操作
            int retval;
    
            /* Child */
            closeListeningSockets(0);
            redisSetProcTitle("redis-rdb-bgsave");
            //这个就是刚刚说的rdbSave()操作
            retval = rdbSave(filename);
            if (retval == REDIS_OK) {
                size_t private_dirty = zmalloc_get_private_dirty();
    
                if (private_dirty) {
                    redisLog(REDIS_NOTICE,
                        "RDB: %zu MB of memory used by copy-on-write",
                        private_dirty/(1024*1024));
                }
            }
            exitFromChild((retval == REDIS_OK) ? 0 : 1);
        } else {
        	//执行父线程的后续操作
            /* Parent */
            server.stat_fork_time = ustime()-start;
            server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
            latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
            if (childpid == -1) {
                server.lastbgsave_status = REDIS_ERR;
                redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                    strerror(errno));
                return REDIS_ERR;
            }
            redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
            server.rdb_save_time_start = time(NULL);
            server.rdb_child_pid = childpid;
            updateDictResizePolicy();
            return REDIS_OK;
        }
        return REDIS_OK; /* unreached */
    }
    

    父进程fork()出的子线程是基本完全复用父亲线程的,所以也就是说,父子线程都会执行这个函数,但是唯一的区别是执行fork函数返回值是不同的,子线程因为是被fork出来的,返回的就是0代表自身,父亲线程就是返回子线程的PID,然后根据返回的PID不同,执行不同的操作,子线程就完全独立于父亲线程,做自己的保存操作。这也是头次我知道了fork还能这么用。下面亮出.h头文件中的API,其实和.c文件里的差了很多的方法:

    int rdbSaveType(rio *rdb, unsigned char type); /* 保存类型操作 */
    int rdbLoadType(rio *rdb); /* 加载RDB中的格式类型 */
    int rdbSaveTime(rio *rdb, time_t t);
    time_t rdbLoadTime(rio *rdb); /* 加载时间,都是间接调用的是rioRead()方法 */
    int rdbSaveLen(rio *rdb, uint32_t len); /* 保存一个字符串对象的长度时,根据长度的不同,分不同的编码方式 */
    uint32_t rdbLoadLen(rio *rdb, int *isencoded); /* 加载长度,也需要根据编码方式,读取不同的buf获取长度 */
    int rdbSaveObjectType(rio *rdb, robj *o); /* 根据robj中的编码方式,保存到rbd中 */
    int rdbLoadObjectType(rio *rdb); /* 加载rbd中的obj Type */
    int rdbLoad(char *filename); /* 加载rdb数据库文件 */
    int rdbSaveBackground(char *filename); /* 后台进行rbd保存操作 */
    void rdbRemoveTempFile(pid_t childpid); /* 移除子进程操作的相关保存rdb文件 */
    int rdbSave(char *filename); /* 保存rdb数据库的内容到磁盘中 */
    int rdbSaveObject(rio *rdb, robj *o); /* 保存redis obj对象到rdb中 */
    off_t rdbSavedObjectLen(robj *o); /* 获取保存后的长度,其实就是获取了保存数据时计算的偏移量 */
    off_t rdbSavedObjectPages(robj *o);
    robj *rdbLoadObject(int type, rio *rdb); /* 加载redis obj对象,有特定的Type类型 */
    void backgroundSaveDoneHandler(int exitcode, int bysignal); /* 后台保存数据库操作完成后的处理方法 */
    int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, long long now);
    robj *rdbLoadStringObject(rio *rdb); /* 无编码方式加载字符串对象 */
    void saveCommand(redisClient *c) /* 将保存操作封装成命令的形式 */
    void bgsaveCommand(redisClient *c) /* 将后台保存数据库操作封装成命令的模式 */
  • 相关阅读:
    splay区间模板-1331-序列终结者1
    splay单点模板-5203-BZOJ3224 普通平衡树
    线段树模板-1204-影子的宽度
    树状数组模板-1200-序列和
    YAML配置复杂集合类型
    webpack vue-router vue 打包上线一些列问题
    idea 注释模板
    JavaScript中reduce()方法
    ES6实用语法糖
    axios 备忘
  • 原文地址:https://www.cnblogs.com/bianqi/p/12184258.html
Copyright © 2011-2022 走看看