今天我是看完了t_list和t_string的代码,从名字知道,这也是和之前的t_hash非常类似的,无非就是各种形式,转化来转化去的。先讲讲t_list,对比于昨天的t_hash,t_hash是ziplist和dict之间的转换,t_list则是描述的是ziplist压缩表和linedlist普通链表,换句话说,当client这个robj作为参数传入的时候,都分为ENCODING_ZIIPLIST和ENCODING_LINKEDLIST 2类编码方式处理。还有一个在t_list出现的一个 比较新颖的东西是出现了锁的概念,操作系统的东西在这里也会碰到了,这里的代码也非常值得学习。下面,我们看看一般的t_list中包括的一些方法和命令:
/* List方法 */
/* list的操作分为2种,LINKEDLIST普通链表和ZIPLIST压缩列表的相关操作 */
void listTypeTryConversion(robj *subject, robj *value) /* 判断是否需要将ziplist转为linkedlist */
void listTypePush(robj *subject, robj *value, int where) /* 在头部或尾部插入value元素 */
robj *listTypePop(robj *subject, int where) /* 在列表的头部或尾弹出元素 */
unsigned long listTypeLength(robj *subject) /* 列表的长度 */
listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) /* 返回列表迭代器,方向有头尾之分 */
void listTypeReleaseIterator(listTypeIterator *li) /* 释放列表迭代器 */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) /* 根据列表迭代器,获取下一个元素 */
robj *listTypeGet(listTypeEntry *entry) /* 获取listType元素,有ziplist和linkedlist */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) /* listType了类型插入元素操作 */
int listTypeEqual(listTypeEntry *entry, robj *o) /* 判断2个元素是否相等 */
void listTypeDelete(listTypeEntry *entry) /* listType类型删除元素 */
void listTypeConvert(robj *subject, int enc) /* listType类型的转换操作,这里指的是往linkedList上转 */
/* List的相关命令 */
void pushGenericCommand(redisClient *c, int where) /* 插入操作命令的原始操作 */
void lpushCommand(redisClient *c) /* 左边插入元素命令 */
void rpushCommand(redisClient *c) /* 右边插入元素命令 */
void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) /* 有返回状态的插入操作命令,假设首先都是能够实现插入命令的 */
void lpushxCommand(redisClient *c) /* 左边插入元素有返回消息的命令 */
void rpushxCommand(redisClient *c) /* 右边插入元素有返回消息的命令 */
void linsertCommand(redisClient *c) /* 列表指定位置插入元素操作命令 */
void llenCommand(redisClient *c) /* 列表返回长度命令 */
void lindexCommand(redisClient *c) /* 获取index位置的上的元素 */
void lsetCommand(redisClient *c) /* listType类型设置value命令 */
void popGenericCommand(redisClient *c, int where) /* 实现弹出操作的原始命令 */
void lpopCommand(redisClient *c) /* 左边弹出元素命令 */
void rpopCommand(redisClient *c) /* 右边弹出操作命令 */
void lrangeCommand(redisClient *c) /* 移动listType位置操作 */
void ltrimCommand(redisClient *c) /* listType实现截取操作,把多余范围的元素删除 */
void lremCommand(redisClient *c) /* 移除在listType中出现的与指定的元素相等的元素 */
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) /* 元素从一个obj右边弹出,在从左侧推入另一个obj列表操作 */
void rpoplpushCommand(redisClient *c) /* 右边弹出,左边推入元素的命令 */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) >/* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
void unblockClientWaitingData(redisClient *c) /* 客户端解锁操作 */
void signalListAsReady(redisDb *db, robj *key) /* 将key存入server中,后续可以用于客户端的存取 */
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where) /* 根据server,Client的key,value情况,判断server此时能否服务于Client,否则Client将被阻塞 */
void handleClientsBlockedOnLists(void) /* 服务端解除阻塞住的Client */
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) /* 获取超时时间 */
void blockingPopGenericCommand(redisClient *c, int where) /* 阻塞弹出命令的原始操作 */
void blpopCommand(redisClient *c) /* 左边弹出数据的阻塞式命令 */
void brpopCommand(redisClient *c) /* 右边弹出数据的阻塞式命令 */
void brpoplpushCommand(redisClient *c) /* 弹出推入阻塞式命令 */
看到这么多API,是否有被吓到的感觉,但里面其实很多的方法都很简单,我只挑几个比较典型的方法做一下分析,向什么返回长度了,获取,设置键值的大家可以自己看具体的方法,不难的。一般的t_list的处理流程,比如获取迭代器的方法:
/* Stores pointer to current the entry in the provided entry structure
* and advances the position of the iterator. Returns 1 when the current
* entry is in fact an entry, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
/* Protect from converting when iterating */
redisAssert(li->subject->encoding == li->encoding);
entry->li = li;
if (li->encoding == REDIS_ENCODING_ZIPLIST) {
entry->zi = li->zi;
if (entry->zi != NULL) {
if (li->direction == REDIS_TAIL)
//根据方向调用pre或next的方法
li->zi = ziplistNext(li->subject->ptr,li->zi);
else
li->zi = ziplistPrev(li->subject->ptr,li->zi);
return 1;
}
} else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
entry->ln = li->ln;
if (entry->ln != NULL) {
if (li->direction == REDIS_TAIL)
//普通的列表的调用方式同上
li->ln = li->ln->next;
else
li->ln = li->ln->prev;
return 1;
}
} else {
redisPanic("Unknown list encoding");
}
return 0;
}
很多API和t_hash的方法都是极其类似的,我就不多说了,我们将关注点,放在列表的锁控制上,先讲个前提,在redis客户端执行操作的时候,会有个请求超时时间,在这个请求的时间内,客户端如果没有找到key对应的数据,是会被阻塞的,什么意思呢,比如:
/* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
/* 这个客户端阻塞的意思:当客户端请求list中某个特定key值时,如果key存在且列表非空,当然 */
/* 当然不会阻塞,正常返回数据,如果当客户端请求某个key不存在,或列表为empty的时候,客户端将被阻塞 */
/* 只有当有这个key被写入的时候,客户端才会被解锁 *
这个其实就是操作系统中的PV操作类似的原理,跟java里的wait(),signal()方法的模型是一样的,他的实现代码如下:
/* This is how the current blocking POP works, we use BLPOP as example:
* - If the user calls BLPOP and the key exists and contains a non empty list
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
* if blocking is not required.
* - If instead BLPOP is called and the key does not exists or the list is
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* requests if the blocking request is not served). Also we put the client
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
* blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is
* performed, we mark this key as "ready", and after the current command,
* MULTI/EXEC block, or script, is executed, we serve all the clients waiting
* for this list, from the one that blocked first, to the last, accordingly
* to the number of elements we have in the ready list.
*/
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
/* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
/* 这个客户端阻塞的意思:当客户端请求list中某个特定key值时,如果key存在且列表非空,当然 */
/* 当然不会阻塞,正常返回数据,如果当客户端请求某个key不存在,或列表为empty的时候,客户端将被阻塞 */
/* 只有当有这个key被写入的时候,客户端才会被解锁 */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
//设置超时时间
c->bpop.timeout = timeout;
c->bpop.target = target;
if (target != NULL) incrRefCount(target);
for (j = 0; j < numkeys; j++) {
//下面为为找到的key上锁
/* If the key already exists in the dict ignore it. */
//如果此时,某些key已经存在了,直接忽略
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
//根据key找到对于请求该key的客户端,也就是将要被阻塞的Client
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
//为c客户端添加一个阻塞的key
retval = dictAdd(c->db->blocking_keys,keys[j],l);
//增加key的引用次数
incrRefCount(keys[j]);
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
listAddNodeTail(l,c);
}
/* Mark the client as a blocked client */
/* 标记Client为阻塞的客户端 */
c->flags |= REDIS_BLOCKED;
//服务端的阻塞客户端计数递增
server.bpop_blocked_clients++;
}
所有的key未来都会存在于server.readykeys里面,客户端判断自己所请求的key是否存在于服务端中的readykeys中,如果不存在就会被阻塞了。将key存入服务单的数据库的操作:
/* If the specified key has clients blocked waiting for list pushes, this
* function will put the key reference into the server.ready_keys list.
* Note that db->ready_keys is a hash table that allows us to avoid putting
* the same key again and again in the list in case of multiple pushes
* made by a script or in the context of MULTI/EXEC.
*
* The list will be finally processed by handleClientsBlockedOnLists() */
/* 将key存入server中,后续可以用于客户端的存取 */
void signalListAsReady(redisDb *db, robj *key) {
readyList *rl;
/* No clients blocking for this key? No need to queue it. */
/* 如果没有客户端为此key所阻塞的,直接不添加 */
if (dictFind(db->blocking_keys,key) == NULL) return;
/* Key was already signaled? No need to queue it again. */
/* 如果key已经存在,不不要重复添加 */
if (dictFind(db->ready_keys,key) != NULL) return;
/* Ok, we need to queue this key into server.ready_keys. */
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
//添加到list列表尾部
listAddNodeTail(server.ready_keys,rl);
/* We also add the key in the db->ready_keys dictionary in order
* to avoid adding it multiple times into a list with a simple O(1)
* check. */
incrRefCount(key);
redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
其实这里我不免会有个小问题,为什么list出现Client被阻塞的操作,而在t_hash中不会出现类似的操作呢?略疑惑。
下面说说t_string的用法,t_string里面没有涉及什么特定的结构体,就是最直接的键值对的设置,直接改变数据库中的键值,先列出里面的主要方法:
/* 方法 API */
static int checkStringLength(redisClient *c, long long size) /* 检查字符串长度是否超出限制最大长度,512*1024*1024个字节长度 */
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) /* 设置的泛型指令操作 */
void setCommand(redisClient *c) /* 设置的综合方法,设置都是根据Client中的参数而定 */
void setnxCommand(redisClient *c) /* key不存在的时候才设置值,flag为REDIS_SET_NX时 */
void setexCommand(redisClient *c) /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为秒 */
void psetexCommand(redisClient *c) /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为毫秒 */
int getGenericCommand(redisClient *c) /* 获取命令的泛型命令 */
void getCommand(redisClient *c) /* 获取命令的操作 */
void getsetCommand(redisClient *c) /* 获取set命令的操作 */
void setrangeCommand(redisClient *c) /* 设置rangge命令的操作 */
void getrangeCommand(redisClient *c) /* 获取range命令的操作 */
void mgetCommand(redisClient *c) /* 执行多次getCommand指令 */
void msetGenericCommand(redisClient *c, int nx) /* 一次运行多个设置操作泛型命令 */
void msetCommand(redisClient *c) /* 多次设置指令的非NX模式 */
void msetnxCommand(redisClient *c) /* 多次设置指令的NX模式,即只在不存在的时候才设置 */
void incrDecrCommand(redisClient *c, long long incr) /* 增加或减少固定值的操作,减少其实就是增加-1 */
void incrCommand(redisClient *c) /* 递增1操作,incr递增设置为1 */
void decrCommand(redisClient *c) /* 递减1操作,incr递增设置为-1 */
void incrbyCommand(redisClient *c) /* 增加指令,从Client中获取递增值 */
void decrbyCommand(redisClient *c) /* 减少指令,从Client中获取减少值 */
void incrbyfloatCommand(redisClient *c) /* 增加float类型值的操作命令,在获取原始值的时候,获取的方法不一样,为getLongDoubleFromObjectOrReply*/
void appendCommand(redisClient *c) /* 追加命令操作,其实也是key:value的形式 */
void strlenCommand(redisClient *c) /* 获取命令长度 */
里面都是一些键值对的简单操作,有一个不同点是,里面的设置操作分为3种:
/* SET操作的一些FLAG */
#define REDIS_SET_NO_FLAGS 0
#define REDIS_SET_NX (1<<0) /* Set if key not exists. */
#define REDIS_SET_XX (1<<1) /* Set if key exists. */
如上面所说,nx模式指的是key不存在的时候才设置值,xx为存在的时候设置,这种模式设置有命令到期时间的限制,分为单位为秒级和毫秒级,而nx模式下,没有时间上的限制,调用的泛型方法:
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)
注意其中的expire,和时间单位unit:
/* key不存在的时候才设置值,flag为REDIS_SET_NX时 */
void setnxCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);
}
/* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为秒 */
void setexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);
}
/* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为毫秒 */
void psetexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
}
nx模式时expire和unit参数分别为NULL,0,所以我做出了如下猜想,可能是因为,当键值对存在的时候,考虑多并发设置的情况,有些Client可能被阻塞,所以有超时时间的存在,而key不存在的时候,就看谁更快了吧,就是直接设置。下面还有一个这个文件比较有意思的方法,有点批处理的味道,换句话说,一个方法里执行多次set操作:
/* 一次运行多个设置操作泛型命令 */
void msetGenericCommand(redisClient *c, int nx) {
int j, busykeys = 0;
/* 设置操作一定是成对出现的 */
if ((c->argc % 2) == 0) {
addReplyError(c,"wrong number of arguments for MSET");
return;
}
/* Handle the NX flag. The MSETNX semantic is to return zero and don't
* set nothing at all if at least one already key exists. */
if (nx) {
for (j = 1; j < c->argc; j += 2) {
if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
busykeys++;
}
}
//如果key存在,返回0
if (busykeys) {
addReply(c, shared.czero);
return;
}
}
//隔2个执行设置key操作指令一次
for (j = 1; j < c->argc; j += 2) {
c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
setKey(c->db,c->argv[j],c->argv[j+1]);
notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[j],c->db->id);
}
server.dirty += (c->argc-1)/2;
addReply(c, nx ? shared.cone : shared.ok);
}
也不是什么特别的操作,应该redis编写者为了方便效率的提高吧。其他的一些方法,请读者可以学习t_string.c的文件,还有提醒一点,在redis数据库中,存储的形式都是K-V形式的,所有的获取,设置都是通过KEY的形式操作,你会看到很多这样的方法:
dbAdd(c->db,c->argv[1],new);
setKey(c->db,c->argv[j],c->argv[j+1]);