zoukankan      html  css  js  c++  java
  • redis代码解析-事务

    redis 的事务相关的几个命令分别为 watch multi exec.

    watch 可以监控一个变量在事务开始执行之前是否有被修改.使用方式为: WATCH key [key ...]

        在redis内部的实现是每个db有一个名为watched_keys的dict,这个dict的key为监控的key,value为所有监控该key的client组成的链表.

        所有的涉及到修改数据库的命令执行成功之后都会执行signalModifiedKey->touchWatchedKey,将修改的db中的watched_keys里面记录的所有监控了本次修改的key的client设置c->flags |= CLIENT_DIRTY_CAS;

    multi 表示开始一次事务,使用方式为: MULTI

    实现为只是对client标记上 c->flags |= CLIENT_MULTI;它会先检查一下是否已经标记了 CLIENT_MULTI,如果标记了本次multi命令会失败,因为redis不允许事务嵌套.
     
    在multi开始之后所有的命令(除了multi,exec,discard,watch)都会进入multicommand queue.
    我们知道一条命令进入server的流程为,
    第一步,底层的io多路复用处理器会上报client可读事件,调用注册的处理可读事件的函数readQueryFromClient将命令读入client的querybuf,
    第二步,进入到processInputBuffer处理读入的buffer,分解出argv,argc;
    第三步,再进入processCommand,先根据argv[1]在命令表的dict中找到对应的command,找到command后会检查本次的command是否合法(名字是否合法,参数个数是否合法),如果command不合法在返回错误的同时会检查是否处于事务状态中,如果命令
    不合法CLIENT_DIRTY_EXEC标志将导致事务执行失败.
    /* Flag the transacation as DIRTY_EXEC so that EXEC will fail.
     * Should be called every time there is an error while queueing a command. */
    void flagTransaction(client *c) {
        if (c->flags & CLIENT_MULTI)
            c->flags |= CLIENT_DIRTY_EXEC;
    }

    然后(略过一些与事务无关的步骤),检查c->flags & CLIENT_MULTI,来决定是否直接执行命令还是将命令进入queueMultiCommand

    第四步,在queueMultiCommand中,将本次命令command,argc,argv加入到c->mstate.commands队列中.
    /* Client MULTI/EXEC state */
    typedef struct multiCmd {
        robj **argv;
        int argc;
        struct redisCommand *cmd;
    } multiCmd;
    
    typedef struct multiState {
        multiCmd *commands;     /* Array of MULTI commands */
        int count;              /* Total number of MULTI commands */
        int minreplicas;        /* MINREPLICAS for synchronous replication */
        time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
    } multiState;
    typedef
    struct client { ... multiState mstate; /* MULTI/EXEC state */ ... } client;
    exec 命令开始执行事务队列中的全部命令,使用方式为:EXEC
    exec会检查当前是否处于事务状态
    if (!(c->flags & CLIENT_MULTI)) {
            addReplyError(c,"EXEC without MULTI");
            return;
        }

    然后检查该条命令来自的client监控的key是否有被touch

    /* Check if we need to abort the EXEC because:
         * 1) Some WATCHed key was touched.
         * 2) There was a previous error while queueing commands.
         * A failed EXEC in the first case returns a multi bulk nil object
         * (technically it is not an error but a special behavior), while
         * in the second an EXECABORT error is returned. */
        if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
            addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                      shared.nullmultibulk);
            discardTransaction(c);
            goto handle_monitor;
        }

    最后是遍历c->mstate.commands执行所有事务命令队列中的命令

    /* Exec all the queued commands */
        unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
        orig_argv = c->argv;
        orig_argc = c->argc;
        orig_cmd = c->cmd;
        addReplyMultiBulkLen(c,c->mstate.count);
        for (j = 0; j < c->mstate.count; j++) {
            c->argc = c->mstate.commands[j].argc;
            c->argv = c->mstate.commands[j].argv;
            c->cmd = c->mstate.commands[j].cmd;
    
            /* Propagate a MULTI request once we encounter the first command which
             * is not readonly nor an administrative one.
             * This way we'll deliver the MULTI/..../EXEC block as a whole and
             * both the AOF and the replication link will have the same consistency
             * and atomicity guarantees. */
            if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
                execCommandPropagateMulti(c);
                must_propagate = 1;
            }
    
            call(c,CMD_CALL_FULL);
    
            /* Commands may alter argc/argv, restore mstate. */
            c->mstate.commands[j].argc = c->argc;
            c->mstate.commands[j].argv = c->argv;
            c->mstate.commands[j].cmd = c->cmd;
        }
        c->argv = orig_argv;
        c->argc = orig_argc;
        c->cmd = orig_cmd;
        discardTransaction(c);
  • 相关阅读:
    Python PEP—Python增强提案
    容器技术介绍:Docker简介及安装
    Python笔记:List相关操作
    Python笔记:字符串操作
    Python笔记:属性值设置和判断变量是否存在
    mitmproxy 代理工具介绍:rewrite和map local实现
    接口测试代理工具charles mock测试
    接口测试框架Requests
    JMeter性能测试:JMeter安装及脚本录制回放
    PHP Parse error: parse error, unexpected T_OBJECT_OPERATOR
  • 原文地址:https://www.cnblogs.com/nicganon/p/7765596.html
Copyright © 2011-2022 走看看