zoukankan      html  css  js  c++  java
  • Redis(5.0.0)持久化AOF和 RDB 结合源码分析

    主要是挖个坑。候补(代码还没看完。。)

    https://github.com/antirez/redis/tree/5.0

    一、Redis保存持久化文件

    二、Redis启动加载持久化文件

    src/server.c 中的 loadDataFromDisk 函数:启动时根据配置选择加载方式——开启了 AOF(server.aof_state == AOF_ON)时加载 AOF 文件,否则加载 RDB 文件。

     1 /* Function called at startup to load RDB or AOF file in memory. */
     2 void loadDataFromDisk(void) {
     3     long long start = ustime();
     4     if (server.aof_state == AOF_ON) {
     5         if (loadAppendOnlyFile(server.aof_filename) == C_OK)
     6             serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
     7     } else {
     8         rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
     9         if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
    10             serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
    11                 (float)(ustime()-start)/1000000);
    12 
    13             /* Restore the replication ID / offset from the RDB file. */
    14             if (server.masterhost &&
    15                 rsi.repl_id_is_set &&
    16                 rsi.repl_offset != -1 &&
    17                 /* Note that older implementations may save a repl_stream_db
    18                  * of -1 inside the RDB file in a wrong way, see more information
    19                  * in function rdbPopulateSaveInfo. */
    20                 rsi.repl_stream_db != -1)
    21             {
    22                 memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
    23                 server.master_repl_offset = rsi.repl_offset;
    24                 /* If we are a slave, create a cached master from this
    25                  * information, in order to allow partial resynchronizations
    26                  * with masters. */
    27                 replicationCacheMasterUsingMyself();
    28                 selectDb(server.cached_master,rsi.repl_stream_db);
    29             }
    30         } else if (errno != ENOENT) {
    31             serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
    32             exit(1);
    33         }
    34     }
    35 }
    loadDataFromDisk

    AOF路线

    src/aof.c 中的 loadAppendOnlyFile:先检查 AOF 文件开头是否带有 RDB 前缀(preamble),如果有则先按 RDB 格式加载这一部分;随后创建一个 FakeClient,逐条执行 AOF 文件中剩余的命令。

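    特别地,事务会被单独处理:MULTI 之后的命令先通过 queueMultiCommand 入队,直到遇到 EXEC 才一并执行;如果读到文件末尾时事务仍未结束,则按不完整的尾部处理,把有效偏移量回退到 MULTI 之前(valid_before_multi)。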

      1 int loadAppendOnlyFile(char *filename) {
      2     struct client *fakeClient;
      3     FILE *fp = fopen(filename,"r");
      4     struct redis_stat sb;
      5     int old_aof_state = server.aof_state;
      6     long loops = 0;
      7     off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
      8     off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */
      9 
     10     if (fp == NULL) {
     11         serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
     12         exit(1);
     13     }
     14 
     15     /* Handle a zero-length AOF file as a special case. An empty AOF file
     16      * is a valid AOF because an empty server with AOF enabled will create
     17      * a zero length file at startup, that will remain like that if no write
     18      * operation is received. */
     19     if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
     20         server.aof_current_size = 0;
     21         fclose(fp);
     22         return C_ERR;
     23     }
     24 
     25     /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     26      * to the same file we're about to read. */
     27     server.aof_state = AOF_OFF;
     28 
     29     fakeClient = createFakeClient();
     30     startLoading(fp);
     31 
     32     /* Check if this AOF file has an RDB preamble. In that case we need to
     33      * load the RDB file and later continue loading the AOF tail. */
     34     char sig[5]; /* "REDIS" */
     35     if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
     36         /* No RDB preamble, seek back at 0 offset. */
     37         if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
     38     } else {
     39         /* RDB preamble. Pass loading the RDB functions. */
     40         rio rdb;
     41 
     42         serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
     43         if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
     44         rioInitWithFile(&rdb,fp);
     45         if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
     46             serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
     47             goto readerr;
     48         } else {
     49             serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
     50         }
     51     }
     52 
     53     /* Read the actual AOF file, in REPL format, command by command. */
     54     while(1) {
     55         int argc, j;
     56         unsigned long len;
     57         robj **argv;
     58         char buf[128];
     59         sds argsds;
     60         struct redisCommand *cmd;
     61 
     62         /* Serve the clients from time to time */
     63         if (!(loops++ % 1000)) {
     64             loadingProgress(ftello(fp));
     65             processEventsWhileBlocked();
     66         }
     67 
     68         if (fgets(buf,sizeof(buf),fp) == NULL) {
     69             if (feof(fp))
     70                 break;
     71             else
     72                 goto readerr;
     73         }
     74         if (buf[0] != '*') goto fmterr;
     75         if (buf[1] == '\0') goto readerr;
     76         argc = atoi(buf+1);
     77         if (argc < 1) goto fmterr;
     78 
     79         argv = zmalloc(sizeof(robj*)*argc);
     80         fakeClient->argc = argc;
     81         fakeClient->argv = argv;
     82 
     83         for (j = 0; j < argc; j++) {
     84             if (fgets(buf,sizeof(buf),fp) == NULL) {
     85                 fakeClient->argc = j; /* Free up to j-1. */
     86                 freeFakeClientArgv(fakeClient);
     87                 goto readerr;
     88             }
     89             if (buf[0] != '$') goto fmterr;
     90             len = strtol(buf+1,NULL,10);
     91             argsds = sdsnewlen(SDS_NOINIT,len);
     92             if (len && fread(argsds,len,1,fp) == 0) {
     93                 sdsfree(argsds);
     94                 fakeClient->argc = j; /* Free up to j-1. */
     95                 freeFakeClientArgv(fakeClient);
     96                 goto readerr;
     97             }
     98             argv[j] = createObject(OBJ_STRING,argsds);
     99             if (fread(buf,2,1,fp) == 0) {
    100                 fakeClient->argc = j+1; /* Free up to j. */
    101                 freeFakeClientArgv(fakeClient);
    102                 goto readerr; /* discard CRLF */
    103             }
    104         }
    105 
    106         /* Command lookup */
    107         cmd = lookupCommand(argv[0]->ptr);
    108         if (!cmd) {
    109             serverLog(LL_WARNING,
    110                 "Unknown command '%s' reading the append only file",
    111                 (char*)argv[0]->ptr);
    112             exit(1);
    113         }
    114 
    115         if (cmd == server.multiCommand) valid_before_multi = valid_up_to;
    116 
    117         /* Run the command in the context of a fake client */
    118         fakeClient->cmd = cmd;
    119         if (fakeClient->flags & CLIENT_MULTI &&
    120             fakeClient->cmd->proc != execCommand)
    121         {
    122             queueMultiCommand(fakeClient);
    123         } else {
    124             cmd->proc(fakeClient);
    125         }
    126 
    127         /* The fake client should not have a reply */
    128         serverAssert(fakeClient->bufpos == 0 &&
    129                      listLength(fakeClient->reply) == 0);
    130 
    131         /* The fake client should never get blocked */
    132         serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
    133 
    134         /* Clean up. Command code may have changed argv/argc so we use the
    135          * argv/argc of the client instead of the local variables. */
    136         freeFakeClientArgv(fakeClient);
    137         fakeClient->cmd = NULL;
    138         if (server.aof_load_truncated) valid_up_to = ftello(fp);
    139     }
    140 
    141     /* This point can only be reached when EOF is reached without errors.
    142      * If the client is in the middle of a MULTI/EXEC, handle it as it was
    143      * a short read, even if technically the protocol is correct: we want
    144      * to remove the unprocessed tail and continue. */
    145     if (fakeClient->flags & CLIENT_MULTI) {
    146         serverLog(LL_WARNING,
    147             "Revert incomplete MULTI/EXEC transaction in AOF file");
    148         valid_up_to = valid_before_multi;
    149         goto uxeof;
    150     }
    151 
    152 loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
    153     fclose(fp);
    154     freeFakeClient(fakeClient);
    155     server.aof_state = old_aof_state;
    156     stopLoading();
    157     aofUpdateCurrentSize();
    158     server.aof_rewrite_base_size = server.aof_current_size;
    159     return C_OK;
    160 
    161 readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
    162     if (!feof(fp)) {
    163         if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
    164         serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
    165         exit(1);
    166     }
    167 
    168 uxeof: /* Unexpected AOF end of file. */
    169     if (server.aof_load_truncated) {
    170         serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");
    171         serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",
    172             (unsigned long long) valid_up_to);
    173         if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
    174             if (valid_up_to == -1) {
    175                 serverLog(LL_WARNING,"Last valid command offset is invalid");
    176             } else {
    177                 serverLog(LL_WARNING,"Error truncating the AOF file: %s",
    178                     strerror(errno));
    179             }
    180         } else {
    181             /* Make sure the AOF file descriptor points to the end of the
    182              * file after the truncate call. */
    183             if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
    184                 serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",
    185                     strerror(errno));
    186             } else {
    187                 serverLog(LL_WARNING,
    188                     "AOF loaded anyway because aof-load-truncated is enabled");
    189                 goto loaded_ok;
    190             }
    191         }
    192     }
    193     if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
    194     serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
    195     exit(1);
    196 
    197 fmterr: /* Format error. */
    198     if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
    199     serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
    200     exit(1);
    201 }
    loadAppendOnlyFile

    RDB路线

    RDB就像是一个redis快照

    src/rdb.c:Redis 抽象出了一个专门负责处理 IO 操作的 rio 层,加载时通过 rio 读取 RDB 文件来恢复数据(即快照保存的逆操作)。

     1 /* Like rdbLoadRio() but takes a filename instead of a rio stream. The
     2  * filename is open for reading and a rio stream object created in order
     3  * to do the actual loading. Moreover the ETA displayed in the INFO
     4  * output is initialized and finalized.
     5  *
     6  * If you pass an 'rsi' structure initialized with RDB_SAVE_INFO_INIT, the
     7  * loading code will fill the information fields in the structure. */
     8 int rdbLoad(char *filename, rdbSaveInfo *rsi) {
     9     FILE *fp;
    10     rio rdb;
    11     int retval;
    12 
    13     if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
    14     startLoading(fp);
    15     rioInitWithFile(&rdb,fp);
    16     retval = rdbLoadRio(&rdb,rsi,0);
    17     fclose(fp);
    18     stopLoading();
    19     return retval;
    20 }
    rdbload
      1 /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
      2  * otherwise C_ERR is returned and 'errno' is set accordingly. */
      3 int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
      4     uint64_t dbid;
      5     int type, rdbver;
      6     redisDb *db = server.db+0;
      7     char buf[1024];
      8 
      9     rdb->update_cksum = rdbLoadProgressCallback;
     10     rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
     11     if (rioRead(rdb,buf,9) == 0) goto eoferr;
     12     buf[9] = '\0';
     13     if (memcmp(buf,"REDIS",5) != 0) {
     14         serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
     15         errno = EINVAL;
     16         return C_ERR;
     17     }
     18     rdbver = atoi(buf+5);
     19     if (rdbver < 1 || rdbver > RDB_VERSION) {
     20         serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
     21         errno = EINVAL;
     22         return C_ERR;
     23     }
     24 
     25     /* Key-specific attributes, set by opcodes before the key type. */
     26     long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
     27     long long lru_clock = LRU_CLOCK();
     28     
     29     while(1) {
     30         robj *key, *val;
     31 
     32         /* Read type. */
     33         if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
     34 
     35         /* Handle special types. */
     36         if (type == RDB_OPCODE_EXPIRETIME) {
     37             /* EXPIRETIME: load an expire associated with the next key
     38              * to load. Note that after loading an expire we need to
     39              * load the actual type, and continue. */
     40             expiretime = rdbLoadTime(rdb);
     41             expiretime *= 1000;
     42             continue; /* Read next opcode. */
     43         } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
     44             /* EXPIRETIME_MS: milliseconds precision expire times introduced
     45              * with RDB v3. Like EXPIRETIME but no with more precision. */
     46             expiretime = rdbLoadMillisecondTime(rdb,rdbver);
     47             continue; /* Read next opcode. */
     48         } else if (type == RDB_OPCODE_FREQ) {
     49             /* FREQ: LFU frequency. */
     50             uint8_t byte;
     51             if (rioRead(rdb,&byte,1) == 0) goto eoferr;
     52             lfu_freq = byte;
     53             continue; /* Read next opcode. */
     54         } else if (type == RDB_OPCODE_IDLE) {
     55             /* IDLE: LRU idle time. */
     56             uint64_t qword;
     57             if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
     58             lru_idle = qword;
     59             continue; /* Read next opcode. */
     60         } else if (type == RDB_OPCODE_EOF) {
     61             /* EOF: End of file, exit the main loop. */
     62             break;
     63         } else if (type == RDB_OPCODE_SELECTDB) {
     64             /* SELECTDB: Select the specified database. */
     65             if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
     66             if (dbid >= (unsigned)server.dbnum) {
     67                 serverLog(LL_WARNING,
     68                     "FATAL: Data file was created with a Redis "
     69                     "server configured to handle more than %d "
     70                     "databases. Exiting\n", server.dbnum);
     71                 exit(1);
     72             }
     73             db = server.db+dbid;
     74             continue; /* Read next opcode. */
     75         } else if (type == RDB_OPCODE_RESIZEDB) {
     76             /* RESIZEDB: Hint about the size of the keys in the currently
     77              * selected data base, in order to avoid useless rehashing. */
     78             uint64_t db_size, expires_size;
     79             if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
     80                 goto eoferr;
     81             if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
     82                 goto eoferr;
     83             dictExpand(db->dict,db_size);
     84             dictExpand(db->expires,expires_size);
     85             continue; /* Read next opcode. */
     86         } else if (type == RDB_OPCODE_AUX) {
     87             /* AUX: generic string-string fields. Use to add state to RDB
     88              * which is backward compatible. Implementations of RDB loading
     89              * are requierd to skip AUX fields they don't understand.
     90              *
     91              * An AUX field is composed of two strings: key and value. */
     92             robj *auxkey, *auxval;
     93             if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
     94             if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
     95 
     96             if (((char*)auxkey->ptr)[0] == '%') {
     97                 /* All the fields with a name staring with '%' are considered
     98                  * information fields and are logged at startup with a log
     99                  * level of NOTICE. */
    100                 serverLog(LL_NOTICE,"RDB '%s': %s",
    101                     (char*)auxkey->ptr,
    102                     (char*)auxval->ptr);
    103             } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
    104                 if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
    105             } else if (!strcasecmp(auxkey->ptr,"repl-id")) {
    106                 if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
    107                     memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
    108                     rsi->repl_id_is_set = 1;
    109                 }
    110             } else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
    111                 if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
    112             } else if (!strcasecmp(auxkey->ptr,"lua")) {
    113                 /* Load the script back in memory. */
    114                 if (luaCreateFunction(NULL,server.lua,auxval) == NULL) {
    115                     rdbExitReportCorruptRDB(
    116                         "Can't load Lua script from RDB file! "
    117                         "BODY: %s", auxval->ptr);
    118                 }
    119             } else {
    120                 /* We ignore fields we don't understand, as by AUX field
    121                  * contract. */
    122                 serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'",
    123                     (char*)auxkey->ptr);
    124             }
    125 
    126             decrRefCount(auxkey);
    127             decrRefCount(auxval);
    128             continue; /* Read type again. */
    129         } else if (type == RDB_OPCODE_MODULE_AUX) {
    130             /* This is just for compatibility with the future: we have plans
    131              * to add the ability for modules to store anything in the RDB
    132              * file, like data that is not related to the Redis key space.
    133              * Such data will potentially be stored both before and after the
    134              * RDB keys-values section. For this reason since RDB version 9,
    135              * we have the ability to read a MODULE_AUX opcode followed by an
    136              * identifier of the module, and a serialized value in "MODULE V2"
    137              * format. */
    138             uint64_t moduleid = rdbLoadLen(rdb,NULL);
    139             moduleType *mt = moduleTypeLookupModuleByID(moduleid);
    140             char name[10];
    141             moduleTypeNameByID(name,moduleid);
    142 
    143             if (!rdbCheckMode && mt == NULL) {
    144                 /* Unknown module. */
    145                 serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
    146                 exit(1);
    147             } else if (!rdbCheckMode && mt != NULL) {
    148                 /* This version of Redis actually does not know what to do
    149                  * with modules AUX data... */
    150                 serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
    151                 exit(1);
    152             } else {
    153                 /* RDB check mode. */
    154                 robj *aux = rdbLoadCheckModuleValue(rdb,name);
    155                 decrRefCount(aux);
    156             }
    157         }
    158 
    159         /* Read key */
    160         if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
    161         /* Read value */
    162         if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr;
    163         /* Check if the key already expired. This function is used when loading
    164          * an RDB file from disk, either at startup, or when an RDB was
    165          * received from the master. In the latter case, the master is
    166          * responsible for key expiry. If we would expire keys here, the
    167          * snapshot taken by the master may not be reflected on the slave. */
    168         if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
    169             decrRefCount(key);
    170             decrRefCount(val);
    171         } else {
    172             /* Add the new object in the hash table */
    173             dbAdd(db,key,val);
    174 
    175             /* Set the expire time if needed */
    176             if (expiretime != -1) setExpire(NULL,db,key,expiretime);
    177             
    178             /* Set usage information (for eviction). */
    179             objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
    180 
    181             /* Decrement the key refcount since dbAdd() will take its
    182              * own reference. */
    183             decrRefCount(key);
    184         }
    185 
    186         /* Reset the state that is key-specified and is populated by
    187          * opcodes before the key, so that we start from scratch again. */
    188         expiretime = -1;
    189         lfu_freq = -1;
    190         lru_idle = -1;
    191     }
    192     /* Verify the checksum if RDB version is >= 5 */
    193     if (rdbver >= 5) {
    194         uint64_t cksum, expected = rdb->cksum;
    195 
    196         if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
    197         if (server.rdb_checksum) {
    198             memrev64ifbe(&cksum);
    199             if (cksum == 0) {
    200                 serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
    201             } else if (cksum != expected) {
    202                 serverLog(LL_WARNING,"Wrong RDB checksum. Aborting now.");
    203                 rdbExitReportCorruptRDB("RDB CRC error");
    204             }
    205         }
    206     }
    207     return C_OK;
    208 
    209 eoferr: /* unexpected end of file is handled here with a fatal exit */
    210     serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    211     rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
    212     return C_ERR; /* Just to avoid warning */
    213 }
    rdbLoadRio
  • 相关阅读:
    pinyin4j使用示例
    迭代器模式
    适配器模式
    策略模式
    装饰模式
    责任链模式
    命令模式
    中介者模式
    原型模式
    代理模式
  • 原文地址:https://www.cnblogs.com/Geek-xiyang/p/10340044.html
Copyright © 2011-2022 走看看