zoukankan      html  css  js  c++  java
  • Redis源代码分析(三十)--- pubsub公布订阅模式

            今天学习了Redis中比較高大上的名词,“公布订阅模式”。公布订阅模式这个词在我最開始接触听说的时候是在JMS(Java Message Service)java消息服务中听说的。这个名次用通俗的一点话说。就是我订阅了这类消息,当仅仅有这类的消息进行广播发送的时候。我才会。其它的消息直接过滤,保证了一个高效的传输效率。以下切入正题。学习一下Redis是怎样实现这个公布订阅模式的。先看看里面的简单的API构造;

    /*-----------------------------------------------------------------------------
     * Pubsub low level API
     *----------------------------------------------------------------------------*/
    void freePubsubPattern(void *p) /* 释放公布订阅的模式 */
    int listMatchPubsubPattern(void *a, void *b) /* 公布订阅模式是否匹配 */
    int clientSubscriptionsCount(redisClient *c) /* 返回client的所订阅的数量,包含channels + patterns管道和模式 */
    int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client订阅一个Channel管道 */
    int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消订阅Client中的Channel */
    int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Clientclient订阅一种模式 */
    int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Clientclient取消订阅pattern模式 */
    int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* client取消自身订阅的全部Channel */
    int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* client取消订阅全部的pattern模式 */
    int pubsubPublishMessage(robj *channel, robj *message) /* 为全部订阅了Channel的Client发送消息message */
    
    /* ------------PUB/SUB API ---------------- */
    void subscribeCommand(redisClient *c) /* 订阅Channel的命令 */
    void unsubscribeCommand(redisClient *c) /* 取消订阅Channel的命令 */
    void psubscribeCommand(redisClient *c) /* 订阅模式命令 */
    void punsubscribeCommand(redisClient *c) /* 取消订阅模式命令 */
    void publishCommand(redisClient *c) /* 公布消息命令 */
    void pubsubCommand(redisClient *c) /* 公布订阅命令 */
    
    在这里面出现了高频的词Pattern(模式)和Channel(频道,叫管道比較别扭)。也就是说,兴许全部的关于公布订阅的东东都是基于这2者展开进行的。如今大致解说一下在Redis中是怎样实现此中模式的:

    1.在RedisClient 内部维护了一个pubsub_channels的Channel列表。记录了此client所订阅的频道

    2.在Server服务端。相同维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每个Channel相应着一批订阅了此频道的Client,也就是Channel-->list of Clients

    3.当一个Client publish一个message的时候。会先去服务端的pubsub_channels找对应的Channel,遍历里面的Client。然后发送通知,即完毕了整个公布订阅模式。

        我们能够简单的看一下Redis订阅一个Channel的方法实现;

    /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
     * 0 if the client was already subscribed to that channel. */
    /* Client订阅一个Channel管道 */
    int pubsubSubscribeChannel(redisClient *c, robj *channel) {
        struct dictEntry *de;
        list *clients = NULL;
        int retval = 0;
    
        /* Add the channel to the client -> channels hash table */
        //在Client的字典pubsub_channels中加入Channel
        if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
            retval = 1;
            incrRefCount(channel);
            /* Add the client to the channel -> list of clients hash table */
            //加入Clietn到server中的pubsub_channels,相应的列表中
            de = dictFind(server.pubsub_channels,channel);
            if (de == NULL) {
            	//假设此频道的Client列表为空,则创建新列表并加入
                clients = listCreate();
                dictAdd(server.pubsub_channels,channel,clients);
                incrRefCount(channel);
            } else {
            	//否则,获取这个频道的客户端列表。在尾部加入新的客户端
                clients = dictGetVal(de);
            }
            listAddNodeTail(clients,c);
        }
        /* Notify the client */
        //加入给回复客户端
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.subscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,clientSubscriptionsCount(c));
        return retval;
    }
    
    加入操作主要分2部,Client自身的内部维护的pubsub_channels的加入。是一个dict字典对象,然后,是server端维护的pubsub_channels中的client列表的加入。在进行Channel频道的删除的时候,也是运行的这2步骤操作:

    /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
     * 0 if the client was not subscribed to the specified channel. */
    /* 取消订阅Client中的Channel */
    int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
        struct dictEntry *de;
        list *clients;
        listNode *ln;
        int retval = 0;
    
        /* Remove the channel from the client -> channels hash table */
        incrRefCount(channel); /* channel may be just a pointer to the same object
                                we have in the hash tables. Protect it... */
        //字典删除Client中pubsub_channels中的Channel
        if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
            retval = 1;
            /* Remove the client from the channel -> clients list hash table */
            //再移除Channel相应的Client列表
            de = dictFind(server.pubsub_channels,channel);
            redisAssertWithInfo(c,NULL,de != NULL);
            clients = dictGetVal(de);
            ln = listSearchKey(clients,c);
            redisAssertWithInfo(c,NULL,ln != NULL);
            listDelNode(clients,ln);
            if (listLength(clients) == 0) {
                /* Free the list and associated hash entry at all if this was
                 * the latest client, so that it will be possible to abuse
                 * Redis PUBSUB creating millions of channels. */
                dictDelete(server.pubsub_channels,channel);
            }
        }
        /* Notify the client */
        if (notify) {
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.unsubscribebulk);
            addReplyBulk(c,channel);
            addReplyLongLong(c,dictSize(c->pubsub_channels)+
                           listLength(c->pubsub_patterns));
    
        }
        decrRefCount(channel); /* it is finally safe to release it */
        return retval;
    }
    
    里面还有相应的模式的订阅和取消订阅的操作,原理和channel全然一致。二者的差别在于,pattern是用来匹配的Channel的,这个是什么意思呢。在后面会做出答案,接着看。最后看一个最最核心的方法,客户端发步消息方法:

    /* Publish a message */
    /* 为全部订阅了Channel的Client发送消息message */
    int pubsubPublishMessage(robj *channel, robj *message) {
        int receivers = 0;
        struct dictEntry *de;
        listNode *ln;
        listIter li;
    
        /* Send to clients listening for that channel */
        //找到Channel所相应的dictEntry
        de = dictFind(server.pubsub_channels,channel);
        if (de) {
        	//获取此Channel相应的客户单列表
            list *list = dictGetVal(de);
            listNode *ln;
            listIter li;
    
            listRewind(list,&li);
            while ((ln = listNext(&li)) != NULL) {
            	//依次取出List中的客户单,加入消息回复
                redisClient *c = ln->value;
    
                addReply(c,shared.mbulkhdr[3]);
                addReply(c,shared.messagebulk);
                addReplyBulk(c,channel);
                //加入消息回复
                addReplyBulk(c,message);
                receivers++;
            }
        }
        /* Send to clients listening to matching channels */
        /* 发送给尝试匹配该Channel的客户端消息 */
        if (listLength(server.pubsub_patterns)) {
            listRewind(server.pubsub_patterns,&li);
            channel = getDecodedObject(channel);
            while ((ln = listNext(&li)) != NULL) {
                pubsubPattern *pat = ln->value;
    			
    			//客户端的模式假设匹配了Channel。也会发送消息
                if (stringmatchlen((char*)pat->pattern->ptr,
                                    sdslen(pat->pattern->ptr),
                                    (char*)channel->ptr,
                                    sdslen(channel->ptr),0)) {
                    addReply(pat->client,shared.mbulkhdr[4]);
                    addReply(pat->client,shared.pmessagebulk);
                    addReplyBulk(pat->client,pat->pattern);
                    addReplyBulk(pat->client,channel);
                    addReplyBulk(pat->client,message);
                    receivers++;
                }
            }
            decrRefCount(channel);
        }
        return receivers;
    }
    
    pattern的作用就在上面体现了,假设某种pattern匹配了Channel频道,则模式的客户端也会接收消息。在server->pubsub_patterns中,pubsub_patterns是一个list列表,里面的每个pattern仅仅相应一个Client,就是上面的pat->client,这一点和Channel还是有本质的差别的。

    讲完公布订阅模式的基本操作后。顺便把与此相关的notify通知类也稍稍讲讲,通知仅仅有3个方法。

    /* ----------------- API ------------------- */
    int keyspaceEventsStringToFlags(char *classes) /* 键值字符类型转为相应的Class类型 */
    sds keyspaceEventsFlagsToString(int flags) /* 通过输入的flag值类,转为字符类型*/
    void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* 公布通知方法,分为2类,keySpace的通知。keyEvent的通知 */
    涉及到string To flag 和flag To String 的转换,也不知道这个会在哪里用到;

    /* Turn a string representing notification classes into an integer
     * representing notification classes flags xored.
     *
     * The function returns -1 if the input contains characters not mapping to
     * any class. */
    /* 键值字符类型转为相应的Class类型 */
    int keyspaceEventsStringToFlags(char *classes) {
        char *p = classes;
        int c, flags = 0;
    
        while((c = *p++) != '') {
            switch(c) {
            case 'A': flags |= REDIS_NOTIFY_ALL; break;
            case 'g': flags |= REDIS_NOTIFY_GENERIC; break;
            case '$': flags |= REDIS_NOTIFY_STRING; break;
            case 'l': flags |= REDIS_NOTIFY_LIST; break;
            case 's': flags |= REDIS_NOTIFY_SET; break;
            case 'h': flags |= REDIS_NOTIFY_HASH; break;
            case 'z': flags |= REDIS_NOTIFY_ZSET; break;
            case 'x': flags |= REDIS_NOTIFY_EXPIRED; break;
            case 'e': flags |= REDIS_NOTIFY_EVICTED; break;
            case 'K': flags |= REDIS_NOTIFY_KEYSPACE; break;
            case 'E': flags |= REDIS_NOTIFY_KEYEVENT; break;
            default: return -1;
            }
        }
        return flags;
    }
    
    应该是响应键盘输入的类型和Redis类型之间的转换。在notify的方法另一个event事件的通知方法:

    /* The API provided to the rest of the Redis core is a simple function:
     *
     * notifyKeyspaceEvent(char *event, robj *key, int dbid);
     *
     * 'event' is a C string representing the event name.
     * 'key' is a Redis object representing the key name.
     * 'dbid' is the database ID where the key lives.  */
    /* 公布通知方法,分为2类,keySpace的通知,keyEvent的通知 */ 
    void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
        sds chan;
        robj *chanobj, *eventobj;
        int len = -1;
        char buf[24];
    
        /* If notifications for this class of events are off, return ASAP. */
        if (!(server.notify_keyspace_events & type)) return;
    
        eventobj = createStringObject(event,strlen(event));
        
        //2种的通知形式,略有区别
        /* __keyspace@<db>__:<key> <event> notifications. */
        if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {
            chan = sdsnewlen("__keyspace@",11);
            len = ll2string(buf,sizeof(buf),dbid);
            chan = sdscatlen(chan, buf, len);
            chan = sdscatlen(chan, "__:", 3);
            chan = sdscatsds(chan, key->ptr);
            chanobj = createObject(REDIS_STRING, chan);
            //上述几步操作,组件格式字符串。最后公布消息。以下keyEvent的通知同理
            pubsubPublishMessage(chanobj, eventobj);
            decrRefCount(chanobj);
        }
    
        /* __keyevente@<db>__:<event> <key> notifications. */
        if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {
            chan = sdsnewlen("__keyevent@",11);
            if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
            chan = sdscatlen(chan, buf, len);
            chan = sdscatlen(chan, "__:", 3);
            chan = sdscatsds(chan, eventobj->ptr);
            chanobj = createObject(REDIS_STRING, chan);
            pubsubPublishMessage(chanobj, key);
            decrRefCount(chanobj);
        }
        decrRefCount(eventobj);
    }
    
    有keySpace和keyEvent的2种事件通知。

    详细怎么用。等后面碰到的时候在看看。

  • 相关阅读:
    ubuntu下怎么配置/查看串口-minicom工具
    jpg与pgm(P5)的互相转换(Python)
    hyper-v安装ubuntu18的全过程+踩过的坑(win10家庭版)
    zerotier的下载、安装、配置与使用(win10、ubuntu)
    github page+jekyll构建博客的解决方案
    opencv2.4.13.7的resize函数使用(c++)
    c++中的const和volatile知识自我总结
    各种优化算法详解
    P与NP问题
    vs2017配置pthread.h的方法
  • 原文地址:https://www.cnblogs.com/clnchanpin/p/7116767.html
Copyright © 2011-2022 走看看