zoukankan      html  css  js  c++  java
  • Redis学习--命令执行过程中写AOF日志和同步从库顺序

    主从数据同步和AOF日志追加

    Redis在命令成功执行后,会先将命令追加到AOF日志中,然后再依次推送给每个从节点。

    /* Feed one command, executed against database `dbid`, to the AOF and/or
     * to the replication link.
     *
     * `flags` is a bitwise OR of:
     * + PROPAGATE_NONE (no propagation of command at all)
     * + PROPAGATE_AOF (propagate into the AOF file if it is enabled)
     * + PROPAGATE_REPL (propagate into the replication link)
     *
     * The AOF is always fed before the slaves.
     *
     * Command implementations should not call this directly, because the
     * resulting commands are not wrapped in MULTI/EXEC. Use alsoPropagate(),
     * preventCommandPropagation() or forceCommandPropagation() instead.
     *
     * Code that has to propagate outside of a command execution context
     * (for example when serving a blocked client) is the intended caller
     * of propagate().
     */
    void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                   int flags)
    {
        const int want_aof = (flags & PROPAGATE_AOF) != 0;
        const int want_repl = (flags & PROPAGATE_REPL) != 0;

        /* The AOF is only fed when it is not switched off. */
        if (want_aof && server.aof_state != AOF_OFF) {
            feedAppendOnlyFile(cmd, dbid, argv, argc);
        }

        if (want_repl) {
            replicationFeedSlaves(server.slaves, dbid, argv, argc);
        }
    }
    
    
    /* Tell the AOF (when enabled) and all the slaves that `key` expired.
     *
     * When a key expires in the master, a deletion of that key is propagated:
     * UNLINK when `lazy` is non-zero, DEL otherwise.
     *
     * Key expiry is thereby centralized in one place, and since both the AOF
     * and the master->slave link guarantee operation ordering, everything
     * stays consistent even if write operations are allowed against expiring
     * keys. */
    void propagateExpire(redisDb *db, robj *key, int lazy) {
        robj *delargv[2];
        int i;

        if (lazy)
            delargv[0] = shared.unlink;
        else
            delargv[0] = shared.del;
        delargv[1] = key;

        /* Hold a reference on both objects while they are being propagated. */
        for (i = 0; i < 2; i++)
            incrRefCount(delargv[i]);

        if (server.aof_state != AOF_OFF) {
            feedAppendOnlyFile(server.delCommand, db->id, delargv, 2);
        }
        replicationFeedSlaves(server.slaves, db->id, delargv, 2);

        for (i = 0; i < 2; i++)
            decrRefCount(delargv[i]);
    }
    

    向从节点传播命令

    先将切换数据库命令写入repl_backlog中,再将切换数据库命令发给每个从库执行。

    先将执行成功命令写入到repl_backlog中,再将执行成功命令发送给每个从库执行。

    /* Propagate write commands to slaves, and populate the replication backlog
     * as well. This function is used if the instance is a master: we use
     * the commands received by our clients in order to create the replication
     * stream. Instead if the instance is a slave and has sub-slaves attached,
     * we use replicationFeedSlavesFromMaster().
     *
     * slaves:     list of slave clients to feed.
     * dictid:     id of the database the command targets. When it differs from
     *             server.slaveseldb a SELECT is emitted first (to the backlog
     *             and to the slaves), then server.slaveseldb is updated.
     * argv, argc: the command vector, emitted as a multi bulk.
     *
     * For both the SELECT and the command itself the backlog is fed before
     * the slaves. Every protocol line is terminated by CRLF ("\r\n"). */
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
        listNode *ln;
        listIter li;
        int j, len;
        char llstr[LONG_STR_SIZE];

        /* If the instance is not a top level master, return ASAP: we'll just proxy
         * the stream of data we receive from our master instead, in order to
         * propagate *identical* replication stream. In this way this slave can
         * advertise the same replication ID as the master (since it shares the
         * master replication history and has the same backlog and offsets). */
        if (server.masterhost != NULL) return;

        /* If there aren't slaves, and there is no backlog buffer to populate,
         * we can return ASAP. */
        if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

        /* We can't have slaves attached and no backlog. */
        serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

        /* Send SELECT command to every slave if needed. */
        if (server.slaveseldb != dictid) {
            robj *selectcmd;

            /* For a few DBs we have pre-computed SELECT command. */
            if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
                selectcmd = shared.select[dictid];
            } else {
                int dictid_len;

                dictid_len = ll2string(llstr,sizeof(llstr),dictid);
                selectcmd = createObject(OBJ_STRING,
                    sdscatprintf(sdsempty(),
                    "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                    dictid_len, llstr));
            }

            /* Add the SELECT command into the backlog. */
            if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

            /* Send it to slaves. */
            listRewind(slaves,&li);
            while((ln = listNext(&li))) {
                client *slave = ln->value;
                if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
                addReply(slave,selectcmd);
            }

            /* Only the SELECT object created above is ours to release; the
             * pre-computed ones are shared. */
            if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
                decrRefCount(selectcmd);
        }
        server.slaveseldb = dictid;

        /* Write the command to the replication backlog if any. */
        if (server.repl_backlog) {
            char aux[LONG_STR_SIZE+3];

            /* Add the multi bulk reply length: "*<argc>\r\n". */
            aux[0] = '*';
            len = ll2string(aux+1,sizeof(aux)-1,argc);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);

            for (j = 0; j < argc; j++) {
                long objlen = stringObjectLen(argv[j]);

                /* We need to feed the buffer with the object as a bulk reply
                 * not just as a plain string, so create the $..CRLF payload len
                 * and add the final CRLF */
                aux[0] = '$';
                len = ll2string(aux+1,sizeof(aux)-1,objlen);
                aux[len+1] = '\r';
                aux[len+2] = '\n';
                feedReplicationBacklog(aux,len+3);
                feedReplicationBacklogWithObject(argv[j]);
                /* The trailing CRLF reuses the two bytes just stored in aux. */
                feedReplicationBacklog(aux+len+1,2);
            }
        }

        /* Write the command to every slave. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            /* Don't feed slaves that are still waiting for BGSAVE to start */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

            /* Feed slaves that are waiting for the initial SYNC (so these commands
             * are queued in the output buffer until the initial SYNC completes),
             * or are already in sync with the master. */

            /* Add the multi bulk length. */
            addReplyMultiBulkLen(slave,argc);

            /* Then every argument of the command, each one as a bulk. */
            for (j = 0; j < argc; j++)
                addReplyBulk(slave,argv[j]);
        }
    }
    
    

    AOF日志追加命令

    在将命令追加到AOF日志过程中,会对部分命令进行重写:

    /* Serialize one command into the AOF buffers.
     *
     * cmd:        the command being logged; only cmd->proc is inspected here, to
     *             decide whether the command has to be translated.
     * dictid:     id of the database the command targets. When it differs from
     *             server.aof_selected_db a SELECT is emitted first and
     *             server.aof_selected_db is updated.
     * argv, argc: the command vector.
     *
     * Some commands are rewritten before being logged:
     * + EXPIRE / PEXPIRE / EXPIREAT      -> PEXPIREAT
     * + SETEX / PSETEX                   -> SET followed by PEXPIREAT
     * + SET with more than 3 arguments   -> SET key value, followed by
     *                                       PEXPIREAT when EX or PX is given
     *
     * The serialized text is appended to server.aof_buf when the AOF is on,
     * and to the rewrite buffer when a rewrite child is running. */
    void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
        sds buf = sdsempty();
        robj *tmpargv[3];

        /* The DB this command was targeting is not the same as the last command
         * we appended. To issue a SELECT command is needed. */
        if (dictid != server.aof_selected_db) {
            char seldb[64];

            snprintf(seldb,sizeof(seldb),"%d",dictid);
            buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
                (unsigned long)strlen(seldb),seldb);
            server.aof_selected_db = dictid;
        }

        if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
            cmd->proc == expireatCommand) {
            /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
            buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
        } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
            /* Translate SETEX/PSETEX to SET and PEXPIREAT */
            tmpargv[0] = createStringObject("SET",3);
            tmpargv[1] = argv[1];
            tmpargv[2] = argv[3];
            buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
            decrRefCount(tmpargv[0]);
            buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
        } else if (cmd->proc == setCommand && argc > 3) {
            int i;
            robj *exarg = NULL, *pxarg = NULL;
            /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
            buf = catAppendOnlyGenericCommand(buf,3,argv);
            /* An option only counts when a value follows it, so argv[i+1] is
             * never read past the end of the vector. */
            for (i = 3; i + 1 < argc; i++) {
                if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
                if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
            }
            serverAssert(!(exarg && pxarg));
            if (exarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                                   exarg);
            if (pxarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                                   pxarg);
        } else {
            /* All the other commands don't need translation or need the
             * same translation already operated in the command vector
             * for the replication itself. */
            buf = catAppendOnlyGenericCommand(buf,argc,argv);
        }

        /* Append to the AOF buffer. This will be flushed on disk just before
         * of re-entering the event loop, so before the client will get a
         * positive reply about the operation performed. */
        if (server.aof_state == AOF_ON)
            server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

        /* If a background append only file rewriting is in progress we want to
         * accumulate the differences between the child DB and the current one
         * in a buffer, so that when the child process will do its work we
         * can append the differences to the new append only file. */
        if (server.aof_child_pid != -1)
            aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

        sdsfree(buf);
    }
    

    CALL执行命令

    CALL执行命令步骤:

    • 将要执行的命令发送给开启MONITOR的客户端
    • 执行命令并统计执行时间
    • 如果执行时间超过阈值,将该命令记录到慢日志(slowlog,保存在内存中而不是文件中),并在需要时添加延迟采样
    • 将命令写入AOF日志和传播给所有从库。
    /* call() is the core of Redis execution of a command.
     *
     * It runs c->cmd->proc(c) and then, depending on `flags`, logs the command
     * into the slow log, updates the per-command statistics and propagates the
     * command to the AOF and to the slaves.
     *
     * The following flags can be passed:
     * CMD_CALL_NONE        No flags.
     * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
     * CMD_CALL_STATS       Populate command stats.
     * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
     *                          or if the client flags are forcing propagation.
     * CMD_CALL_PROPAGATE_REPL  Send command to slaves if it modified the dataset
     *                          or if the client flags are forcing propagation.
     * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
     * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
     *
     * The exact propagation behavior depends on the client flags.
     * Specifically:
     *
     * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
     *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
     *    in the call flags, then the command is propagated even if the
     *    dataset was not affected by the command.
     * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
     *    are set, the propagation into AOF or to slaves is not performed even
     *    if the command modified the dataset.
     *
     * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
     * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
     * slaves propagation will never occur.
     *
     * Client flags are modified by the implementation of a given command
     * using the following API:
     *
     * forceCommandPropagation(client *c, int flags);
     * preventCommandPropagation(client *c);
     * preventCommandAOF(client *c);
     * preventCommandReplication(client *c);
     *
     */
    void call(client *c, int flags) {
        long long dirty;
        ustime_t start, duration;
        /* Snapshot of the client flags on entry: the force/prevent propagation
         * bits are restored from it before returning. */
        int client_old_flags = c->flags;
        /* The command as dispatched on entry; statistics are accounted against
         * it further down. */
        struct redisCommand *real_cmd = c->cmd;
    
        /* Incremented here and decremented again right before returning.
         * NOTE(review): presumably keeps the clock used for expires fixed while
         * the command runs -- confirm against the users of this counter. */
        server.fixed_time_expire++;
    
        /* Send the command to clients in MONITOR mode, only if the commands are
         * not generated from reading an AOF. */
        if (listLength(server.monitors) &&
            !server.loading &&
            !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
        {
            replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
        }
    
        /* Initialization: clear the flags that must be set by the command on
         * demand, and initialize the array for additional commands propagation.
         * The current also_propagate array is saved first and put back at the
         * end of this function. */
        c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
        redisOpArray prev_also_propagate = server.also_propagate;
        redisOpArrayInit(&server.also_propagate);
    
        /* Call the command. `dirty` ends up holding how much server.dirty grew
         * while the command ran (clamped to zero), `duration` the time elapsed
         * between the cached time refreshed just before the call and a fresh
         * ustime() reading taken right after it. */
        dirty = server.dirty;
        updateCachedTime(0);
        start = server.ustime;
        c->cmd->proc(c);
        duration = ustime()-start;
        dirty = server.dirty-dirty;
        if (dirty < 0) dirty = 0;
    
        /* When EVAL is called loading the AOF we don't want commands called
         * from Lua to go into the slowlog or to populate statistics. */
        if (server.loading && c->flags & CLIENT_LUA)
            flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
    
        /* If the caller is Lua, we want to force the EVAL caller to propagate
         * the script if the command flag or client flag are forcing the
         * propagation. */
        if (c->flags & CLIENT_LUA && server.lua_caller) {
            if (c->flags & CLIENT_FORCE_REPL)
                server.lua_caller->flags |= CLIENT_FORCE_REPL;
            if (c->flags & CLIENT_FORCE_AOF)
                server.lua_caller->flags |= CLIENT_FORCE_AOF;
        }
    
        /* Log the command into the Slow log if needed, and populate the
         * per-command statistics that we show in INFO commandstats.
         * EXEC itself is skipped here. The latency sample receives
         * duration/1000, the slow log the raw duration. */
        if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
            char *latency_event = (c->cmd->flags & CMD_FAST) ?
                                  "fast-command" : "command";
            latencyAddSampleIfNeeded(latency_event,duration/1000);
            slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
        }
        if (flags & CMD_CALL_STATS) {
            /* Account against the command saved in real_cmd on entry: as the
             * original note says, the client's current command may be
             * different by now, in case of MULTI-EXEC or re-written commands
             * such as EXPIRE, GEOADD, etc. */
            real_cmd->microseconds += duration;
            real_cmd->calls++;
        }
    
        /* Propagate the command into the AOF and replication link. The whole
         * step is skipped only when every bit of CLIENT_PREVENT_PROP is set on
         * the client. */
        if (flags & CMD_CALL_PROPAGATE &&
            (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
        {
            int propagate_flags = PROPAGATE_NONE;
    
            /* Check if the command operated changes in the data set. If so
             * set for replication / AOF propagation. */
            if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
    
            /* If the client forced AOF / replication of the command, set
             * the flags regardless of the command effects on the data set. */
            if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
            if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
    
            /* However prevent AOF / replication propagation if the command
             * implementations called preventCommandPropagation() or similar,
             * or if we don't have the call() flags to do so. */
            if (c->flags & CLIENT_PREVENT_REPL_PROP ||
                !(flags & CMD_CALL_PROPAGATE_REPL))
                    propagate_flags &= ~PROPAGATE_REPL;
            if (c->flags & CLIENT_PREVENT_AOF_PROP ||
                !(flags & CMD_CALL_PROPAGATE_AOF))
                    propagate_flags &= ~PROPAGATE_AOF;
    
            /* Call propagate() only if at least one of AOF / replication
             * propagation is needed. Note that modules commands handle replication
             * in an explicit way, so we never replicate them automatically. */
            if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
                propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
        }
    
        /* Restore the old replication flags, since call() can be executed
         * recursively. */
        c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
        c->flags |= client_old_flags &
            (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    
        /* Handle the alsoPropagate() API to handle commands that want to propagate
         * multiple separated commands. Note that alsoPropagate() is not affected
         * by CLIENT_PREVENT_PROP flag. */
        if (server.also_propagate.numops) {
            int j;
            redisOp *rop;
    
            if (flags & CMD_CALL_PROPAGATE) {
                for (j = 0; j < server.also_propagate.numops; j++) {
                    rop = &server.also_propagate.ops[j];
                    int target = rop->target;
                    /* Whatever the command wish is, we honor the call() flags. */
                    if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                    if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                    if (target)
                        propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
                }
            }
            /* The queued operations are released whether or not they were
             * propagated. */
            redisOpArrayFree(&server.also_propagate);
        }
        /* Put back the also_propagate array saved before running the command. */
        server.also_propagate = prev_also_propagate;
        server.fixed_time_expire--;
        server.stat_numcommands++;
    }
    
  • 相关阅读:
    网鼎杯_2018 _Web_fakebook
    CISCN 2019-ikun
    流浪者-CTF
    wtf.sh-150
    bug-ctf
    EasyRE
    MySQL 存储引擎特性和线程模型
    InnoDB体系结构---物理存储结构
    mysql数据页结构及行格式
    linux系统清理缓存
  • 原文地址:https://www.cnblogs.com/gaogao67/p/15062974.html
Copyright © 2011-2022 走看看