zoukankan      html  css  js  c++  java
  • redis代码解析-事务

    redis 的事务相关的几个命令分别为 watch multi exec.

    watch 可以监控一个变量在事务开始执行之前是否有被修改.使用方式为: WATCH key [key ...]

        在redis内部的实现是每个db有一个名为watched_keys的dict,这个dict的key为监控的key,value为所有监控该key的client组成的链表.

        所有的涉及到修改数据库的命令执行成功之后都会执行signalModifiedKey->touchWatchedKey,将修改的db中的watched_keys里面记录的所有监控了本次修改的key的client设置c->flags |= CLIENT_DIRTY_CAS;

    multi 表示开始一次事务,使用方式为: MULTI

    实现为只是对client标记上 c->flags |= CLIENT_MULTI;它会先检查一下是否已经标记了 CLIENT_MULTI,如果标记了本次multi命令会失败,因为redis不允许事务嵌套.
     
    在multi开始之后所有的命令(除了multi,exec,discard,watch)都会进入multicommand queue.
    我们知道一条命令进入server的流程为,
    第一步,底层的io多路复用处理器会上报client可读事件,调用注册的处理可读事件的函数readQueryFromClient将命令读入client的querybuf,
    第二步,进入到processInputBuffer处理读入的buffer,分解出argv,argc;
    第三步,再进入processCommand,先根据argv[1]在命令表的dict中找到对应的command,找到command后会检查本次的command是否合法(名字是否合法,参数个数是否合法),如果command不合法在返回错误的同时会检查是否处于事务状态中,如果命令
    不合法CLIENT_DIRTY_EXEC标志将导致事务执行失败.
    /* Flag the transacation as DIRTY_EXEC so that EXEC will fail.
     * Should be called every time there is an error while queueing a command. */
    void flagTransaction(client *c) {
        if (c->flags & CLIENT_MULTI)
            c->flags |= CLIENT_DIRTY_EXEC;
    }

    然后(略过一些与事务无关的步骤),检查c->flags & CLIENT_MULTI,来决定是否直接执行命令还是将命令进入queueMultiCommand

    第四步,在queueMultiCommand中,将本次命令command,argc,argv加入到c->mstate.commands队列中.
    /* Client MULTI/EXEC state */
    typedef struct multiCmd {
        robj **argv;
        int argc;
        struct redisCommand *cmd;
    } multiCmd;
    
    typedef struct multiState {
        multiCmd *commands;     /* Array of MULTI commands */
        int count;              /* Total number of MULTI commands */
        int minreplicas;        /* MINREPLICAS for synchronous replication */
        time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
    } multiState;
    typedef
    struct client { ... multiState mstate; /* MULTI/EXEC state */ ... } client;
    exec 命令开始执行事务队列中的全部命令,使用方式为:EXEC
    exec会检查当前是否处于事务状态
    if (!(c->flags & CLIENT_MULTI)) {
            addReplyError(c,"EXEC without MULTI");
            return;
        }

    然后检查该条命令来自的client监控的key是否有被touch

    /* Check if we need to abort the EXEC because:
         * 1) Some WATCHed key was touched.
         * 2) There was a previous error while queueing commands.
         * A failed EXEC in the first case returns a multi bulk nil object
         * (technically it is not an error but a special behavior), while
         * in the second an EXECABORT error is returned. */
        if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
            addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                      shared.nullmultibulk);
            discardTransaction(c);
            goto handle_monitor;
        }

    最后是遍历c->mstate.commands执行所有事务命令队列中的命令

    /* Exec all the queued commands */
        unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
        orig_argv = c->argv;
        orig_argc = c->argc;
        orig_cmd = c->cmd;
        addReplyMultiBulkLen(c,c->mstate.count);
        for (j = 0; j < c->mstate.count; j++) {
            c->argc = c->mstate.commands[j].argc;
            c->argv = c->mstate.commands[j].argv;
            c->cmd = c->mstate.commands[j].cmd;
    
            /* Propagate a MULTI request once we encounter the first command which
             * is not readonly nor an administrative one.
             * This way we'll deliver the MULTI/..../EXEC block as a whole and
             * both the AOF and the replication link will have the same consistency
             * and atomicity guarantees. */
            if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
                execCommandPropagateMulti(c);
                must_propagate = 1;
            }
    
            call(c,CMD_CALL_FULL);
    
            /* Commands may alter argc/argv, restore mstate. */
            c->mstate.commands[j].argc = c->argc;
            c->mstate.commands[j].argv = c->argv;
            c->mstate.commands[j].cmd = c->cmd;
        }
        c->argv = orig_argv;
        c->argc = orig_argc;
        c->cmd = orig_cmd;
        discardTransaction(c);
  • 相关阅读:
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
    Learn Prolog Now 翻译
  • 原文地址:https://www.cnblogs.com/nicganon/p/7765596.html
Copyright © 2011-2022 走看看