zoukankan      html  css  js  c++  java
  • Redis源码分析(三十)--- pubsub发布订阅模式

            今天学习了Redis中比较高大上的名词,“发布订阅模式”,发布订阅模式这个词在我最开始接触听说的时候是在JMS(Java Message Service)java消息服务中听说的。这个名次用通俗的一点话说,就是我订阅了这类消息,当只有这类的消息进行广播发送的时候,我才会,其他的消息直接过滤,保证了一个高效的传输效率。下面切入正题,学习一下Redis是如何实现这个发布订阅模式的。先看看里面的简单的API构造;

    /*-----------------------------------------------------------------------------
     * Pubsub low level API
     *----------------------------------------------------------------------------*/
    void freePubsubPattern(void *p) /* 释放发布订阅的模式 */
    int listMatchPubsubPattern(void *a, void *b) /* 发布订阅模式是否匹配 */
    int clientSubscriptionsCount(redisClient *c) /* 返回客户端的所订阅的数量,包括channels + patterns管道和模式 */
    int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client订阅一个Channel管道 */
    int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消订阅Client中的Channel */
    int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Client客户端订阅一种模式 */
    int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Client客户端取消订阅pattern模式 */
    int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* 客户端取消自身订阅的所有Channel */
    int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* 客户端取消订阅所有的pattern模式 */
    int pubsubPublishMessage(robj *channel, robj *message) /* 为所有订阅了Channel的Client发送消息message */
    
    /* ------------PUB/SUB API ---------------- */
    void subscribeCommand(redisClient *c) /* 订阅Channel的命令 */
    void unsubscribeCommand(redisClient *c) /* 取消订阅Channel的命令 */
    void psubscribeCommand(redisClient *c) /* 订阅模式命令 */
    void punsubscribeCommand(redisClient *c) /* 取消订阅模式命令 */
    void publishCommand(redisClient *c) /* 发布消息命令 */
    void pubsubCommand(redisClient *c) /* 发布订阅命令 */
    
    在这里面出现了高频的词Pattern(模式)和Channel(频道,叫管道比较别扭),也就是说,后续所有的关于发布订阅的东东都是基于这2者展开进行的。现在大致讲解一下在Redis中是如何实现此中模式的:

    1.在RedisClient 内部维护了一个pubsub_channels的Channel列表,记录了此客户端所订阅的频道

    2.在Server服务端,同样维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每一个Channel对应着一批订阅了此频道的Client,也就是Channel-->list of Clients

    3.当一个Client publish一个message的时候,会先去服务端的pubsub_channels找相应的Channel,遍历里面的Client,然后发送通知,即完成了整个发布订阅模式。

        我们可以简单的看一下Redis订阅一个Channel的方法实现;

    /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
     * 0 if the client was already subscribed to that channel. */
    /* Client订阅一个Channel管道 */
    int pubsubSubscribeChannel(redisClient *c, robj *channel) {
        struct dictEntry *de;
        list *clients = NULL;
        int retval = 0;
    
        /* Add the channel to the client -> channels hash table */
        //在Client的字典pubsub_channels中添加Channel
        if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
            retval = 1;
            incrRefCount(channel);
            /* Add the client to the channel -> list of clients hash table */
            //添加Clietn到server中的pubsub_channels,对应的列表中
            de = dictFind(server.pubsub_channels,channel);
            if (de == NULL) {
            	//如果此频道的Client列表为空,则创建新列表并添加
                clients = listCreate();
                dictAdd(server.pubsub_channels,channel,clients);
                incrRefCount(channel);
            } else {
            	//否则,获取这个频道的客户端列表,在尾部添加新的客户端
                clients = dictGetVal(de);
            }
            listAddNodeTail(clients,c);
        }
        /* Notify the client */
        //添加给回复客户端
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.subscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,clientSubscriptionsCount(c));
        return retval;
    }
    
    添加操作主要分2部,Client自身的内部维护的pubsub_channels的添加,是一个dict字典对象,然后,是server端维护的pubsub_channels中的client列表的添加。在进行Channel频道的删除的时候,也是执行的这2步骤操作:

    /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
     * 0 if the client was not subscribed to the specified channel. */
    /* 取消订阅Client中的Channel */
    int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
        struct dictEntry *de;
        list *clients;
        listNode *ln;
        int retval = 0;
    
        /* Remove the channel from the client -> channels hash table */
        incrRefCount(channel); /* channel may be just a pointer to the same object
                                we have in the hash tables. Protect it... */
        //字典删除Client中pubsub_channels中的Channel
        if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
            retval = 1;
            /* Remove the client from the channel -> clients list hash table */
            //再移除Channel对应的Client列表
            de = dictFind(server.pubsub_channels,channel);
            redisAssertWithInfo(c,NULL,de != NULL);
            clients = dictGetVal(de);
            ln = listSearchKey(clients,c);
            redisAssertWithInfo(c,NULL,ln != NULL);
            listDelNode(clients,ln);
            if (listLength(clients) == 0) {
                /* Free the list and associated hash entry at all if this was
                 * the latest client, so that it will be possible to abuse
                 * Redis PUBSUB creating millions of channels. */
                dictDelete(server.pubsub_channels,channel);
            }
        }
        /* Notify the client */
        if (notify) {
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.unsubscribebulk);
            addReplyBulk(c,channel);
            addReplyLongLong(c,dictSize(c->pubsub_channels)+
                           listLength(c->pubsub_patterns));
    
        }
        decrRefCount(channel); /* it is finally safe to release it */
        return retval;
    }
    
    里面还有对应的模式的订阅和取消订阅的操作,原理和channel完全一致,二者的区别在于,pattern是用来匹配的Channel的,这个是什么意思呢。在后面会做出答案,接着看。最后看一个最最核心的方法,客户端发步消息方法:

    /* Publish a message */
    /* 为所有订阅了Channel的Client发送消息message */
    int pubsubPublishMessage(robj *channel, robj *message) {
        int receivers = 0;
        struct dictEntry *de;
        listNode *ln;
        listIter li;
    
        /* Send to clients listening for that channel */
        //找到Channel所对应的dictEntry
        de = dictFind(server.pubsub_channels,channel);
        if (de) {
        	//获取此Channel对应的客户单列表
            list *list = dictGetVal(de);
            listNode *ln;
            listIter li;
    
            listRewind(list,&li);
            while ((ln = listNext(&li)) != NULL) {
            	//依次取出List中的客户单,添加消息回复
                redisClient *c = ln->value;
    
                addReply(c,shared.mbulkhdr[3]);
                addReply(c,shared.messagebulk);
                addReplyBulk(c,channel);
                //添加消息回复
                addReplyBulk(c,message);
                receivers++;
            }
        }
        /* Send to clients listening to matching channels */
        /* 发送给尝试匹配该Channel的客户端消息 */
        if (listLength(server.pubsub_patterns)) {
            listRewind(server.pubsub_patterns,&li);
            channel = getDecodedObject(channel);
            while ((ln = listNext(&li)) != NULL) {
                pubsubPattern *pat = ln->value;
    			
    			//客户端的模式如果匹配了Channel,也会发送消息
                if (stringmatchlen((char*)pat->pattern->ptr,
                                    sdslen(pat->pattern->ptr),
                                    (char*)channel->ptr,
                                    sdslen(channel->ptr),0)) {
                    addReply(pat->client,shared.mbulkhdr[4]);
                    addReply(pat->client,shared.pmessagebulk);
                    addReplyBulk(pat->client,pat->pattern);
                    addReplyBulk(pat->client,channel);
                    addReplyBulk(pat->client,message);
                    receivers++;
                }
            }
            decrRefCount(channel);
        }
        return receivers;
    }
    
    pattern的作用就在上面体现了,如果某种pattern匹配了Channel频道,则模式的客户端也会接收消息。在server->pubsub_patterns中,pubsub_patterns是一个list列表,里面的每一个pattern只对应一个Client,就是上面的pat->client,这一点和Channel还是有本质的区别的。讲完发布订阅模式的基本操作后,顺便把与此相关的notify通知类也稍稍讲讲,通知只有3个方法,
    /* ----------------- API ------------------- */
    int keyspaceEventsStringToFlags(char *classes) /* 键值字符类型转为对应的Class类型 */
    sds keyspaceEventsFlagsToString(int flags) /* 通过输入的flag值类,转为字符类型*/
    void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* 发布通知方法,分为2类,keySpace的通知,keyEvent的通知 */
    涉及到string To flag 和flag To String 的转换,也不知道这个会在哪里用到;

    /* Turn a string representing notification classes into an integer
     * representing notification classes flags xored.
     *
     * The function returns -1 if the input contains characters not mapping to
     * any class. */
    /* 键值字符类型转为对应的Class类型 */
    int keyspaceEventsStringToFlags(char *classes) {
        char *p = classes;
        int c, flags = 0;
    
        while((c = *p++) != '') {
            switch(c) {
            case 'A': flags |= REDIS_NOTIFY_ALL; break;
            case 'g': flags |= REDIS_NOTIFY_GENERIC; break;
            case '$': flags |= REDIS_NOTIFY_STRING; break;
            case 'l': flags |= REDIS_NOTIFY_LIST; break;
            case 's': flags |= REDIS_NOTIFY_SET; break;
            case 'h': flags |= REDIS_NOTIFY_HASH; break;
            case 'z': flags |= REDIS_NOTIFY_ZSET; break;
            case 'x': flags |= REDIS_NOTIFY_EXPIRED; break;
            case 'e': flags |= REDIS_NOTIFY_EVICTED; break;
            case 'K': flags |= REDIS_NOTIFY_KEYSPACE; break;
            case 'E': flags |= REDIS_NOTIFY_KEYEVENT; break;
            default: return -1;
            }
        }
        return flags;
    }
    
    应该是响应键盘输入的类型和Redis类型之间的转换。在notify的方法还有一个event事件的通知方法:

    /* The API provided to the rest of the Redis core is a simple function:
     *
     * notifyKeyspaceEvent(char *event, robj *key, int dbid);
     *
     * 'event' is a C string representing the event name.
     * 'key' is a Redis object representing the key name.
     * 'dbid' is the database ID where the key lives.  */
    /* 发布通知方法,分为2类,keySpace的通知,keyEvent的通知 */ 
    void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
        sds chan;
        robj *chanobj, *eventobj;
        int len = -1;
        char buf[24];
    
        /* If notifications for this class of events are off, return ASAP. */
        if (!(server.notify_keyspace_events & type)) return;
    
        eventobj = createStringObject(event,strlen(event));
        
        //2种的通知形式,略有差别
        /* __keyspace@<db>__:<key> <event> notifications. */
        if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {
            chan = sdsnewlen("__keyspace@",11);
            len = ll2string(buf,sizeof(buf),dbid);
            chan = sdscatlen(chan, buf, len);
            chan = sdscatlen(chan, "__:", 3);
            chan = sdscatsds(chan, key->ptr);
            chanobj = createObject(REDIS_STRING, chan);
            //上述几步操作,组件格式字符串,最后发布消息,下面keyEvent的通知同理
            pubsubPublishMessage(chanobj, eventobj);
            decrRefCount(chanobj);
        }
    
        /* __keyevente@<db>__:<event> <key> notifications. */
        if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {
            chan = sdsnewlen("__keyevent@",11);
            if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
            chan = sdscatlen(chan, buf, len);
            chan = sdscatlen(chan, "__:", 3);
            chan = sdscatsds(chan, eventobj->ptr);
            chanobj = createObject(REDIS_STRING, chan);
            pubsubPublishMessage(chanobj, key);
            decrRefCount(chanobj);
        }
        decrRefCount(eventobj);
    }
    
    有keySpace和keyEvent的2种事件通知。具体怎么用,等后面碰到的时候在看看。
  • 相关阅读:
    Python 学习日记 第七天
    Python 学习日记 第六天
    Python 学习日记 第五天
    Python 学习日记 第四天
    Redis 中的数据类型及基本操作
    Asp.net mvc 中View 的呈现(二)
    Asp.net mvc 中View的呈现(一)
    Asp.net mvc 中Action 方法的执行(三)
    Asp.net mvc 中Action 方法的执行(二)
    Asp.net mvc 中Action 方法的执行(一)
  • 原文地址:https://www.cnblogs.com/bianqi/p/12184197.html
Copyright © 2011-2022 走看看