zoukankan      html  css  js  c++  java
  • Redis5设计与源码分析 (第19章 其他命令)

    了解Redis中事务、发布-订阅的实现原理及其适用范围,以及Redis如何执行Lua脚本命令。

    19.1 事务

    Redis中的事务保证一批命令原子性的执行。并且在事务执行过程中不会为任何其他命令提供服务。当Redis重新启动加载AOF文件时也会保证事务命令的完整性。

    19.1.1 事务简介

    Redis中提供了multiexec命令来显式开启与提交事务。开启事务之后,接着所有的命令会首先入队而不是直接执行,只有显示地提交事务之后该事务才会执行。

    简单示例:

    127.0.0.1:6379> multi //开启事务

    127.0.0.1:6379> incr counter1 //counter1加1

    QUEUED //命令入队

    127.0.0.1:6379> incr counter2 //counter2加1

    QUEUED //命令入队

    127.0.0.1:6379> exec //执行事务

    1) (integer) 1

    2) (integer) 1

    说明:

    以multi开启一个事务,然后逐条将命令入队,以exec表示提交并开始执行事务。exec命令会逐条返回入队命令执行的结果。

    放弃一个事务用discard命令,例如上文示例,如exec替换为discard,即会放弃该事务。

    另外,Redis使用watch命令提供了一种乐观锁机制watch命令可以监听多个key,只有当被监听的key未修改时,事务才会执行。示例如下:

    当被监听的键counter1被修改后,事务并没有执行。注意,当一个事务发送exec或者discard命令后,所有watch的key会自动unwatch。

    19.1.2 事务命令实现

    watch和unwatch实现了一种乐观锁机制;

    multi用来显式地开启一个事务;

    exec用来提交并执行事务;

    discard用来放弃事务;

    1.watch与unwatch命令

    1)watch:监听指定的key,指定的key没有变化时该连接上的事务才会执行。返回ok。

    格式:watch key [key …]

    2)unwatch:不再监听该连接上watch指定的所有key。返回值为ok。

    格式:unwatch

    Redis中有一个redisServer结构的全局变量server,负责记录服务的状态信息,每个连接有自己单独的client结构体,负责记录每个连接相关的信息。

    /* watchCommand循环调用 watchForKey*/

    void watchForKey(client *c, robj *key) {

    list *clients = NULL;

    listIter li;

    listNode *ln;

    watchedKey *wk;

    /* key被watch 则 returun */ ......

    /* key没有被watch,需要添加它 */

    //先找到key的clients,如果没有clients则创建

    clients = dictFetchValue(c->db->watched_keys,key);

    if (!clients) {

    clients = listCreate();

    // clients添加到redisDb的watched_keys字典

    dictAdd(c->db->watched_keys,key,clients);

    incrRefCount(key);

    }

    // 再将 c 添加到 clients 末尾

    listAddNodeTail(clients,c);

    /* Add the new key to the list of keys watched by this client */

    wk = zmalloc(sizeof(*wk));

    wk->key = key;

    wk->db = c->db;

    incrRefCount(key);

    //添加wk 到 客户端 c 的 watched_keys链表中

    listAddNodeTail(c->watched_keys,wk);

    }

    watch命令执行时,会进行如下两步操作。

    ①在client上的一个链表(list *watched_keys)中保存监听key的信息,如图19-1所示。

    图19-1 client链表结构

    typedef struct client {

    ...

    list *watched_keys; //watch_keys(链表)

    ...

    } client;

    watched_keys链表中的每个节点会保存监听的key以及该key属于哪个db。

    ②在server上的一个字典类型的结构中保存监听key的信息,如图19-2所示。

    图19-2 server dict结构

    struct redisServer {

    ...

    redisDb *db; //server中每个db会有一个redisDb类型的结构

    ...

    } server

    typedef struct redisDb {

    ...

    dict *watched_keys; //watched_keys(字典)

    ...

    } redisDb;

    server中每个db由一个redisDb结构体表示,每个redisDb结构体中有一个名为watched_keys的dict。dict的key为监听的key,而值为一个链表,链表中的节点为监听该key的client结构体。

    当Redis中执行写命令时,每个写命令都会调用touchWatchedKey函数。该函数从该dict中查找被修改的key是否处于监听状态,如果是,则依次遍历dict中该key对应的值链表,将每个client置一个修改标志,如下所示:

    /* "Touch" 一个key,如果某个客户端正在监视此key,则下一个EXEC将失败 */

    void touchWatchedKey(redisDb *db, robj *key) {

    list *clients;

    listIter li;

    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return; //如果没有监听的key,直接返回

    clients = dictFetchValue(db->watched_keys, key); //查看被修改的key是否处于监听状态

    if (!clients) return; //如果未监听,直接返回

    /* 否则依次遍历链表,设置标志CLIENT_DIRTY_CAS */

    listRewind(clients,&li);

    while((ln = listNext(&li))) {

    client *c = listNodeValue(ln);

    c->flags |= CLIENT_DIRTY_CAS;

    }

    }

    被监听的key只是在对应的client端设置了一个标志(CLIENT_DIRTY_CAS),发送exec命令执行事务时,会具体检测client端的标志,然后决定相应的处理流程。

    unwatch命令:

    其实就是删除相应client端和server端的监听状态。

    首先从client端的链表中取出key和对应的db

    然后删除server端相应的监听信息

    成功后再将client端的对应链表节点删除

    执行完毕后,该连接所有被监听的key都会恢复到未监听状态。

    2.开启事务

    事务需要显式执行一个开启命令,Redis读取到该命令后会认为接下来输入的命令属于一个事务,会首先将命令放入队列而不是直接执行并返回。

    void multiCommand(client *c) {

    if (c->flags & CLIENT_MULTI) { //如果已经执行过multi命令,则不能再次执行

    addReplyError(c,"MULTI calls can not be nested");

    return;

    }

    c->flags |= CLIENT_MULTI; //client结构体置CLIENT_MULTI标志

    addReply(c,shared.ok);

    }

    multi命令只是给client结构体置一个CLIENT_MULTI标志位,并且Redis的事务不能嵌套,即不能在一个开启的事务内再次调用multi命令开启一个新事务。

    3.命令入队

    Redis首先会调用processCommand。当开启一个事务后,client结构体置一个CLIENT_MULTI标志位,processComamnd函数会根据客户端是否有此标志来决定接下来的命令是入队处理还是直接执行,代码如下:

    int processCommand(client *c) {

    moduleCallCommandFilters(c);

    /* 校验命令是否错误, 命令参数个数是否符合要求 ,是否 开启了密码校验 等 . */

    ...

    /* Exec the command */

    if (c->flags & CLIENT_MULTI &&

    c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&

    c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)

    {

    //如果client有CLIENT_MULTI标志并且不是exec,discard,multi和watch命令,则命令放入队列

    queueMultiCommand(c);

    addReply(c,shared.queued);

    } else { //否则调用call命令

    call(c,CMD_CALL_FULL);

    ...

    }

    }

    保存命令的队列实现:

    typedef struct client {

    ...

    multiState mstate; //命令队列

    ...

    } client;

    client结构体中有一个mstate结构体,会将所有命令按顺序排列好并保存;

    图19-3 命令队列

    4.执行事务

    exec命令来显式提交一个事务。exec命令执行所有入队命令并将命令返回值依次返回。

    处理流程:

    exec命令首先会检查client的标志位,查看是否开启了事务或者被监听的key是否有改动以及入队命令是否有错误。只有通过这些检验之后才会开始真正执行事务。

    void execCommand(client *c) {

    ...

    //检测是否有CLIENT_MULTI标志,事务未开启,直接返回错误

    if (!(c->flags & CLIENT_MULTI)) {

    addReplyError(c,"EXEC without MULTI");

    return;

    }

    /* 是否有CLIENT_DIRTY_CAS和CLIENT_DIRTY_EXEC标志 */

    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {

    addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :

    shared.nullarray[c->resp]);

    discardTransaction(c); //放弃事务

    goto handle_monitor;

    }

    /*执行队列中所有命令 */

    unwatchAllKeys(c); /*unwatch所有的key*/

    orig_argv = c->argv;

    orig_argc = c->argc;

    orig_cmd = c->cmd;

    addReplyArrayLen(c,c->mstate.count);

    for (j = 0; j < c->mstate.count; j++) {

    ...

    //依次调用每条入队的命令

    call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);

             ...

    }

    }

    5.放弃事务

    discard命令显式放弃一个事务。放弃以multi开启的事务。返回值为ok。

    void discardCommand(client *c) {

    if (!(c->flags & CLIENT_MULTI)) { //检查是否已经开启事务

    addReplyError(c,"DISCARD without MULTI");

    return;

    }

    discardTransaction(c); //放弃事务

    addReply(c,shared.ok);

    }

     

    void discardTransaction(client *c) {

    freeClientMultiState(c); //首先会将所有入队命令清空,

    initClientMultiState(c); //然后将client上事务相关的flags清空

    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);

    unwatchAllKeys(c); //最后将所有监听的keys取消监听。

    }

    19.2 发布-订阅命令实现

    解耦了生产者和消费者,生产者可以向指定的channel发送消息而无须关心是否有消费者以及消费者是谁,而消费者订阅指定的channel之后可以接收发送给该channel的消息,也无须关心由谁发送;

    简单的发布-订阅实例:

    subscribe channel1 //订阅channel1

    publish channel1 message1 //发布一条消息

    模式匹配类型:

    Redis中还提供了一种的订阅发布,例如可以按如下命令格式订阅:

    PSUBSCRIBE news.*

    此时所有向匹配news.*类型的channel发送的信息,该订阅都能够接收。

    struct redisServer {

    ...

    dict *pubsub_channels; //key为channel值,value为一个个clients

    list *pubsub_patterns; //链表,节点值为一个个pubsubPattern

    ...

    }

    ·pubsub_channels 字典,key为channel的值,而value是每个订阅了该channel的clients

    ·pubsub_patterns 链表,链表中的节点值为一个个pubsubPattern结构体。

    我们看下该结构体的组成。

    typedef struct pubsubPattern {

    client *client; //订阅该模式的客户端

    robj *pattern; //模式结构体

    } pubsubPattern;

    所有的订阅发布命令其实都是在操作pubsub_channels和pubsub_patterns这两个结构;

    1.发布命令(publish)

    作用是向指定channel发送消息。

    格式:publish channel message

    返回值: 整型数,收到消息的客户端个数。

    publish执行流程如下。

    1)从pubsub_channels字典中以推送到的channel为key,取出所有订阅了该channel的客户端,依次向每个客户端返回推送的数据。

    2)依次遍历pubsub_patterns,将链表中每个节点的模式字段pattern和推送的channel进行比较,如果能够匹配,则向节点中订阅了该pattern的客户端返回推送的数据。

    int pubsubPublishMessage(robj *channel, robj *message) {

    int receivers = 0;

    dictEntry *de;

    dictIterator *di;

    listNode *ln;

    listIter li;

    // pubsub_channels(dict) 取出订阅该channel的客户端

    de = dictFind(server.pubsub_channels,channel);

    // 如果有订阅该channel的客户端,依次向客户端发送该条消息

    if (de) {

    ... // li 是迭代器

    while ((ln = listNext(&li)) != NULL) {

    client *c = ln->value;

    addReplyPubsubMessage(c,channel,message);

    receivers++;

    }

    }

    /* 遍历链表,将推送的channel和每个模式比较,如果匹配将消息发送给订阅该模的客户端*/

    di = dictGetIterator(server.pubsub_patterns_dict); //从模式字典中获取迭代器

    if (di) {

    channel = getDecodedObject(channel);

    while((de = dictNext(di)) != NULL) {

    robj *pattern = dictGetKey(de);

    list *clients = dictGetVal(de);

    if (!stringmatchlen((char*)pattern->ptr,

    sdslen(pattern->ptr),

    (char*)channel->ptr,

    sdslen(channel->ptr),0)) continue;

    listRewind(clients,&li);

    while ((ln = listNext(&li)) != NULL) {

    client *c = listNodeValue(ln);

    addReplyPubsubPatMessage(c,pattern,channel,message);

    receivers++;

    }

    }

    decrRefCount(channel);

    dictReleaseIterator(di);

    }

    return receivers;

    }

    注意,publish命令执行完毕之后会同步到Redis从服务中。这样,如果一个客户端订阅了从服务的channel,在主服务中向该channel推送消息时,该客户端也能收到推送的消息。

    2.订阅命令(subscribe)

    作用是订阅指定的渠道。

    格式:subscribe channel [channel ...]

    返回值: 数组,第一个元素固定为subscribe,第二个元素为订阅的channel,第三个元素为该客户端总共订阅的channel个数(包括模式订阅)。

    当一个客户端执行subscribe命令后会进入pub/sub模式,该种模式下,该客户端只能执行如下几类命令:ping、subscribe、unsubscribe、psubscribe和punsubscribe。

    实现方法也很简单,当执行subscribe命令后会将该client打一个CLIENT_PUBSUB标记,然后执行其他命令时会在processCommand函数中进行判断;如下:

    if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&

    c->cmd->proc != pingCommand &&

    c->cmd->proc != subscribeCommand &&

    c->cmd->proc != unsubscribeCommand &&

    c->cmd->proc != psubscribeCommand &&

    c->cmd->proc != punsubscribeCommand) {

    rejectCommandFormat(c,

    "Can't execute '%s': only (P)SUBSCRIBE / "

    "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",

    c->cmd->name);

    return C_OK;

    }

    subscribe命令执行流程: (有点类似上面watch key的过程)

    1)首先看clients结构体中的pubsub_channels[1]中是否有该channel,如果已经存在则直接返回,否则将其加入pubsub_channels字典中。

    2)将订阅该channel的客户端加入server结构体的pubsub_channels中(key仍然是订阅的channelvalue为一个订阅该channel的客户端链表)。

    void subscribeCommand(client *c) {

    int j; //依次对每个订阅的channel执行订阅操作

    for (j = 1; j < c->argc; j++)

    pubsubSubscribeChannel(c,c->argv[j]);

    c->flags |= CLIENT_PUBSUB; //客户端置CLIENT_PUBSUB标志,进入pub/sub模式

    }

    int pubsubSubscribeChannel(client *c, robj *channel) {

    dictEntry *de;

    list *clients = NULL;

    int retval = 0;

    /* 首先将订阅的channel加入client的pubsub_channels中,如果存在则直接返回*/

    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {

    retval = 1;

    incrRefCount(channel);

     

    de = dictFind(server.pubsub_channels,channel);

    if (de == NULL) {

    clients = listCreate();

    dictAdd(server.pubsub_channels,channel,clients);

    incrRefCount(channel);

    } else {

    clients = dictGetVal(de);

    }

    //将该client加入server.pubsub_channels中订阅该channel的值链表

    listAddNodeTail(clients,c);

    }

    /* Notify the client */

    addReplyPubsubSubscribed(c,channel);

    return retval;

    }

    3.取消订阅命令(unsubscribe)

    作用是取消订阅某个渠道,如果不指定,则该客户端所有的订阅都会取消。

    格式:unsubscribe [channel [channel ...]]

    将client和server结构体中的相应结构进行修改即可。如果客户端所有的channel都取消订阅,则退出pub/sub模式。代码流程如下:

    void unsubscribeCommand(client *c) {

    if (c->argc == 1) { //如果未指定channel,则取消所有channel的订阅

    pubsubUnsubscribeAllChannels(c,1);

    } else {

    int j;

    for (j = 1; j < c->argc; j++) //依次取消订阅指定的channel

    pubsubUnsubscribeChannel(c,c->argv[j],1);

    } //如果客户端不再订阅任何channel,则退出pubsub模式

    if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;

    }

    4.订阅指定模式渠道命令(psubscribe)

    作用是订阅由指定模式表示的所有渠道,匹配该模式的所有渠道发送的消息都会被接收。

    格式:psubscribe pattern [pattern ...]

    ·pattern字符:"?"代表一个字符;"*"代表多个字符;"[]"代表选取其中任意一个字符。

    返回值: 数组,第一项为固定的psubscribe,第二项为订阅的pattern,第三项为当前客户端订阅的所有channel和pattern个数。

    命令的执行流程:

    ①在client的pubsub_patterns链表中查找该pattern,如果找到说明已经订阅,则直接返回;

    ②否则将pattern加入client的pubsub_patterns和server的pubsub_patterns链表中。

    int pubsubSubscribePattern(client *c, robj *pattern) {

    ...

    // client的pubsub_patterns链表中寻找需要订阅的pattern,未找到则添加该pattern

    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {

    retval = 1;

    pubsubPattern *pat;

    //将该pattern加入client的pubsub_patterns链表中

    listAddNodeTail(c->pubsub_patterns,pattern);

    incrRefCount(pattern);

    pat = zmalloc(sizeof(*pat));

    pat->pattern = getDecodedObject(pattern);

    pat->client = c;

    listAddNodeTail(server.pubsub_patterns,pat);

    //将该pattern加入server的pubsub_patterns链表中

    de = dictFind(server.pubsub_patterns_dict,pattern);

    if (de == NULL) {

    clients = listCreate();

    dictAdd(server.pubsub_patterns_dict,pattern,clients);

    incrRefCount(pattern);

    } else {

    clients = dictGetVal(de);

    }

    listAddNodeTail(clients,c);

    }

    /* Notify the client */

    addReplyPubsubPatSubscribed(c,pattern);

    return retval;

    }

    5.取消订阅指定渠道命令(punsubscribe)

    用于取消订阅指定的模式,如果没有指定,则取消所有该客户端订阅的模式。

    格式:punsubscribe [pattern [pattern ...]]

    该命令做psubscribe的反向操作即可。

     

    6.查看订阅状态命令(pubsub)

    用来查看所有发布订阅的相关状态。

    格式:pubsub subcommand [argument [argument ...]]

    子命令分别:

    CHANNELS子命令格式:PUBSUB CHANNELS [pattern]

    返回值: 不指定pattern时返回该客户端订阅的所有channels列表。

    指定pattern时只返回匹配该pattern的channels列表。

    NUMSUB子命令格式:pubsub NUMSUB [channel-1 ... channel-N]

    返回值:返回指定channel有几个订阅的客户端。

    NUMPAT子命令格式:pubsub NUMPAT

    返回值:返回模式订阅的个数。

    pubsub命令执行流程:

    为根据指定的子命令,去获取相应的订阅channel或者订阅pattern个数。

    void pubsubCommand(client *c) {

    ...

    //channels子命令

    if (!strcasecmp(c->argv[1]->ptr,"channels") &&

    (c->argc == 2 || c->argc == 3))

    {

    /* PUBSUB CHANNELS [<pattern>] */

    sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;

    dictIterator *di = dictGetIterator(server.pubsub_channels);

    dictEntry *de;

    long mblen = 0;

    void *replylen;

     

    replylen = addReplyDeferredLen(c);

    while((de = dictNext(di)) != NULL) {

    robj *cobj = dictGetKey(de);

    sds channel = cobj->ptr;

    //根据是否指定模式,返回所有订阅的channels或者匹配pattern的channels

    if (!pat || stringmatchlen(pat, sdslen(pat),

    channel, sdslen(channel),0))

    { ... }

    }

    ...

    } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {

    //numsub子命令,返回所有订阅了指定channel的客户端个数

    ...

    for (j = 2; j < c->argc; j++) {

    list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);

    addReplyBulk(c,c->argv[j]);

    addReplyLongLong(c,l ? listLength(l) : 0);

    }

    } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {

    //numpat子命令,返回模式订阅的个数

    addReplyLongLong(c,listLength(server.pubsub_patterns));

    } else {

    addReplySubcommandSyntaxError(c);

    }

    }

    小结:

    发布订阅相关命令的具体实现原理。

    注意,每个clients也有相应的一对pubsub_channels和pubsub_patterns,

    clients中的pubsub_channels也是一个字典,key是订阅的channel,value是空。

    19.3 Lua脚本

    Redis可以运行Lua代码,还保证脚本以原子方式执行:在执行脚本时不会执行其他脚本或Redis命令。这种语义类似于MULTI/EXEC,也就是说我们可以编写带逻辑操作的Lua代码让Redis服务端执行,这解决了Redis命令执行之间非原子性的问题。

    下面是一个简单例子,将key1的值作为key2的自增步长

    set key1 3

    set key2 0

    eval "local v1 = redis.call('get', KEYS[1]); return redis.call('incrby', KEYS[2], v1);" 2 key1 key2

    说明:

    上述的Lua代码中getincrby命令之间不会有别的客户端能执行Redis命令,因为eval命令还没有执行完。同时,也不会有别的脚本在执行,因为在Lua初始化时只有一个Lua解释器存在。从其他客户端的角度来看,脚本的效果要么不可见,要么已经完成。

    Lua环境初始化下面这些工作。

    19.3.1 初始化Lua环境

    redisServer结构体中Lua脚本相关的属性及作用;

    struct redisServer {

    ...

    lua_State *lua; /* Lua解释器,所有客户端共用*/

    client *lua_client; /* Lua中向Redis查询的"伪客户端 */

    client *lua_caller; /* 正在执行(eval)脚本调用的客户端 */

    dict *lua_scripts; /* SHA1和原始脚本的字典映射 */

    unsigned long long lua_scripts_mem; /*缓存脚本使用的内存,单位:字节 */ */

    mstime_t lua_time_limit; / 脚本超时,单位:毫秒 */

    mstime_t lua_time_start; /*脚本启动时间,单位:毫秒 */

    int lua_write_dirty; /* 脚本执行期间有调用写命令,则为true */ */

    int lua_random_dirty; /*脚本执行期间有调用随机命令,则为true */. */

    int lua_replicate_commands; /* 如果是脚本效果复制,则为True *. */

    int lua_multi_emitted;/* 如果传播事务,则为true */

    int lua_repl; /* 脚本复制标志 */

    int lua_timedout; /* 脚本执行超时,则为true */

    int lua_kill; /* 杀死脚本,则为true. */

    int lua_always_replicate_commands; /* D默认复制类型 . */

    ...

    }

    Lua为所有客户端共用,这也是Lua脚本原子性的一个保证,避免了同时有多个Lua脚本的执行;lua_client是向Redis查询时的伪客户端,

    例如: eval "return redis.call('get', KEYS[1]);" 1 key1

    上面代码执行流程为:

    Redis客户端调用eval执行Lua脚本,Lua客户端调用get命令查询Redis数据库;Redis数据库将数据返回给Lua客户端,Lua客户端再将数据返回给Redis客户端。

     

    Lua环境初始化具体流程:

    在Redis服务端初始化程序(initServer)靠后的地方会调用脚本初始化函数(scriptingInit)进行Lua环境的初始化和修改。

    1)载入Lua库,其中包含Lua标准库(base、table、string、math等)和其他非标准库(struct、cjson、cmsgpack等);为了安全执行Lua代码,Redis删除了loadfile和dofile函数,禁止文件读写操作。

    luaLoadLibraries(lua);

    luaRemoveUnsupportedFunctions(lua);

    2)初始化server.lua_scripts字典变量,用于SHA(key)到脚本(value)的映射,在复制、创建Lua函数,执行了script exists等命令时会用到。

    server.lua_scripts = dictCreate(&shaScriptObjectDictType,NULL);

    server.lua_scripts_mem = 0;

    3)注册Redis命令表和字段,并将Redis设置为全局变量。Redis全局表函数,见表19-1。

    表19-1 Redis全局表函数

    4)用Redis实现的随机函数redis_math_random替换Lua随机函数(math.random和math.randomseed),在不同的机器上,Redis保证相同的输出,避免数据不一致

    5)同样是避免数据不一致问题,添加排序辅助函数__redis__compare_helper),用于需要排序的命令(命令标志类型含S的命令,参考第9章)。

    6)添加用于pcall错误报告的辅助函数__redis__err__handler),禁用Lua全局变量,试图读写全局变量将会出错。

    7)最后将Lua环境变量挂在服务器Lua属性下(server.lua)。至此,Redis的Lua环境初始化工作完成,

    server.lua = lua;

    下面介绍Lua中的redis.callpcall命令的处理函数(luaRedisGenericCommand)如何执行Redis命令的。

    19.3.2 在Lua中调用Redis命令

    使用redis.call或者redis.pcall函数,每个命令的参数不一样,所以redis.call(pcall)的参数也不一样(根据命令格式来传参数);

    #define LUA_CMD_OBJCACHE_SIZE 32

    #define LUA_CMD_OBJCACHE_MAX_LEN 64

    int luaRedisGenericCommand(lua_State *lua, int raise_error) {

    int j, argc = lua_gettop(lua);

    struct redisCommand *cmd;

    client *c = server.lua_client;

    sds reply;

    /* Cached across calls. */

    static robj **argv = NULL;

    static int argv_size = 0;

    static robj *cached_objects[LUA_CMD_OBJCACHE_SIZE];

    static size_t cached_objects_len[LUA_CMD_OBJCACHE_SIZE];

    static int inuse = 0; /* 递归呼叫检测 */

    /* 递归 调用 luaRedisGenericCommand函数 */

    if (inuse) {

    char *recursion_warning =

    "luaRedisGenericCommand() recursive call detected. "

    "Are you doing funny stuff with Lua debug hooks?";

    serverLog(LL_WARNING,"%s",recursion_warning);

    luaPushError(lua,recursion_warning);

    return 1;

    }

    inuse++;

    /* 校验至少需要一个参数 */

    if (argc == 0) {

    luaPushError(lua,

    "Please specify at least one argument for redis.call()");

    inuse--;

    return raise_error ? luaRaiseError(lua) : 1;

    }

    /* 绑定参数向量 */

    if (argv_size < argc) {

    argv = zrealloc(argv,sizeof(robj*)*argc);

    argv_size = argc;

    }

    for (j = 0; j < argc; j++) {

    char *obj_s;

    size_t obj_len;

    char dbuf[64];

     

    if (lua_type(lua,j+1) == LUA_TNUMBER) {

    /* We can't use lua_tolstring() for number -> string conversion

    * since Lua uses a format specifier that loses precision. */

    lua_Number num = lua_tonumber(lua,j+1);

    obj_len = snprintf(dbuf,sizeof(dbuf),"%.17g",(double)num);

    obj_s = dbuf;

    } else {

    obj_s = (char*)lua_tolstring(lua,j+1,&obj_len);

    if (obj_s == NULL) break; /* Not a string. */

    }

     

    /* Try to use a cached object. */

    if (j < LUA_CMD_OBJCACHE_SIZE && cached_objects[j] &&

    cached_objects_len[j] >= obj_len)

    {

    sds s = cached_objects[j]->ptr;

    argv[j] = cached_objects[j];

    cached_objects[j] = NULL;

    memcpy(s,obj_s,obj_len+1);

    sdssetlen(s, obj_len);

    } else {

    argv[j] = createStringObject(obj_s, obj_len);

    }

    }

    /* 校验参数是不是字符串或整数. */

    if (j != argc) {

    j--;

    while (j >= 0) {

    decrRefCount(argv[j]);

    j--;

    }

    luaPushError(lua,

    "Lua redis() command arguments must be strings or integers");

    inuse--;

    return raise_error ? luaRaiseError(lua) : 1;

    }

    /* 设置伪客户端以执行命令* */

    c->argv = argv;

    c->argc = argc;

    c->user = server.lua_caller->user;

    /* Process module hooks */

    moduleCallCommandFilters(c);

    argv = c->argv;

    argc = c->argc;

     

    /* Log the command if debugging is active. */

    if (ldb.active && ldb.step) {

    sds cmdlog = sdsnew("<redis>");

    for (j = 0; j < c->argc; j++) {

    if (j == 10) {

    cmdlog = sdscatprintf(cmdlog," ... (%d more)",

    c->argc-j-1);

    break;

    } else {

    cmdlog = sdscatlen(cmdlog," ",1);

    cmdlog = sdscatsds(cmdlog,c->argv[j]->ptr);

    }

    }

    ldbLog(cmdlog);

    }

     

    /* )查找Redis命令是否存在,并验证该命令的参数数量。 */

    cmd = lookupCommand(argv[0]->ptr);

    if (!cmd || ((cmd->arity > 0 && cmd->arity != argc) ||

    (argc < -cmd->arity)))

    {

    if (cmd)

    luaPushError(lua,

    "Wrong number of args calling Redis command From Lua script");

    else

    luaPushError(lua,"Unknown Redis command called from Lua script");

    goto cleanup;

    }

    c->cmd = c->lastcmd = cmd;

     

    /* There are commands that are not allowed inside scripts. */

    if (cmd->flags & CMD_NOSCRIPT) {

    luaPushError(lua, "This Redis command is not allowed from scripts");

    goto cleanup;

    }

     

    /* Check the ACLs. */

    int acl_keypos;

    int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);

    if (acl_retval != ACL_OK) {

    addACLLogEntry(c,acl_retval,acl_keypos,NULL);

    if (acl_retval == ACL_DENIED_CMD)

    luaPushError(lua, "The user executing the script can't run this "

    "command or subcommand");

    else

    luaPushError(lua, "The user executing the script can't access "

    "at least one of the keys mentioned in the "

    "command arguments");

    goto cleanup;

    }

     

    /*)校验命令标志类型,比如:不能使用Lua脚本禁止执行的命令(s)、不能在不确定命令后调用写命令(w)、内存满了拒绝执行的命令(m)等,如果命令标志类型是写命令或者随机命令则会修改server的lua_write_dirty或lua_random_dirty属性。如果这是Redis集群节点,服务器不处于loading状态,也不是从主服务器接收的命令时,需要确保Lua不会尝试访问非本地key。 */

    if (cmd->flags & CMD_WRITE) {

    int deny_write_type = writeCommandsDeniedByDiskError();

    if (server.lua_random_dirty && !server.lua_replicate_commands) {

    luaPushError(lua,

    "Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");

    goto cleanup;

    } else if (server.masterhost && server.repl_slave_ro &&

    !server.loading &&

    !(server.lua_caller->flags & CLIENT_MASTER))

    {

    luaPushError(lua, shared.roslaveerr->ptr);

    goto cleanup;

    } else if (deny_write_type != DISK_ERROR_TYPE_NONE) {

    if (deny_write_type == DISK_ERROR_TYPE_RDB) {

    luaPushError(lua, shared.bgsaveerr->ptr);

    } else {

    sds aof_write_err = sdscatfmt(sdsempty(),

    "-MISCONF Errors writing to the AOF file: %s ",

    strerror(server.aof_last_write_errno));

    luaPushError(lua, aof_write_err);

    sdsfree(aof_write_err);

    }

    goto cleanup;

    }

    }

     

    /* 如果达到通过maxmemory配置的内存限制,则不允许可能扩大内存使用量的命令,但前提是这是该脚本上下文中的第一个写入操作,否则我们不能在中间停止 */

    if (server.maxmemory && /* Maxmemory is actually enabled. */

    !server.loading && /* Don't care about mem if loading. */

    !server.masterhost && /* Slave must execute the script. */

    server.lua_write_dirty == 0 && /* Script had no side effects so far. */

    server.lua_oom && /* Detected OOM when script start. */

    (cmd->flags & CMD_DENYOOM))

    {

    luaPushError(lua, shared.oomerr->ptr);

    goto cleanup;

    }

    if (cmd->flags & CMD_RANDOM) server.lua_random_dirty = 1;

    if (cmd->flags & CMD_WRITE) server.lua_write_dirty = 1;

     

    // 服务器处于集群和非loading状态,并且该命令不是从主服务器接收的,那么需要校验是否是本地keys

    if (server.cluster_enabled && !server.loading &&

    !(server.lua_caller->flags & CLIENT_MASTER))

    {

    int error_code;

    /* lua客户端中复制相关标志。. */

    c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);

    c->flags |= server.lua_caller->flags & (CLIENT_READONLY|CLIENT_ASKING);

    if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,&error_code) !=

    server.cluster->myself) // 查看key是否属于当前节点

    {

    if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {

    luaPushError(lua,

    "Lua script attempted to execute a write command while the "

    "cluster is down and readonly");

    } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {

    luaPushError(lua,

    "Lua script attempted to execute a command while the "

    "cluster is down");

    } else {

    luaPushError(lua,

    "Lua script attempted to access a non local key in a "

    "cluster node");

    }

    goto cleanup;

    }

    }

    /* )如果使用脚本效果复制,则需要将涉及的命令包装在MULTI/EXEC事务块中,保证副本的执行也是原子的 */

    if (server.lua_replicate_commands &&

    !server.lua_multi_emitted &&

    !(server.lua_caller->flags & CLIENT_MULTI) &&

    server.lua_write_dirty &&

    server.lua_repl != PROPAGATE_NONE)

    {

    execCommandPropagateMulti(server.lua_caller);

    server.lua_multi_emitted = 1;

    /* lua_client应该被*标记为CLIENT_MULTI。 */

    c->flags |= CLIENT_MULTI;

    }

     

    /* Run the command */

    int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;

    if (server.lua_replicate_commands) {

    /* Set flags according to redis.set_repl() settings. */

    if (server.lua_repl & PROPAGATE_AOF)

    call_flags |= CMD_CALL_PROPAGATE_AOF;

    if (server.lua_repl & PROPAGATE_REPL)

    call_flags |= CMD_CALL_PROPAGATE_REPL;

    }

    // )调用call执行命令

    call(c,call_flags);

     

    /* )将Redis命令的结果转换为合适的Lua类型 */

    if (listLength(c->reply) == 0 && c->bufpos < PROTO_REPLY_CHUNK_BYTES) {

    c->buf[c->bufpos] = '';

    reply = c->buf;

    c->bufpos = 0;

    } else {

    reply = sdsnewlen(c->buf,c->bufpos);

    c->bufpos = 0;

    while(listLength(c->reply)) {

    clientReplyBlock *o = listNodeValue(listFirst(c->reply));

    reply = sdscatlen(reply,o->buf,o->used);

    listDelNode(c->reply,listFirst(c->reply));

    }

    }

    if (raise_error && reply[0] != '-') raise_error = 0;

    redisProtocolToLuaType(lua,reply);

     

    /* If the debugger is active, log the reply from Redis. */

    if (ldb.active && ldb.step)

    ldbLogRedisReply(reply);

    // )函数redis.call调用完成,返回结果。

    if ((cmd->flags & CMD_SORT_FOR_SCRIPT) &&

    (server.lua_replicate_commands == 0) &&

    (reply[0] == '*' && reply[1] != '-')) {

    luaSortArray(lua);

    }

    if (reply != c->buf) sdsfree(reply);

    c->reply_bytes = 0;

     

    cleanup:

    for (j = 0; j < c->argc; j++) {

    robj *o = c->argv[j];

    /* 尝试将对象缓存在cached_objects数组中 */

    if (j < LUA_CMD_OBJCACHE_SIZE &&

    o->refcount == 1 &&

    (o->encoding == OBJ_ENCODING_RAW ||

    o->encoding == OBJ_ENCODING_EMBSTR) &&

    sdslen(o->ptr) <= LUA_CMD_OBJCACHE_MAX_LEN)

    {

    sds s = o->ptr;

    if (cached_objects[j]) decrRefCount(cached_objects[j]);

    cached_objects[j] = o;

    cached_objects_len[j] = sdsalloc(s);

    } else {

    decrRefCount(o);

    }

    }

    if (c->argv != argv) {

    zfree(c->argv);

    argv = NULL;

    argv_size = 0;

    }

    c->user = NULL;

    if (raise_error) {

    inuse--;

    return luaRaiseError(lua);

    }

    inuse--;

    return 1;

    }

    19.3.3 RedisLua数据类型转换

    在脚本中使用call或pcall调用Redis命令时 :

    Redis返回值 将转换为 Lua数据类型,对应函数为redisProtocolToLuaType

    在脚本返回给Redis时 :

    需要将Lua返回值转换为Redis数据类型,转换函数为luaReplyToRedisReply

    这种转换是一一对应的,如果将Redis类型转换为Lua类型,然后将Lua数据类型转换回Redis类型,则结果与初始值相同。

    redisProtocolToLuaType函数

    char *redisProtocolToLuaType(lua_State *lua, char* reply) {

    char *p = reply;

    switch(*p) {

    case ':': p = redisProtocolToLuaType_Int(lua,reply); break; // 整数回复

    case '$': p = redisProtocolToLuaType_Bulk(lua,reply); break; // 批量回复

    case '+': p = redisProtocolToLuaType_Status(lua,reply); break; // 状态回复

    case '-': p = redisProtocolToLuaType_Error(lua,reply); break; //错误回复

    case '*': p = redisProtocolToLuaType_Aggregate(lua,reply,*p); break; // 批量回复

    ... }

    return p;

    }

    表19-2 Redis与Lua回复类型对照

    在Lua返回浮点数时需要以string类型返回 ;

     

    luaReplyToRedisReply函数

    可对整数、字符串、布尔类型进行处理 ;

    void luaReplyToRedisReply(client *c, lua_State *lua) {

    int t = lua_type(lua,-1);

    switch(t) {

    case LUA_TSTRING: // 字符串

    addReplyBulkCBuffer(c,(char*)lua_tostring(lua,-1),lua_strlen(lua,-1));

    break;

    case LUA_TBOOLEAN: // 布尔

    if (server.lua_client->resp == 2)

    //shared.cone = createObject(OBJ_STRING,sdsnew(":1 "));

    addReply(c,lua_toboolean(lua,-1) ? shared.cone :

    shared.null[c->resp]);

    else

    addReplyBool(c,lua_toboolean(lua,-1));

    break;

    case LUA_TNUMBER: // 整数

    addReplyLongLong(c,(long long)lua_tonumber(lua,-1));

    break;

    case LUA_TTABLE: //

    /* 如果是表类型,则先检查是否是状态回复或者错误回复,

    如果都不是则递归调用 luaReplyToRedisReply处理。

    同时,如果Lua表中有nil类型,则转换停止。. */

    /* Handle error reply. */

    lua_pushstring(lua,"err");

    lua_gettable(lua,-2);

    t = lua_type(lua,-1);

    if (t == LUA_TSTRING) {

    sds err = sdsnew(lua_tostring(lua,-1));

    sdsmapchars(err," "," ",2);

    addReplySds(c,sdscatprintf(sdsempty(),"-%s ",err));

    sdsfree(err);

    lua_pop(lua,2);

    return;

    }

    lua_pop(lua,1); /* Discard field name pushed before. */

     

    /* Handle status reply. */

    lua_pushstring(lua,"ok");

    lua_gettable(lua,-2);

    t = lua_type(lua,-1);

    if (t == LUA_TSTRING) {

    sds ok = sdsnew(lua_tostring(lua,-1));

    sdsmapchars(ok," "," ",2);

    addReplySds(c,sdscatprintf(sdsempty(),"+%s ",ok));

    sdsfree(ok);

    lua_pop(lua,2);

    return;

    }

    lua_pop(lua,1); /* Discard field name pushed before. */

     

    /* Handle double reply. */

    lua_pushstring(lua,"double");

    lua_gettable(lua,-2);

    t = lua_type(lua,-1);

    if (t == LUA_TNUMBER) {

    addReplyDouble(c,lua_tonumber(lua,-1));

    lua_pop(lua,2);

    return;

    }

    lua_pop(lua,1); /* Discard field name pushed before. */

     

    /* Handle map reply. */

    lua_pushstring(lua,"map");

    lua_gettable(lua,-2);

    t = lua_type(lua,-1);

    if (t == LUA_TTABLE) {

    int maplen = 0;

    void *replylen = addReplyDeferredLen(c);

    lua_pushnil(lua); /* Use nil to start iteration. */

    while (lua_next(lua,-2)) {

    /* Stack now: table, key, value */

    luaReplyToRedisReply(c, lua); /* Return value. */

    lua_pushvalue(lua,-1); /* Dup key before consuming. */

    luaReplyToRedisReply(c, lua); /* Return key. */

    /* Stack now: table, key. */

    maplen++;

    }

    setDeferredMapLen(c,replylen,maplen);

    lua_pop(lua,2);

    return;

    }

    lua_pop(lua,1); /* Discard field name pushed before. */

     

    /* Handle set reply. */

    lua_pushstring(lua,"set");

    lua_gettable(lua,-2);

    t = lua_type(lua,-1);

    if (t == LUA_TTABLE) {

    int setlen = 0;

    void *replylen = addReplyDeferredLen(c);

    lua_pushnil(lua); /* Use nil to start iteration. */

    while (lua_next(lua,-2)) {

    /* Stack now: table, key, true */

    lua_pop(lua,1); /* Discard the boolean value. */

    lua_pushvalue(lua,-1); /* Dup key before consuming. */

    luaReplyToRedisReply(c, lua); /* Return key. */

    /* Stack now: table, key. */

    setlen++;

    }

    setDeferredSetLen(c,replylen,setlen);

    lua_pop(lua,2);

    return;

    }

    lua_pop(lua,1); /* Discard field name pushed before. */

     

    /* Handle the array reply. */

    void *replylen = addReplyDeferredLen(c);

    int j = 1, mbulklen = 0;

    while(1) {

    lua_pushnumber(lua,j++);

    lua_gettable(lua,-2);

    t = lua_type(lua,-1);

    if (t == LUA_TNIL) {

    lua_pop(lua,1);

    break;

    }

    luaReplyToRedisReply(c, lua);

    mbulklen++;

    }

    setDeferredArrayLen(c,replylen,mbulklen);

    break;

    default:

    addReplyNull(c);

    }

    lua_pop(lua,1);

    }

    需要注意的是浮点数的转换和布尔的转换 ;

    19.3.4 命令实现

    前面介绍 Lua环境初始化、Lua调用Redis命令以及Lua与Redis之间的类型转换,

    本节介绍Redis脚本命令源码实现,脚本命令包含eval、evalsha及script相关命令。

    1.脚本执行命令eval/evalsha

    (1)eval命令

    自Redis 2.6.0起,可以使用内置Lua解释器运行Lua脚本,在脚本中可以调用一个或多个Redis命令,且保持原子性。

    格式:eval script numkeys key [key ...] arg [arg ...]

    说明: 执行一个Lua脚本,script为Lua 5.1脚本,numkeys为后面key的数量,其中key和arg参数会被转化为Lua的全局变量KEYS和ARGV

    一个典型的例子:

    eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 val1 val2

    源码实现:

    void evalGenericCommand(client *c, int evalsha) {

    lua_State *lua = server.lua;

    char funcname[43];

    long long numkeys;

    long long initial_server_dirty = server.dirty;

    int delhook = 0, err;

     

    /* 1)先设置随机数种子,初始化server属性,以及lua_random_dirty、lua_write_dirty、lua_repl等,获取numkeys并校验,然后计算脚本的SHA值(如果是evalsha则校验),并在前面拼接字符串"f_",作为Lua函数名。 . */

    redisSrand48(0);

    server.lua_random_dirty = 0;

    server.lua_write_dirty = 0;

    server.lua_replicate_commands = server.lua_always_replicate_commands;

    server.lua_multi_emitted = 0;

    server.lua_repl = PROPAGATE_AOF|PROPAGATE_REPL;

     

    /* 获取numkeys并校验 */

    ...

    /* 获取脚本SHA1,然后检查此功能是否已经定义为Lua状态, 拼接字符串"f_",作为Lua函数名 */

    funcname[0] = 'f';

    funcname[1] = '_';

    if (!evalsha) {

    /* Hash the code if this is an EVAL call */

    sha1hex(funcname+2,c->argv[1]->ptr,sdslen(c->argv[1]->ptr));

    } else {

    /* We already have the SHA if it is a EVALSHA */

    int j;

    char *sha = c->argv[1]->ptr;

    /* 转换为小写 */

    for (j = 0; j < 40; j++)

    funcname[j+2] = (sha[j] >= 'A' && sha[j] <= 'Z') ?

    sha[j]+('a'-'A') : sha[j];

    funcname[42] = '';

    }

     

    /* Push the pcall error handler function on the stack. */

    lua_getglobal(lua, "__redis__err__handler");

     

    2)在Lua的全局环境里面查找这个函数是否存在,如果不存在则调用luaCreateFunction函数为Lua定义一个函数,函数名为"f_"+脚本sha1摘要,函数体就是script,之后将脚本和它的sha1摘要保存在server.lua_scripts字典中,脚本sha1为key。

    lua_getglobal(lua, funcname);

    if (lua_isnil(lua,-1)) {

    lua_pop(lua,1); /* stack中移除nil */

    if (evalsha) {

    lua_pop(lua,1); /* remove the error handler from the stack. */

    addReply(c, shared.noscripterr);

    return;

    }

    //子函数luaCreateFunction

    if (luaCreateFunction(c,lua,c->argv[1]) == NULL) {

    lua_pop(lua,1); /* remove the error handler from the stack. */

    return;

    }

    lua_getglobal(lua, funcname);

    serverAssert(!lua_isnil(lua,-1));

    }

     

    /* 3)用命令行参数key和arg填充Lua全局变量KEYS和ARGV,并选择数据库、设置超时时间、设置脚本开始执行的时间等 */

    luaSetGlobalArray(lua,"KEYS",c->argv+3,numkeys); //填充KEYS

    luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys);

     

    // 如果在脚本中调用select命令,从Redis 2.8.12开始,仅影响脚本本身的执行,

    不会修改调用脚本的客户端选择的数据库。

    server.lua_caller = c; //将当前客户端设置为正在执行Lua脚本的客户端

    server.lua_cur_script = funcname + 2;

    server.lua_time_start = mstime(); // 脚本启动时间

    server.lua_kill = 0;

    if (server.lua_time_limit > 0 && ldb.active == 0) {

    // 设置超时回调 ,子函数luaMaskCountHook

    lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);

    delhook = 1;

    } else if (ldb.active) {

    lua_sethook(server.lua,luaLdbLineHook,LUA_MASKLINE|LUA_MASKCOUNT,100000);

    delhook = 1;

    }

    prepareLuaClient();

    /4)执行Lua脚本。 */

    err = lua_pcall(lua,0,1,-2);

     

    resetLuaClient();

     

    /*5)进行清理工作,删除超时相关设置,调用lua_gc清理Lua数据

    (每运行50次脚本,执行一次lua_gc). */

    if (delhook) lua_sethook(lua,NULL,0,0); //关闭hook

    if (server.lua_timedout) {

    server.lua_timedout = 0;

    /* 恢复检测到脚本超时时受保护的客户端 */

    unprotectClient(c);

    if (server.masterhost && server.master)

    // 将阻塞的客户端移动到非阻塞队列等待处理

    queueClientForReprocessing(server.master);

    }

    server.lua_caller = NULL; // 当前脚本执行完毕,清除lua_caller变量

    server.lua_cur_script = NULL;

     

    /*lua gc */

    #define LUA_GC_CYCLE_PERIOD 50

    {

    static long gc_count = 0;

    gc_count++;

    if (gc_count == LUA_GC_CYCLE_PERIOD) { //50脚本执行调用一次lua_gc

    lua_gc(lua,LUA_GCSTEP,LUA_GC_CYCLE_PERIOD);

    gc_count = 0;

    }

    }

    // 6)如果脚本执行错误,则返回错误,反之调用luaReplyToRedisReply将Lua类型转换为Redis回复类型。如果我们使用单个命令复制,则在至少有写入时发出exec命令。至此eval执行完毕

    if (err) {

    addReplyErrorFormat(c,"Error running script (call to %s): %s ",

    funcname, lua_tostring(lua,-1));

    lua_pop(lua,2); /* Consume the Lua reply and remove error handler. */

    } else {

    /*成功则将Lua类型转换为Redis回复类型*/

    luaReplyToRedisReply(c,lua); /* Convert and consume the reply. */

    lua_pop(lua,1); /* Remove the error handler. */

    }

     

    /*如果我们使用单个命令复制,则在至少有写入时发出exec命令。至此eval执行完毕*/

    if (server.lua_replicate_commands) {

    preventCommandPropagation(c);

    if (server.lua_multi_emitted) {

    execCommandPropagateExec(c);

    }

    }

    if (evalsha && !server.lua_replicate_commands) {

    if (!replicationScriptCacheExists(c->argv[1]->ptr)) {

    /* This script is not in our script cache, replicate it as

    * EVAL, then add it into the script cache, as from now on

    * slaves and AOF know about it. */

    robj *script = dictFetchValue(server.lua_scripts,c->argv[1]->ptr);

     

    replicationScriptCacheAdd(c->argv[1]->ptr);

    serverAssertWithInfo(c,NULL,script != NULL);

    if (server.dirty == initial_server_dirty) {

    rewriteClientCommandVector(c,3,

    resetRefCount(createStringObject("SCRIPT",6)),

    resetRefCount(createStringObject("LOAD",4)),

    script);

    } else {

    rewriteClientCommandArgument(c,0,

    resetRefCount(createStringObject("EVAL",4)));

    rewriteClientCommandArgument(c,1,script);

    }

    forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);

    }

    }

    }

     

    子函数:

    sds luaCreateFunction(client *c, lua_State *lua, robj *body) {

    char funcname[43];

    dictEntry *de;

    funcname[0] = 'f';

    funcname[1] = '_';

    sha1hex(funcname+2,body->ptr,sdslen(body->ptr));

    sds sha = sdsnewlen(funcname+2,40);

    if ((de = dictFind(server.lua_scripts,sha)) != NULL) {

    sdsfree(sha);

    return dictGetKey(de);

    }

    sds funcdef = sdsempty(); //lua函数定义字符串

    funcdef = sdscat(funcdef,"function ");

    funcdef = sdscatlen(funcdef,funcname,42); // 函数名=f_{sha1}

    funcdef = sdscatlen(funcdef,"() ",3);

    funcdef = sdscatlen(funcdef,body->ptr,sdslen(body->ptr));

    funcdef = sdscatlen(funcdef," end",4);

    if (luaL_loadbuffer(lua,funcdef,sdslen(funcdef),"@user_script")) {

    // 载入函数

    if (c != NULL) {

    addReplyErrorFormat(c,

    "Error compiling script (new function): %s ",

    lua_tostring(lua,-1));

    }

    lua_pop(lua,1);

    sdsfree(sha);

    sdsfree(funcdef);

    return NULL;

    }

    ...

    }

     

    子函数: 第3步中的超时回调函数luaMaskCountHook

    void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {

    long long elapsed = mstime() - server.lua_time_start;

    UNUSED(ar);

    UNUSED(lua);

    //如果脚本超时

    if (elapsed >= server.lua_time_limit && server.lua_timedout == 0) {

    serverLog(LL_WARNING, //打印超时日志

    "Lua slow script detected: still in execution after %lld milliseconds. "

    "You can try killing the script using the SCRIPT KILL command. "

    "Script SHA1 is: %s",

    elapsed, server.lua_cur_script);

    server.lua_timedout = 1; //设置超时标志

    /* 客户端状态加上CLIENT_PROTECTED(受保护的客户端,删除了该客户端的文件读写事件,并保持连接) 此时服务端可以接受别的客户端的命令,但只能接受script kill和shutdown nosave命令,别的命令都回复busy错误。. */

    protectClient(server.lua_caller);

    }

    if (server.lua_timedout) processEventsWhileBlocked();

    //如果脚本没有执行写入操作,则SCRIPT KILL命令可以杀死脚本,否则只能使用shutdown nosave停止服务器而不保存磁盘上的当前数据集,以避免半写数据存在

    if (server.lua_kill) {

    serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");

    lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");

    lua_error(lua);

    }

    }

    (2)evalsha命令

    使用Redis内置的Lua解释器运行Lua脚本, evalsha使用脚本的sha1校验和作为参数,这样可以有效减少发送的数据,节约带宽

    格式:evalsha sha1 numkeys key [key ...] arg [arg ...]

    说明: 通过sha1摘要执行服务器端缓存的脚本。使用script load命令在服务器端缓存脚本。该命令与eval相同。因为eval命令每次都需要花费较多带宽到服务器,而服务器使用内部缓存机制不需要每次都编译脚本,那么eval命令就显得浪费带宽了。

    2.脚本管理命令

    脚本管理命令(script)用于管理Lua脚本,可以执行载入脚本、清空脚本、杀死脚本等操作。

    格式:script <subcommand> arg arg ... arg.

    说明:脚本相关命令,子命令如下所示。

    ·DEBUG(yes|sync|no):为后续执行的脚本设置调试模式。

    ·EXISTS<sha1>[<sha1>...]:sha1摘要对应的脚本是否存在。

    ·FLUSH:删除Lua脚本缓存。

    ·KILL:杀死当前正在执行的Lua脚本(尚未执行写入操作)。

    ·LOAD<script>:载入脚本,不执行。

    DEBUG子命令

    选项"yes"表示启用Lua脚本的非阻塞异步调试(更改将被丢弃),

    "sync"表示启用阻塞的Lua脚本的同步调试(保存对数据的更改),

    "no"则表示禁用脚本调试模式,

    if (!strcasecmp(c->argv[2]->ptr,"no")) {

    ldbDisable(c); // 关闭ldb调试模式

    addReply(c,shared.ok);

    } else if (!strcasecmp(c->argv[2]->ptr,"yes")) {

    ldbEnable(c); // 开启ldb调试模式

    addReply(c,shared.ok);

    } else if (!strcasecmp(c->argv[2]->ptr,"sync")) {

    ldbEnable(c);

    addReply(c,shared.ok);

    c->flags |= CLIENT_LUA_DEBUG_SYNC; // 开启同步调试模式

    } else { // 语法错误

    addReplyError(c,"Use SCRIPT DEBUG yes/sync/no");

    return;

    }

    EXISTS子命令

    其实是调用dictFind在server.lua_scripts字典中查找是否有对应的脚本,返回0或1。

    addReplyArrayLen(c, c->argc-2);

    for (j = 2; j < c->argc; j++) {

    if (dictFind(server.lua_scripts,c->argv[j]->ptr))

    addReply(c,shared.cone);

    else

    addReply(c,shared.czero);

    }

    FLUSH子命令

    清空服务器lua_scripts和lua_scripts_mem,并关闭server.lua之后再重新初始化,最后清空副本的脚本缓存。

    scriptingReset(); //脚本reset

    addReply(c,shared.ok);

    replicationScriptCacheFlush(); // 清空副本的脚本缓存

    KILL子命令

    需要根据服务器状态进行不同的操作:

    如果服务器没有脚本执行则返回错误;

    如果脚本是Master发送过来的则不能杀死;

    如果脚本已经执行写入命令,只能SHUTDOWN NOSAVE杀死脚本,防止半写信息持续存在。

    if (server.lua_caller == NULL) { // 没有脚本运行不需要kill

    addReplySds(c,sdsnew("-NOTBUSY No scripts in execution right now. "));

    } else if (server.lua_caller->flags & CLIENT_MASTER) { //master发送过来的脚本,不能kill

    addReplySds(c,sdsnew("-UNKILLABLE The busy scrip... "));

    } else if (server.lua_write_dirty) { // 脚本已经执行了写入命令

    addReplySds(c,sdsnew("-UNKILLABLE So.... "));

    } else {

    server.lua_kill = 1;

    addReply(c,shared.ok);

    }

    LOAD子命令

    调用luaCreateFunction函数来创建lua函数,在执行evalsha命令时如果没有对应的脚本,则会返回一个特殊错误,告知客户端使用eval。

    sds sha = luaCreateFunction(c,server.lua,c->argv[2]);

    if (sha == NULL) return;

    addReplyBulkCBuffer(c,sha,40);

    19.4 本章小结

    介绍了事务、发布-订阅、Lua脚本在Redis中的实现。

    事务和Lua脚本都可以实现原子性,但Lua脚本的功能更加强大;

    发布订阅功能也可以使用Redis 5.0中新引入的Stream实现。

      

  • 相关阅读:
    Mongodb $in $or 性能比较
    c# mongo 数组里对象更新
    C# mongodb $set或$addToSet批量更新很慢原因
    docker 使用
    Ubuntu 下更简单的防火墙 Uncomplicated Firewall
    Docker 介绍及安装
    linux 内核分析工具 Dtrace、SystemTap、火焰图、crash等
    golang 中处理大规模tcp socket网络连接的方法,相当于c语言的 poll 或 epoll
    golang 对struct进行Serialize的方法,即将存取二进制文件到struct的方法
    golang 中创建daemon的方法
  • 原文地址:https://www.cnblogs.com/coloz/p/13812859.html
Copyright © 2011-2022 走看看