zoukankan      html  css  js  c++  java
  • redis 订阅发布

      第一次接触订阅&&发布模型的时候是在openvswitch里面,其使用ovsdb-nosql数据库处理盒子产品的数据库控制平面;

    目前看redis 的时候又看到了订阅&&发布,所以来看看源码以及其使用的数据结构!

    Redis中是如何实现此中模式的:???

    • 1.在RedisClient 内部维护了一个pubsub_channels的Channel列表,记录了此客户端所订阅的频道
    • 2.在Server服务端,同样维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每一个Channel对应着一批订阅了此频道的Client,也就是Channel-->list of Clients
    • 3.当一个Client publish一个message的时候,会先去服务端的pubsub_channels找相应的Channel,遍历里面的Client,然后发送通知,即完成了整个发布订阅模式。
    /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
     * 0 if the client was already subscribed to that channel. 
     *
     * 设置客户端 c 订阅频道 channel 。
     *
     * 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。
     */
    int pubsubSubscribeChannel(redisClient *c, robj *channel) {
        dictEntry *de;
        list *clients = NULL;
        int retval = 0;
    
        /* Add the channel to the client -> channels hash table */
        // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
        if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
            retval = 1;
            incrRefCount(channel);
    
            // 关联示意图
            // {
            //  频道名        订阅频道的客户端
            //  'channel-a' : [c1, c2, c3],
            //  'channel-b' : [c5, c2, c1],
            //  'channel-c' : [c10, c2, c1]
            // }
            /* Add the client to the channel -> list of clients hash table */
            // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
            // 如果 channel 不存在于字典,那么添加进去
            de = dictFind(server.pubsub_channels,channel);
            if (de == NULL) {
                clients = listCreate();
                dictAdd(server.pubsub_channels,channel,clients);
                incrRefCount(channel);
            } else {
                clients = dictGetVal(de);
            }
    
            // before:
            // 'channel' : [c1, c2]
            // after:
            // 'channel' : [c1, c2, c3]
            // 将客户端添加到链表的末尾
            listAddNodeTail(clients,c);
        }
    
        /* Notify the client */
        // 回复客户端。
        // 示例:
        // redis 127.0.0.1:6379> SUBSCRIBE xxx
        // Reading messages... (press Ctrl-C to quit)
        // 1) "subscribe"
        // 2) "xxx"
        // 3) (integer) 1
        addReply(c,shared.mbulkhdr[3]);
        // "subscribe
    " 字符串
        addReply(c,shared.subscribebulk);
        // 被订阅的客户端
        addReplyBulk(c,channel);
        // 客户端订阅的频道和模式总数
        addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
        return retval;
    }
    /* -----------------------------------------------------------------------------
     * Higher level functions to queue data on the client output buffer.
     * The following functions are the ones that commands implementations will call.
     * -------------------------------------------------------------------------- */
    //sendReplyToClient为实际的数据write的地方,并且会现在一次性最多发送多少,避免阻塞
    void addReply(redisClient *c, robj *obj) {
    
        // 为客户端安装写处理器到事件循环
        if (prepareClientToWrite(c) != REDIS_OK) return;
    
        /* This is an important place where we can avoid copy-on-write
         * when there is a saving child running, avoiding touching the
         * refcount field of the object if it's not needed.
         *
         * 如果在使用子进程,那么尽可能地避免修改对象的 refcount 域。
         *
         * If the encoding is RAW and there is room in the static buffer
         * we'll be able to send the object to the client without
         * messing with its page. 
         *
         * 如果对象的编码为 RAW ,并且静态缓冲区中有空间
         * 那么就可以在不弄乱内存页的情况下,将对象发送给客户端。
         */
        if (sdsEncodedObject(obj)) {
            // 首先尝试复制内容到 c->buf 中,这样可以避免内存分配
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
                // 如果 c->buf 中的空间不够,就复制到 c->reply 链表中
                // 可能会引起内存分配
                _addReplyObjectToList(c,obj);
        } else if (obj->encoding == REDIS_ENCODING_INT) {
            /* Optimization: if there is room in the static buffer for 32 bytes
             * (more than the max chars a 64 bit integer can take as string) we
             * avoid decoding the object and go for the lower level approach. */
            // 优化,如果 c->buf 中有等于或多于 32 个字节的空间
            // 那么将整数直接以字符串的形式复制到 c->buf 中
            if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
                char buf[32];
                int len;
    
                len = ll2string(buf,sizeof(buf),(long)obj->ptr);
                if (_addReplyToBuffer(c,buf,len) == REDIS_OK)
                    return;
                /* else... continue with the normal code path, but should never
                 * happen actually since we verified there is room. */
            }
            // 执行到这里,代表对象是整数,并且长度大于 32 位
            // 将它转换为字符串
            obj = getDecodedObject(obj);
            // 保存到缓存中
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
                _addReplyObjectToList(c,obj);
            decrRefCount(obj);
        } else {
            redisPanic("Wrong obj->encoding in addReply()");
        }
    }
    /* Add a Redis Object as a bulk reply 
     *
     * 返回一个 Redis 对象作为回复
     */
    void addReplyBulk(redisClient *c, robj *obj) {
        addReplyBulkLen(c,obj);
        addReply(c,obj);
        addReply(c,shared.crlf);
    }

    如上就是单个channel的订阅方式了,总结如下:

        1. 客户端自行管理需要订阅的channel, 放到 c->pubsub_channels 中;
        2. redis使用的一个统一的 server->pubsub_channels dict容器进行管理所有的channel;
        3. 对于多个客户端订阅一个channel, redis 使用list进行管理追加;

    http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子
  • 相关阅读:
    PAT 1142 Maximal Clique
    PAT 1076 Forwards on Weibo
    PAT 1021 Deepest Root
    PAT 1030 Travel Plan*
    diji模板
    PAT 1020 Tree Traversals
    PAT 1108 Finding Average
    PAT 1104 Sum of Number Segments
    PAT 1100 Mars Numbers
    PAT 1096 Consecutive Factors
  • 原文地址:https://www.cnblogs.com/codestack/p/15469296.html
Copyright © 2011-2022 走看看