zoukankan      html  css  js  c++  java
  • REDIS 事务机制

    基本事务操作:

        任何数据库都必须要保证一种原子执行操作:最基本的原子执行操作肯定是需要提供:

        举一个例子来说明: 当对某个Key 做一个统计: 可能不同的Client做它那部分的统计,一段时间后,服务器端需要得出那个key的具体值

      Client1: GET number

    number = number +N1;

        SET number number+N1;

    Client2: GET number

    number = number +N2;

        SET number number+N2;

    原本来讲 ,期望的值是NUMBER=NUMBER+N1+N2; 但是可能结果有其他的可能性,需要将上面的3个操作原子化,也就是这样的操作流是一个完整体,而不让pipeline被打乱~!

    REDIS事务机制

       像上述情况 必须得以解决 不然redis很难做?作者提供了2个事务机制

    19812860

    利用multi/exec来完成 multi被认为是放在同一个序列中的,按照序列化去执行命令操作

    看看源码是如何写的:

    MULTI操作

    void multiCommand(redisClient *c) {
    if (c->flags & REDIS_MULTI) {
            addReplyError(c,"MULTI calls can not be nested");
    return;
        }
        c->flags |= REDIS_MULTI;
        addReply(c,shared.ok);
    }

    每个redisClient 在同一时间都只能压入一个multi 首先检测client是否含有multi标记,如果没有  就将标记位置为REDIS_MULTI.

    命令入队操作

    redis设计中,对于某个redis客户端来讲,server端首先需要查看是否含有MULTI标记位【src/redis.c】来判断是否该讲multi命令写入到待执行队列中~!如果满足要求 就执行下面函数体:

    在执行MULTI之后  都需要做 一个命令入队操作:

    【src/multi.c】

    1. void queueMultiCommand(redisClient *c) {
    2.     multiCmd *mc;
    3. int j;
    4.     c->mstate.commands = zrealloc(c->mstate.commands,
    5. sizeof(multiCmd)*(c->mstate.count+1));
    6.     mc = c->mstate.commands+c->mstate.count;
    7.     printf("mc:%p ",mc);
    8.     mc->cmd = c->cmd;
    9.     mc->argc = c->argc;
    10.     mc->argv = zmalloc(sizeof(robj*)*c->argc);
    11.     printf("mc->argv:%d ",sizeof(robj*)*c->argc);
    12.     memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    13. for (j = 0; j < c->argc; j++)
    14.         incrRefCount(mc->argv[j]);
    15.     c->mstate.count++;
    16. }
    17. typedef struct multiCmd {
    18.     robj **argv;
    19. int argc;
    20. struct redisCommand *cmd;
    21. } multiCmd;

    line 5:给mstate重新分配一个命令所需要的空间 预分配count+1的指针

    line 6: 指针操作,定位到commands的count个命令指针

    line11以后:申请c->argc个robj*地址,将c中的argv的内容都复制给mc的argv中  这里的robj指针暂时没有看懂~!em6em6[1]

    执行命令操作

    执行主体函数就是下面的

    1. void execCommand(redisClient *c) {
    2. int j;
    3.     robj **orig_argv;
    4. int orig_argc;
    5. struct redisCommand *orig_cmd;
    6. int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    7. if (!(c->flags & REDIS_MULTI)) {
    8.         addReplyError(c,"EXEC without MULTI-----SFWTOMS");
    9. return;
    10.     }
    11. /* Check if we need to abort the EXEC because:
    12.      * 1) Some WATCHed key was touched.
    13.      * 2) There was a previous error while queueing commands.
    14.      * A failed EXEC in the first case returns a multi bulk nil object
    15.      * (technically it is not an error but a special behavior), while
    16.      * in the second an EXECABORT error is returned. */
    17. if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
    18.         addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
    19.                                                   shared.nullmultibulk);
    20.         discardTransaction(c);
    21. goto handle_monitor;
    22.     }
    23. /* Exec all the queued commands */
    24.     unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    25.     orig_argv = c->argv;
    26.     orig_argc = c->argc;
    27.     orig_cmd = c->cmd;
    28.     addReplyMultiBulkLen(c,c->mstate.count);
    29. for (j = 0; j < c->mstate.count; j++) {
    30.         c->argc = c->mstate.commands[j].argc;
    31.         c->argv = c->mstate.commands[j].argv;
    32.         c->cmd = c->mstate.commands[j].cmd;
    33. /* Propagate a MULTI request once we encounter the first write op.
    34.          * This way we'll deliver the MULTI/..../EXEC block as a whole and
    35.          * both the AOF and the replication link will have the same consistency
    36.          * and atomicity guarantees. */
    37. if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
    38.             execCommandPropagateMulti(c);
    39.             must_propagate = 1;
    40.         }
    41.         call(c,REDIS_CALL_FULL);
    42. /* Commands may alter argc/argv, restore mstate. */
    43.         c->mstate.commands[j].argc = c->argc;
    44.         c->mstate.commands[j].argv = c->argv;
    45.         c->mstate.commands[j].cmd = c->cmd;
    46.     }
    47.     c->argv = orig_argv;
    48.     c->argc = orig_argc;
    49.     c->cmd = orig_cmd;
    50.     discardTransaction(c);
    51. /* Make sure the EXEC command will be propagated as well if MULTI
    52.      * was already propagated. */
    53. if (must_propagate) server.dirty++;
    54. handle_monitor:
    55. /* Send EXEC to clients waiting data from MONITOR. We do it here
    56.      * since the natural order of commands execution is actually:
    57.      * MUTLI, EXEC, ... commands inside transaction ...
    58.      * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
    59.      * table, and we do it here with correct ordering. */
    60. if (listLength(server.monitors) && !server.loading)
    61.         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    62. }

    从line27开始: 逐步执行每个指令

    must_propagate 初始是0  经过line 40-41的一个2重判断:

    发现c的命名不是只含有读的成分,也就是含有写的成分,execCommandPropagateMulti就成为必须的了

    execCommandPropagateMulti 这个function是用来给所有的SLAVE和AOF文件发出这个命令, 要异步到SLAVE中 为啥要这样做呢?

    首先想下: SLAVE在所有读操作是无需理会的,不会改变数据 写操作是必须得处理的~! AOF也是同样一个道理

    而40-43只会执行一次  所以也就有一个mush_propagate就只有一个变量

    执行45:调用命令

    执行完成之后就进行销毁命令 然后将dirty更改变量

    所以可以看出 整个执行过程不会去支持事务的回滚机制,不管命令M是否执行成功  都不会影响M+1的一个执行


    REDIS乐观锁

    利用上面的事务机制  你会发现上面的任务是最开始的那个例子也是无法完全避免的  REDIS提出一种乐观锁的机制: 【我个人认为很有技巧性 】

    check-and-set:检查并且重新更新  什么意思呢  就是说get  和set能够同时保持本地原子性,不会被其他客户端干扰~!

    利用调用watch命令 来监控redis某个数据库的某些Key  确保某个Key是不会被其他客户来修改  如果其他客户想要修改 那么这个redis就会执行一个任务失败的操作

    你会怎么写呢?

    首先要有一个监控watchkey的数据结构  存放被监控的watchkey 如果每次做修改操作时,不同redisClient来尝试修改这个操作时,应该都会检查是不是这个watchkey 如果是  就不让其修改。

    具体看下源代码如何实现:

    这是WATCH的更底层实现

    入口参数:客户端信息指针c

        被监控的key

    line 8-13: 判断是否是不是含有这个watchkey

    如果含有这个key 就不需要监控  映射同一个key2个条件: db相同  内容相同

    line 15 -20 如果没有key 就添加这个key

    注意line18:

    watch_key是一个观察key字典,把所有的被观察的key都放在一个dic结构体中。

    怎么添加key和value呢?

    key: 被观察的key

    value:client客户端

    这样更加验证了前面用void*作为value的必要性啊~!

    这里就又有一个watch_key 新的字典

    加入乐观锁
    1. /* Watch for the specified key */
    2. void watchForKey(redisClient *c, robj *key) {
    3.     list *clients = NULL;
    4.     listIter li;
    5.     listNode *ln;
    6.     watchedKey *wk;
    7. /* Check if we are already watching for this key */
    8.     listRewind(c->watched_keys,&li);
    9. while((ln = listNext(&li))) {
    10.         wk = listNodeValue(ln);
    11. if (wk->db == c->db && equalStringObjects(key,wk->key))
    12. return; /* Key already watched */
    13.     }
    14. /* This key is not already watched in this DB. Let's add it */
    15.     clients = dictFetchValue(c->db->watched_keys,key);
    16. if (!clients) {
    17.         clients = listCreate();
    18.         dictAdd(c->db->watched_keys,key,clients);
    19.         incrRefCount(key);
    20.     }
    21.     listAddNodeTail(clients,c);
    22. /* Add the new key to the list of keys watched by this client */
    23.     wk = zmalloc(sizeof(*wk));
    24.     wk->key = key;
    25.     wk->db = c->db;
    26.     incrRefCount(key);
    27.     listAddNodeTail(c->watched_keys,wk);
    28. }

    因为这里确实有点抽象,我利用图标加上文字来说明:

    Step 1:

    寻找本客户端是否还有相同的key

    33833528

    发现没有相同的key  Ok

    进行Step 2: 就讲new_key里进行加入到watch_keys的数据库里 

    33874400

    加入的方式是Key:就是watchkey

    而value:是一个List指针  如果含有watchkey 则把redisClient加入到改List指针末尾

    34111802

    Step3: NewKey在第一步中的WATCH_Key中

    34168898

    touch乐观锁

    如果触碰到乐观锁,会怎么样呢? 不管怎么样,至少要保证一点:不能再乐观锁解除之前执行这个key的所有写操作

    /* "Touch" a key, so that if this key is being WATCHed by some client the
    * next EXEC will fail. */
    void touchWatchedKey(redisDb *db, robj *key) {
        list *clients;
        listIter li;
        listNode *ln;
    if (dictSize(db->watched_keys) == 0) return;
        clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;
    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
    /* Check if we are already watching for this key */
        listRewind(clients,&li);
    while((ln = listNext(&li))) {
            redisClient *c = listNodeValue(ln);
            c->flags |= REDIS_DIRTY_CAS;
        }
    }

    在这里可以看出: 触碰了之后,主调客户端没有失败,而那么加锁监控的客户端是失败的

    这里的touchWatchedKey只有看了被调用的例子才能理解真正的redis怎么处理这个触碰乐观锁的情况:

    36577055

    当Session1是没有办法执行这个age的~!也就是chang-and-set操作

    只要被watch的key在其他客户端修改  而该客户端也已经进入了multi,那么我们在mutli之间的操作将会无法做成功~!

    通过源码内部看具体看下怎么实现的~

    对于Session1 :

    Step1:src/multi.c里的watchForKey(session1,age)  加入watch的key中

    Step2:执行multi函数:准备把接下来所有的命令都加入到QUEUE队列中

    Step3:Session2 执行set(age,30) 这个时候set命令会查看这个是不是在watch_key里 到相应的watch_key字典中,如果含有 那就傻逼了,就调用Db.c里的singalTtachKey(),改写这个key的每个需要监控的客户端的REDIS_DIRTY_CAS字段为1

    Step4:Session1就非常高兴的调用execCommand(session1)结果一发现现在这个REDIS_DIRTY_CAS字段就是一个1,就全部不执行 直接返回。

    引入这个MULTI的原因

        redis本身是一个单线程,按照常理来说,指令都是序列化的,一堆需要原子操作的命令放在服务器端执行 也是按照顺序往下执行,Client A 和Client B 只需要一个或者加Watch 某个Key  不管有没有multi命令是不是就确保了其会进行原子操作呢? 在ClientA 和ClientB中,如果watch 了一个age,如果没有multi,那么假设Client A加了watch 执行了一个其中命令,而另外一个命令准备执行时,ClientB就修改了这个age,而那个时候ClientA即使读到REDIS_DIRTY_CAS为1 也起不到作用了,因为没办法进行事务的回滚操作~!

    所以只能把操作放在队列中,要么不执行,要么一下子全部执行完~!

  • 相关阅读:
    Spring Boot 配置文件 bootstrap vs application 到底有什么区别?
    一份完整的 Java 成神路线图,值得收藏!
    12 岁开始学编程,17 岁总结了 7 个重要教训!
    想成为顶尖 Java 程序员?先过了下面这些问题!
    Dubbo面试20问!这些题你都遇到过吗?
    yum安装出现No package crontabs available解决办法
    表结构设计方法
    后端token认证模板
    vue 用户登录 路由拦截 vuex cookie
    pyharm无法安装包的问题
  • 原文地址:https://www.cnblogs.com/sfwtoms/p/3946561.html
Copyright © 2011-2022 走看看