zoukankan      html  css  js  c++  java
  • Redis(四):独立功能的实现

    发布与订阅

    Redis 的发布与订阅功能有PUBLISH命令,SUBSCRIBE命令,PSUBSCRIBE命令,PUBSUB命令等组成。
    客户端可以通过SUBSCRIBE命令订阅一个或多个频道,当其它客户端向被订阅的频道发送消息时,频道所有的订阅者都会收到这消息。

    频道的订阅与退订

    Redis会在redisServer中用pubsub_channels字典来记录订阅的客户端和频道的关系。其中字典的键是被订阅的频道,而字典的值是一个客户端链表,保存了订阅这个频道的所有客户端。
    比如有一个客户端执行了SUBSCRIBE HEllO,另一个客户端执行SUBSCRIBE HELLO WORLD,那么此时redisServerpubsub_channels的结构如下:

    订阅频道

    当客户端执行SUBSCRIBE <channel1> <channel2...>命令时,服务器会现在pubsub_channels字典中查询是否有对应的键,如果存在,则将客户端添加到键对应的链表的末端,如果不存在,则在字典中添加键,并关联新的链表,然后将客户端加入链表。

    退订频道

    当客户端执行UNSUBSCRIBE命令时,服务器会在pubsub_channels的字典中找到对应的键,然后遍历链表,找到客户端未自身的节点移除。如果移除完节点后,链表为空,那么会在字典中删除该键。

    模式的订阅与退订

    Redis会在redisServer中用pubsub_patterns链表保存客户端模式订阅的关系。其中链表的一个节点是一个pubsubPattern

    typedef struct pubsubPattern {
        client *client;
        robj *pattern;
    } pubsubPattern;
    

    其中client指向客户端,而pattern代表订阅的模式。
    其结构大概如下:

    订阅模式

    当客户端执行PSUBSCRIBE命令时,服务器会把客户端创建一个新的pubsubPattern结构,用来记录客户端和模式,并添加到链表的末端。

    退订模式

    当客户端执行PUNSUBCRIBE命令时,服务器会在pubsub_patterns链表中遍历查找客户端和模式都符合的pubsubPattern节点,并从链表中移除。

    消息发送

    当服务器收到来自客户端的PUBLISH <channel> <message>命令时,

    • 首先服务器会先从pubsub_channels字典中找对应的键,然后遍历链表中的客户端,发送message消息。
    • 之后服务器再遍历pubsub_patterns链表,对符合channel的模式的客户端发送消息。
    查询订阅信息

    Redis 提供了PUBSUB命令用来查询订阅信息。PUBSUB一共有三个子命令,PUBSUB CHANNELS <pattern>PUBSUB NUMSUB <channels>PUBSUB NUMPAT(即pubsub_channelssize)。

    • PUBSUB CHANNELS <pattern>:用来查询服务器当前有哪些符合模式(``pattern)的频道,如果不加pattern,则列出所有的channel(即pubsub_channels中每个键对应链表的size`)
    • PUBSUB NUMSUB <channels>:用来统计有多少客户端在订阅指定的频道,如果不加channels,则统计所有的channel
    • PUBSUB NUMPAT:用来统计有多少客户端在订阅模式(既pubsub_patternssize

    事务

    Redis 通过MULTIEXECWATCHDISCARD命令来实现事物的功能。一个事务会将多个命令打包,一次性,顺序的执行这些命令,且中间不会执行其他客户端请求的命令。

    事务的实现

    一个事务从开始到结束一般分为三个阶段:

    • 事务开始
    • 命令入队
    • 事物执行/丢弃
    事务开始

    一个事务的开始是通过MULTI命令来实现的,当客户端请求MULTI命令,那么服务器会打开该客户端的flags属性中的REDIS_MULTI标识,表示该客户端由非事务状态切换为事务状态。

    命令入队

    当事务状态中的客户端向服务器发送命令时,如果发送的命令不为WATCHMUTLIEXECDISCARD,那么服务器会将命令入队,并向客户端放回QUEUED

    redisClient结构中,有一个multiState,其结构定义如下:

    typedef struct multiState {
        //multiCmd数组
        multiCmd *commands;     /* Array of MULTI commands */
        //命令数量
        int count;              /* Total number of MULTI commands */
        int cmd_flags;          /* The accumulated command flags OR-ed together.
                                   So if at least a command has a given flag, it
                                   will be set in this field. */
        int minreplicas;        /* MINREPLICAS for synchronous replication */
        time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
    } multiState;
    

    其中multiCmd结构如下:

    typedef struct multiCmd {
        //用来保存命令参数
        robj **argv;
        //参数个数
        int argc;
        //指向对应的命令函数
        struct redisCommand *cmd;
    } multiCmd;  
    

    假设有一个客户端在发送了如下命令:

    192.168.1.102:6379> MULTI
    OK
    192.168.1.102:6379> SET TEST HAHAH
    QUEUED
    192.168.1.102:6379> GET TEST
    QUEUED
    192.168.1.102:6379> SET TEST HEIHEIHEI
    QUEUED
    192.168.1.102:6379>
    

    那么该客户端对应的结构显示为:

    事务结束

    当客户端向服务器发送DISCARD命令时,服务器会清楚客户端的事务状态,并且丢掉任务队列中入队的命令。

    执行事务

    当处于事务的客户端向服务器发送EXEC命令时,EXEC会立即被服务器执行,并且服务器会遍历命令队列中的命令,依次执行,然后将全部结果返回给客户端。

    例如,针对上述的命令输入,当客户端执行了EXEC后,获得的回复如下:

    192.168.1.102:6379> EXEC
    1) OK
    2) "HAHAH"
    3) OK
    192.168.1.102:6379>
    
    WATCH命令

    WATCH命令可以在执行EXEC命令之前,监视某些数据库键,如果在EXEC命令执行时,被监视的键被其他客户端修改,那么服务器将拒绝事务的执行。

    WATCH命令的实现

    当客户端提交WATCH命令时,客户端会将监视的键和客户端保存在redisDbwatched_keys中,watched_keys是一个字典,其中键表示被监视的数据库键,而值则是一个链表,其中的每个节点都指向监视该键的客户端。
    当服务器在执行完某些修改数据库的命令后,不如SETSADD等,会触发一次multi.c/touchWatchKey函数,该函数会在数据库中watched_keys中查询对应的键是否存在,如果存在,则修改对应链表中的客户端,打开REDIS_DIRTY_CAS标志
    当服务器收到一个客户端发的EXEC命令时,会先检查客户端的REDIS_DIRTY_CAS标志是否打开。如果打开则拒绝执行事务。

    Redis事务的ACID性质

    传统的数据库中,用 ACID 表示事务的可靠性和安全性。ACID是指原子性(Atomicity),一致性(Consistency),隔离性(lsolcation)和耐久性(Durablity)。

    原子性:

    是指事务要么全部执行,要么全部不执行。由于 Redis 事务会将命令打包,统一执行,因此Redis事物具有原子性。

    一致性:

    是指数据库在执行数据前后,数据库并不会存在非法或是错误的数据。
    Redis 通过入队命令检测拦截非法命令,在执行时,即使遇到错误命令,也会继续执行之后的命令。

    隔离性:

    是指事务执行过程中,其他事务或操作的执行不会互相收到影响。由于 Redis 通过单线程执行命令,因此保证了事务与事务执行的顺序一定是串行的,由此确保了隔离性。

    耐久性

    是指事物的执行结构能够得到保存。Redis 事务是否具备耐久性和持久化策略相关。只有在开启AOF模式,并且appendfsync值为true时,才具备耐久性。


    Lua脚本

    Redis客户端可以使用Lua脚本原子的执行多个任务(比如,用在分布式锁原子性的释放上)。

    Lua环境的创建过程

    在 Redis 服务器启动的过程中,initServer方法会调用scriptingInit方法。
    scripting.c/scriptingInit的代码如下:

    void scriptingInit(void) {
        //创建Lua环境
        lua_State *lua = lua_open();
        
        //载入函数库,移除其中不支持的函数
        luaLoadLibraries(lua);
        luaRemoveUnsupportedFunctions(lua);
    
        
        //创建lua_scripts字典,用来保存执行或载入过的脚本
        server.lua_scripts = dictCreate(&shaScriptObjectDictType,NULL);
    
        //创建全局表格,并添加函数
        lua_newtable(lua);
    
        /* redis.call */
        lua_pushstring(lua,"call");
        lua_pushcfunction(lua,luaRedisCallCommand);
        lua_settable(lua,-3);
    
        /* redis.pcall */
        lua_pushstring(lua,"pcall");
        lua_pushcfunction(lua,luaRedisPCallCommand);
        lua_settable(lua,-3);
    
        /* redis.log and log levels. */
        lua_pushstring(lua,"log");
        lua_pushcfunction(lua,luaLogCommand);
        lua_settable(lua,-3);
    
        lua_pushstring(lua,"LOG_DEBUG");
        lua_pushnumber(lua,REDIS_DEBUG);
        lua_settable(lua,-3);
    
        lua_pushstring(lua,"LOG_VERBOSE");
        lua_pushnumber(lua,REDIS_VERBOSE);
        lua_settable(lua,-3);
    
        lua_pushstring(lua,"LOG_NOTICE");
        lua_pushnumber(lua,REDIS_NOTICE);
        lua_settable(lua,-3);
    
        lua_pushstring(lua,"LOG_WARNING");
        lua_pushnumber(lua,REDIS_WARNING);
        lua_settable(lua,-3);
    
        /* redis.sha1hex */
        lua_pushstring(lua, "sha1hex");
        lua_pushcfunction(lua, luaRedisSha1hexCommand);
        lua_settable(lua, -3);
    
        /* redis.error_reply and redis.status_reply */
        lua_pushstring(lua, "error_reply");
        lua_pushcfunction(lua, luaRedisErrorReplyCommand);
        lua_settable(lua, -3);
        lua_pushstring(lua, "status_reply");
        lua_pushcfunction(lua, luaRedisStatusReplyCommand);
        lua_settable(lua, -3);
    
        /* Finally set the table as 'redis' global var. */
        lua_setglobal(lua,"redis");
    
        //替换部分函数
        lua_getglobal(lua,"math");
    
        lua_pushstring(lua,"random");
        lua_pushcfunction(lua,redis_math_random);
        lua_settable(lua,-3);
    
        lua_pushstring(lua,"randomseed");
        lua_pushcfunction(lua,redis_math_randomseed);
        lua_settable(lua,-3);
    
        lua_setglobal(lua,"math");
    
        //创建辅助函数
        {
            char *compare_func =    "function __redis__compare_helper(a,b)
    "
                                    "  if a == false then a = '' end
    "
                                    "  if b == false then b = '' end
    "
                                    "  return a<b
    "
                                    "end
    ";
            luaL_loadbuffer(lua,compare_func,strlen(compare_func),"@cmp_func_def");
            lua_pcall(lua,0,0,0);
        }
    
        
        {
            char *errh_func =       "function __redis__err__handler(err)
    "
                                    "  local i = debug.getinfo(2,'nSl')
    "
                                    "  if i and i.what == 'C' then
    "
                                    "    i = debug.getinfo(3,'nSl')
    "
                                    "  end
    "
                                    "  if i then
    "
                                    "    return i.source .. ':' .. i.currentline .. ': ' .. err
    "
                                    "  else
    "
                                    "    return err
    "
                                    "  end
    "
                                    "end
    ";
            luaL_loadbuffer(lua,errh_func,strlen(errh_func),"@err_handler_def");
            lua_pcall(lua,0,0,0);
        }
    
        //创建伪客户端
        if (server.lua_client == NULL) {
            server.lua_client = createClient(-1);
            server.lua_client->flags |= REDIS_LUA_CLIENT;
        }
    
        //全局变量保护
        scriptingEnableGlobalsProtection(lua);
        
        //将lua环境变量保存到服务器中
        server.lua = lua;
    }
    

    对应上述过程总结如下:

    1. 创建 Lua 环境(lua_open)
    2. 载入 Lua 的函数库
    3. 在服务器中创建lua_scripts字典,其中键为脚本的 SHA1 值,值为执行或载入过的 Lua 脚本
    4. 创建全局表格用来保存基本函数(如callpcall等,可以通过 Lua 执行 Redis 的命令)
    5. 替换 Lua 的随机函数(保持数据库的一致性)
    6. 创建辅助函数
    7. 创建执行Redis命令的伪客户端
    8. 设置全局变量保护(避免执行脚本时,攻击全局变量)
    9. 将Lua环境保存到redisServer
    环境协作组件
    伪客户端

    Lua 环境中创建了伪客户端(没有TCP连接的客户端,和启动时载入AOF文件的客户端相似),用来执行Redis命令。

    当Lua调用redis.call或是redis.pcall函数时,函数中需要执行的 Redis 命令将传给伪客户端,伪客户端又将命令交给命令执行器执行,并向 Lua 环境返回结果。

    lua_scripts 字典

    redisServer.lua_scripts字典是用来保存该服务器执行或是载入过的Lua脚本的。其中键是脚本的SHA1校验和,而值是Lua脚本。

    EVAL命令的实现
    EVAL命令格式
    EVAL script numskey <key> <key...> <arg> <arg...> 
    

    其中script是我们要执行的脚本,numskey表示键名参数的个数,key表示键名参数,arg表示附加参数,这些参数都可以在script中通过KEYS[]ARGV[]被引用(其中基准下标为1)。
    例如我们执行如下Lua脚本等同于执行了SET HELLO WORLD命令

    192.168.1.102:6379> EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 HELLO WORLD
    OK
    
    EVAL命令执行过程
    • 定义脚本函数:服务器为客户端发送的脚本创建一个对应的函数,其中函数名为f_开头,并加上脚本的SHA1校验和,函数体就是脚本本身。
    • 将脚本保存至lua_scripts字典
    • 执行脚本函数
    EVALSHA命令的实现

    EVALSHA命令就是通过SHA1值从lua_scripts查找对应的脚本是否存在,如果存在,则执行通过f_SHA1校验和确认函数名,直接执行函数。

    脚本管理命令
    SCRIPT FLUSH

    可以清楚lua_scripts字典保存的脚本,并重新创建 lua 环境。

    SCRIPT EXISTS

    通过脚本的SHA1校验和确认脚本是否存在于服务器

    SCRIPT LOAD

    上传脚本,但是不执行(只进行EVAL过程的前两步)

    SCRIPT KILL

    当脚本处理超时时,可以通过该命令关闭脚本

    脚本复制

    当服务器处于复制模式下时,具有写性质的脚本命令(EVALEVALSHASCRIPT FLUSHSCRIPT )也需要被复制到从服务器。

    EVALSCRIPT FLUSHSCRIPT LOAD的复制

    这三种命令复制不会存在主从服务器执行结果不一致的情况,因此复制时只需要简单的命令传播即可实现。

    EVALSHA命令

    由于可能存在主从服务器lua_scrips中保存的脚本不一致的问题,会发生主服务器执行EVALSHA时,脚本确实存在,而从服务却不存在,EVALSHA执行失败,导致主从数据不一致的问题。

    例如,主服务器A先执行了一个EVAL LOAD命令,载入了一个脚本,而后,服务器B上线,并成为A的从服务器。 此时A在执行EVALSHA命令,运行刚载入的脚本,并将命令传播给从服务器B,由于B不存在该脚本,EVALSHA命令就会执行失败。

    为了避免这种情况,redisServer服务器会通过repl_scriptcache_dict字典保存已经复制给全部从服务器的命令(也就是说,当出现一个新的从服务器,字典需要清空),其中键为脚本的SHA1,而值为NULL。当执行EVALSHA命令的复制过程时,如果repl_scriptcache_dict中可以找到该脚本,那么直接命令传播,如果找不到,那么服务器将根据lua_scripts中脚本的内容,将EVALSHA转换成等价的EVAL命令,再传播,并添加到repl_scriptcache_dict字典中。


    排序

    Redis 通过SORT命令实现对给定列表,集合,有序集合key中的元素进行排序的功能。
    排序默认以数组为权重,值被解释为双精度浮点数,然后进行排序。

    SORT命令格式如下:

    SORT key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC | DESC] [ALPHA] [STORE destination]
    

    上述命令中小写的均为参数,每个[]表示一个选项,[]内大写的部分为具体选项,后面小写的部分为选项参数,GET选项可以同时使用多个。

    验证适合用SORT命令的数据类型

    在客户端中做如下测试:

    192.168.1.102:6379[15]> set stringtest 1
    OK
    192.168.1.102:6379[15]> sort stringtest
    (error) WRONGTYPE Operation against a key holding the wrong kind of value
    192.168.1.102:6379[15]> lpush listtest 1 2 3
    (integer) 3
    192.168.1.102:6379[15]> sort listtest
    1) "1"
    2) "2"
    3) "3"
    192.168.1.102:6379[15]> hmset hashtest key1 1 key2 2 key3 3
    OK
    192.168.1.102:6379[15]> sort hashtest
    (error) WRONGTYPE Operation against a key holding the wrong kind of value
    192.168.1.102:6379[15]> sadd settest 1 2 3
    (integer) 3
    192.168.1.102:6379[15]> sort settest
    1) "1"
    2) "2"
    3) "3"
    192.168.1.102:6379[15]> zadd zsettest 1 1 2 2 3 3
    (integer) 3
    192.168.1.102:6379[15]> sort zsettest
    1) "1"
    2) "2"
    3) "3"
    

    可以看到,字符串和哈希表无法使用SORT命令,而链表,集合和有序集合都可以使用SORT命令。

    各选项的说明
    ALPHA

    ALPHA选项可以让SORT命令从默认的以数字为权重的排序改成以字母为权重。

    假设有一个保存字符串的链表:

    192.168.1.102:6379[15]> lpush alphatest a b c
    (integer) 3
    192.168.1.102:6379[15]>
    

    当我们不添加ALPHA选项直接执行SORT命令时,会报错:

    192.168.1.102:6379[15]> sort alphatest
    (error) ERR One or more scores can't be converted into double
    192.168.1.102:6379[15]>
    

    说明SORT命令默认以双精度浮点数做权重进行排序,针对不能转换为双精度的浮点数的值,会运行出错。

    当我们带上ALPHA选项后在运行SORT命令:

    192.168.1.102:6379[15]> sort alphatest ALPHA
    1) "a"
    2) "b"
    3) "c"
    

    命令可以正常执行,且是以字母顺序排序。

    LIMIT

    LIMIT选项可以控制排序后的结果输出个数,接受offsetcount两个参数,其中offset表示跳过几个结果,count表示输出几个结果。

    同样以上面的例子,我们增加LIMIT选项,让其输出第二和第三个结果:

    192.168.1.102:6379[15]> sort alphatest ALPHA LIMIT 1 2
    1) "b"
    2) "c"
    
    ASC | DESC

    ASC表示以升序排列,DESC表示以降序排列,SORT命令默认以升序排列,当需要降序结果时,可以添加DESC选项。

    上面的例子再增加DESC选项后,观察下输出的结果:

    192.168.1.102:6379[15]> sort alphatest ALPHA LIMIT 1 2 DESC
    1) "b"
    2) "a"
    

    从输出结果中,可以确认DESC能够降序输出,而且LIMIT是在排序完之后再控制输出个数。

    BY

    BY选项可以外部的KEY作为权重,代替默认以键值为权重的排序方式。

    首先我们增加一些键值对作为辅助的排序权重:

    192.168.1.102:6379[15]> mset a_weight 1 b_weight 2 c_weight 3
    OK
    

    然后针对一开始的链表,我们增加BY选项,在进行排序:

    192.168.1.102:6379[15]> sort alphatest BY *_weight
    1) "a"
    2) "b"
    3) "c"
    

    可以发现,虽然我们并没有加ALPHA选项,但是通过BY选项,我们实际的权重是*_weight的键值,能够被正常转为双精度浮点型,因此也可以正常排序。

    GET选项

    GET选项是通过排序结果在去查询键值。
    比如我们继续增加一些键值对:

    192.168.1.102:6379[15]> mset a_toUpperSize A b_toUpperSize B c_toUpperSize C
    OK
    

    然后通过GET选项将输出的结果,作为键去查询:

    192.168.1.102:6379[15]> sort alphatest ALPHA GET *_toUpperSize
    1) "A"
    2) "B"
    3) "C"
    

    再做个测试,假设此时我们删除了c_toUpperSize,然后再去SORTGET,会发生什么?

    192.168.1.102:6379[15]> del c_toUpperSize
    (integer) 1
    192.168.1.102:6379[15]> sort alphatest ALPHA GET *_toUpperSize
    1) "A"
    2) "B"
    3) (nil)
    

    发现c对应输出结果变成了nil,结果等价于直接mget三个 key:

    192.168.1.102:6379[15]> mget a_toUpperSize b_toUpperSize c_toUpperSize
    1) "A"
    2) "B"
    3) (nil)
    
    STORE

    STORE选项可以将排序后的结果集存在一个新的键中。

    192.168.1.102:6379[15]> sort alphatest ALPHA STORE sorted_result
    (integer) 3
    192.168.1.102:6379[15]> TYPE sorted_result
    list
    192.168.1.102:6379[15]> lrange sorted_result 0 -1
    1) "a"
    2) "b"
    3) "c"
    

    可以发现结果集被存在了新的键中。而且键的类型是链表。

    SORT命令的实现

    SORT命令相关的数据结构是redis.h/_redisSortObject

    typedef struct _redisSortObject {
        //*obj指针指向被排序的数据库键
        robj *obj;
        //u用来记录分值,默认用score记录双精度浮点型,而使用BY或是ALPHA的情况下会使用cmpobj
        union {
            double score;
            robj *cmpobj;
        } u;
    } redisSortObject;
    

    SORT执行步骤:

    1. 当服务器执行SORT命令时,首先会根据被执行的数据库键的大小创建一个同等长度的_redisSortObject数组,然后遍历数组,将*obj指针指向数据库键中的每一项,
    2. 然后根据SORT命令的选项,确定u.score或是u.cmpobj。在根据u进行快速排序。
    3. 根据LIMITASC|DESC选项确定输出数组中哪些结果。
    4. 如果还有GET选项,服务器将根据排序的结果去数据库中查找对应数据库键。
    5. 如果存在STORE选项,则保存结果集。

    二进制位数组

    位数组的表示

    Redis 中使用SDS对象表示数组,其中sdshdr.len表示保存了几个字节长度的数组(最后一个字节是)。

    GETBIT命令

    GETBIT命令用来获取某个二进制位数组中指定位的二进制值。

    SETBIT命令

    SETBIT命令用来设置某个二进制位数组中指定位的二进制值。

    BITCOUNT命令

    BITCOUNT命令用来统计二进制位数组总一共存在多少个1的位。实现方式参考查表法和汉明重量。

    BITTOP命令

    BITTOP命令可以用来对多个二进制位数组计算按位与,按位或,按位异或运算。或者可以对某个二进制进行取反。


    慢日志查询

    Redis 慢日志查询功能用来记录执行超过给定时长的命令请求,命令请求会被保存在一个链表中。有两个相关的配置:slowlog-log-slower-than(时长阈值)和slow-log-max-len(链表长度,先进先出)。

    SLOWLOG GET命令可以获取保存在服务器上的慢查询日志。


    监视器

    普通Redis客户端可以通过发送MONITOR命令,成为服务器的监视器,当服务器在收到命令后,会向监视器发送命令信息。

  • 相关阅读:
    事件循环(Event Loop)promise、setTimeout、async的先后执行顺序
    计算机网络方面
    深拷贝与浅拷贝
    从输入url到页面加载发生了什么?
    手写jQuery插件
    vue与微信小程序的区别
    Webpack打包工具
    SpringCloud Feign的分析
    SpringCloud Ribbon的分析(二)
    SpringCloud Ribbon的分析
  • 原文地址:https://www.cnblogs.com/insaneXs/p/11913786.html
Copyright © 2011-2022 走看看