今天学习了Redis中比較高大上的名词,“公布订阅模式”。公布订阅模式这个词在我最開始接触听说的时候是在JMS(Java Message Service)java消息服务中听说的。这个名次用通俗的一点话说。就是我订阅了这类消息,当仅仅有这类的消息进行广播发送的时候。我才会。其它的消息直接过滤,保证了一个高效的传输效率。以下切入正题。学习一下Redis是怎样实现这个公布订阅模式的。先看看里面的简单的API构造;
/*----------------------------------------------------------------------------- * Pubsub low level API *----------------------------------------------------------------------------*/ void freePubsubPattern(void *p) /* 释放公布订阅的模式 */ int listMatchPubsubPattern(void *a, void *b) /* 公布订阅模式是否匹配 */ int clientSubscriptionsCount(redisClient *c) /* 返回client的所订阅的数量,包含channels + patterns管道和模式 */ int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client订阅一个Channel管道 */ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消订阅Client中的Channel */ int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Clientclient订阅一种模式 */ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Clientclient取消订阅pattern模式 */ int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* client取消自身订阅的全部Channel */ int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* client取消订阅全部的pattern模式 */ int pubsubPublishMessage(robj *channel, robj *message) /* 为全部订阅了Channel的Client发送消息message */ /* ------------PUB/SUB API ---------------- */ void subscribeCommand(redisClient *c) /* 订阅Channel的命令 */ void unsubscribeCommand(redisClient *c) /* 取消订阅Channel的命令 */ void psubscribeCommand(redisClient *c) /* 订阅模式命令 */ void punsubscribeCommand(redisClient *c) /* 取消订阅模式命令 */ void publishCommand(redisClient *c) /* 公布消息命令 */ void pubsubCommand(redisClient *c) /* 公布订阅命令 */在这里面出现了高频的词Pattern(模式)和Channel(频道,叫管道比較别扭)。也就是说,兴许全部的关于公布订阅的东东都是基于这2者展开进行的。如今大致解说一下在Redis中是怎样实现此中模式的:
1.在RedisClient 内部维护了一个pubsub_channels的Channel列表。记录了此client所订阅的频道
2.在Server服务端。相同维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每个Channel相应着一批订阅了此频道的Client,也就是Channel-->list of Clients
3.当一个Client publish一个message的时候。会先去服务端的pubsub_channels找对应的Channel,遍历里面的Client。然后发送通知,即完毕了整个公布订阅模式。
我们能够简单的看一下Redis订阅一个Channel的方法实现;
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ /* Client订阅一个Channel管道 */ int pubsubSubscribeChannel(redisClient *c, robj *channel) { struct dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ //在Client的字典pubsub_channels中加入Channel if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ //加入Clietn到server中的pubsub_channels,相应的列表中 de = dictFind(server.pubsub_channels,channel); if (de == NULL) { //假设此频道的Client列表为空,则创建新列表并加入 clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { //否则,获取这个频道的客户端列表。在尾部加入新的客户端 clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ //加入给回复客户端 addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; }加入操作主要分2部,Client自身的内部维护的pubsub_channels的加入。是一个dict字典对象,然后,是server端维护的pubsub_channels中的client列表的加入。在进行Channel频道的删除的时候,也是运行的这2步骤操作:
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ /* 取消订阅Client中的Channel */ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { struct dictEntry *de; list *clients; listNode *ln; int retval = 0; /* Remove the channel from the client -> channels hash table */ incrRefCount(channel); /* channel may be just a pointer to the same object we have in the hash tables. Protect it... */ //字典删除Client中pubsub_channels中的Channel if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; /* Remove the client from the channel -> clients list hash table */ //再移除Channel相应的Client列表 de = dictFind(server.pubsub_channels,channel); redisAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); redisAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was * the latest client, so that it will be possible to abuse * Redis PUBSUB creating millions of channels. */ dictDelete(server.pubsub_channels,channel); } } /* Notify the client */ if (notify) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.unsubscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } decrRefCount(channel); /* it is finally safe to release it */ return retval; }里面还有相应的模式的订阅和取消订阅的操作,原理和channel全然一致。二者的差别在于,pattern是用来匹配的Channel的,这个是什么意思呢。在后面会做出答案,接着看。最后看一个最最核心的方法,客户端发步消息方法:
/* Publish a message */ /* 为全部订阅了Channel的Client发送消息message */ int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; struct dictEntry *de; listNode *ln; listIter li; /* Send to clients listening for that channel */ //找到Channel所相应的dictEntry de = dictFind(server.pubsub_channels,channel); if (de) { //获取此Channel相应的客户单列表 list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { //依次取出List中的客户单,加入消息回复 redisClient *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); //加入消息回复 addReplyBulk(c,message); receivers++; } } /* Send to clients listening to matching channels */ /* 发送给尝试匹配该Channel的客户端消息 */ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; //客户端的模式假设匹配了Channel。也会发送消息 if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; } } decrRefCount(channel); } return receivers; }pattern的作用就在上面体现了,假设某种pattern匹配了Channel频道,则模式的客户端也会接收消息。在server->pubsub_patterns中,pubsub_patterns是一个list列表,里面的每个pattern仅仅相应一个Client,就是上面的pat->client,这一点和Channel还是有本质的差别的。
讲完公布订阅模式的基本操作后。顺便把与此相关的notify通知类也稍稍讲讲,通知仅仅有3个方法。
/* ----------------- API ------------------- */ int keyspaceEventsStringToFlags(char *classes) /* 键值字符类型转为相应的Class类型 */ sds keyspaceEventsFlagsToString(int flags) /* 通过输入的flag值类,转为字符类型*/ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* 公布通知方法,分为2类,keySpace的通知。keyEvent的通知 */涉及到string To flag 和flag To String 的转换,也不知道这个会在哪里用到;
/* Turn a string representing notification classes into an integer * representing notification classes flags xored. * * The function returns -1 if the input contains characters not mapping to * any class. */ /* 键值字符类型转为相应的Class类型 */ int keyspaceEventsStringToFlags(char *classes) { char *p = classes; int c, flags = 0; while((c = *p++) != '