zoukankan      html  css  js  c++  java
  • redis的发布订阅模式

    概要
    redis的每个server实例都维护着一个保存服务器状态的redisServer结构
    struct redisServer
    {
        /* Pubsub */
        // 字典,键为频道,值为链表
        // 链表中保存了所有订阅某个频道的客户端
        // 新客户端总是被添加到链表的表尾
        dict *pubsub_channels;  /* Map channels to list of subscribed clients */
        // 这个链表记录了客户端订阅的所有模式的名字
        list *pubsub_patterns;  /* A list of pubsub_patterns */
    };
    pubsub_channels记录了所有客户端订阅的频道的信息。数据类型是dict,dict中的key代表了channel,而val存储了关注频道的所有clients。
    redis的发布订阅模式

     

    订阅者通过sub命令订阅频道,server使用pub命令把消息推送到符合条件的pubsub_channels中。
    内部数据结构
    pubsub_channels是一个字典结构,字典内部使用hash表存储和索引数据。
    实现字典的可选数据结构
        hash表:简单但是不稳定的基于数组的冲突解决法;简单且平均效率稳定的链式地址的冲突解决法;
        字典树:使用树结构。
    redis采用hash的链式地址法作为实现,好处是直观、简单、可期待平均时间复杂度较小且平稳。
    typedef struct dict {
        // 类型特定函数
        dictType *type;
        // 私有数据
        void *privdata;
        // 哈希表
        dictht ht[2];
        // rehash 索引
        // 当 rehash 不在进行时,值为 -1
        int rehashidx; /* rehashing not in progress if rehashidx == -1 */
        // 目前正在运行的安全迭代器的数量
        int iterators; /* number of iterators currently running */
    } dict;
    dict使用两个hash表进行索引,当进行rehash的时候使用ht[1]的hash表,其他时候都使用hd[0]的hash表。
    使用的hash表的定义如下
    typedef struct dictht {
        // 哈希表数组
        dictEntry **table;
        // 哈希表大小
        unsigned long size;
        // 哈希表大小掩码,用于计算索引值
        // 总是等于 size - 1
        unsigned long sizemask;
        // 该哈希表已有节点的数量
        unsigned long used;
    } dictht;
    table是一个存储dictEntry*指针的数组,table中的每一项都是一个指向DictEntry结构的指针,同时每一项也都带有一个指向下一项的指针。可以看出这是一个使用链地址法解决冲突的hash结构。
    dictEntry的定义,包含key、value、next。
    /*
     * 哈希表节点
     */
    typedef struct dictEntry {
        // 键
        void *key;
        // 值
        union {
            void *val;
            uint64_t u64;
            int64_t s64;
        } v;
        // 指向下个哈希表节点,形成链表
        struct dictEntry *next;
    } dictEntry;
    客户端订阅频道
    就是字典中插入新元素的过程
            // 关联示意图
            // {
            //  频道名        订阅频道的客户端
            //  'channel-a' : [c1, c2, c3],
            //  'channel-b' : [c5, c2, c1],
            //  'channel-c' : [c10, c2, c1]
            // }
            /* Add the client to the channel -> list of clients hash table */
            // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
            // 如果 channel 不存在于字典,那么添加进去
            de = dictFind(server.pubsub_channels,channel);
            if (de == NULL) {
                clients = listCreate();
                dictAdd(server.pubsub_channels,channel,clients);
                incrRefCount(channel);
            } else {
                clients = dictGetVal(de);
            }
            // before:
            // 'channel' : [c1, c2]
            // after:
            // 'channel' : [c1, c2, c3]
            // 将客户端添加到链表的末尾
            listAddNodeTail(clients,c);
    1.查询server是否包含指定频道
    2.如果频道存在就获取频道指向的list地址;如果频道不存在,就创建频道和list
    3.使用步骤2的list,将客户端添加到list的末尾。
    完成这三步以后,server维护的发布订阅频道就新增了一个频道和关注的客户端,server发布时检测发布订阅字典,获取订阅客户端并依次发送。
    server发布消息到channel
    /* Send to clients listening for that channel */
        // 取出包含所有订阅频道 channel 的客户端的链表
        // 并将消息发送给它们
        de = dictFind(server.pubsub_channels,channel);
        if (de) {
            list *list = dictGetVal(de);
            listNode *ln;
            listIter li;
            // 遍历客户端链表,将 message 发送给它们
            listRewind(list,&li);
            while ((ln = listNext(&li)) != NULL) {
                redisClient *c = ln->value;
                // 回复客户端。
                // 示例:
                // 1) "message"
                // 2) "xxx"
                // 3) "hello"
                addReply(c,shared.mbulkhdr[3]);
                // "message" 字符串
                addReply(c,shared.messagebulk);
                // 消息的来源频道
                addReplyBulk(c,channel);
                // 消息内容
                addReplyBulk(c,message);
                // 接收客户端计数
                receivers++;
            }
        }
    结合订阅者订阅的过程,发布过程就是一个查找订阅者并轮询发送消息给订阅者的过程。
  • 相关阅读:
    electron之打包成安装程序
    electron之环境安装、启动程序
    微信支付.net官方坑太多,我们来精简
    微信支付官方.net版之坑你没商量
    程序员出路在何方
    简单介绍
    mac中显示隐藏文件
    sublime Text 3 安装emmet
    Andriod学习笔记5:通过NDK在C++中实现日志输出
    Andriod学习笔记4:mac下搭建 Eclipse+CDT 集成开发环境
  • 原文地址:https://www.cnblogs.com/learn-my-life/p/5637742.html
Copyright © 2011-2022 走看看