zoukankan      html  css  js  c++  java
  • Redis学习笔记四:独立功能之发布与订阅

    客户端可以通过执行 subscribe 命令订阅一个或多个频道,每当有其他客户端向被订阅的频道发送消息时,频道所有的订阅者都会收到这条消息。
    客户端还可以通过执行 psubscribe 命令订阅一个或多个模式,消息也会被发送给与频道相匹配模式的订阅者。

    频道的订阅与退订

    当一个客户端执行 subscribe 命令定于某个频道的时候,就与被订阅频道之间建立起了一种订阅关系。所有频道的订阅关系都保存在服务器状态的 pubsub_channels 字典中,字典的键是被订阅的频道,值是记录了所有订阅这个频道的客户端。

    struct redisServer {
     // ...
     dict *pubsub_channels;  /* Map channels to list of subscribed clients */
     // ...
    }
    

    订阅频道
    每当执行订阅命令时,服务器会将客户端与被订阅的频道在 pubsub_channels 字典中进行关联。频道中已经有其他订阅者,将客户端添加到订阅者链表的末尾;如果还没有订阅者,那么程序首先在 pubsub_channels 字典中为频道创建一个键,并为这个键的值设置为空链表,然后再将客户端添加到链表。

    退订频道
    订阅频道的逆向过程。从链表中删除客户端,如果链表为空了,则删除这个频道。

    模式的订阅与退订

    模式的订阅关系保存在 pubsub_patterns 链表属性里。

    struct redisServer {
     // ...
     dict *pubsub_channels;  /* Map channels to list of subscribed clients */
     // ...
    }
    

    每个节点都包含着一个 pubsubPattern 结构:

    typedef struct pubsubPattern {
        redisClient *client;
        robj *pattern;
    } pubsubPattern;
    

    client 属性记录了订阅模式的客户端,parttern 属性记录了被订阅的模式。

    订阅模式
    当客户端执行 psubscribe 命令订阅某个模式时,服务器会对每个订阅的模式新建一个 pubsubPattern 结构,将结构的 client 属性设置为订阅模式的客户端, pattern 设置为订阅的模式。然后将pubsubPattern 结构 添加到 pubsub_patterns 链表的表尾。

    退订模式
    订阅模式的反操作。在 pubsub_patterns 链表中查找并删除指定 pattern 和 client 的pubsubPattern 结构。

    发送消息

    当一个 Redis 客户端执行 publish channel message 命令发送给频道 channel 的时候,服务器会将消息 发送给 channel 频道的订阅者,然后发送给模式的订阅者。

    发送给频道订阅者
    在 pubsub_channels 字典里找到频道 channel 的订阅者名单,并将消息依次发送给所有客户端。

    发送给模式订阅者
    遍历整个 pubsub_patterns 链表,查找与 channel 频道想匹配的模式,并将消息发送给这些模式的客户端。

    查看订阅消息

    客户端可以通过 pubsub 这个命令查看频道或模式的相关信息,比如某个频道有多少订阅者,某个模式有多少订阅者。它有三个子命令:

    pubsub channels
    pubsub channels [pattern],用于返回服务器当前被订阅的频道。其中 pattern 是可选参数,不选默认为查询所有。

    127.0.0.1:6379> pubsub channels
    1) "liushijie"
    

    pubsub numsub
    pubsub numsub [channel-1 channel-2 ... channel-n],用于接收任意多个频道作为输入参数并返回这些频道订阅者的数量。必须要有一个频道,否则查出的是空值。

    127.0.0.1:6379> pubsub numsub liushijie
    1) "liushijie"
    2) (integer) 4
    

    pubsub numpat
    pubsub numpat 命令用户返回服务器当前被订阅模式的数量。

    127.0.0.1:6379> pubsub numpat
    (integer) 0
    

    java 实现

    // 发布者
    import redis.clients.jedis.Jedis;
    
    public class Publish {
        public static void main(String[] args) throws InterruptedException {
            Jedis jedis = new Jedis("127.0.0.1", 6379);
    
            while (true) {
                jedis.publish("liushijie", System.currentTimeMillis() + "");
                Thread.sleep(2000);
            }
    
        }
    }
    
    // 订阅者
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    
    public class SubScribe {
        public static void main(String[] args) {
    
            new Thread(new SubscribeRunnable()).start();
            new Thread(new SubscribeRunnable()).start();
            new Thread(new SubscribeRunnable()).start();
    
        }
    
        private static class SubscribeRunnable implements Runnable {
    
            @Override
            public void run() {
                Jedis receiver = new Jedis("127.0.0.1", 6379);
                receiver.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        System.out.println(channel + " got message " + message);
                    }
    
                    @Override
                    public void onPMessage(String pattern, String channel, String message) {
    
                    }
    
                    @Override
                    public void onSubscribe(String channel, int subscribedChannels) {
    
                    }
    
                    @Override
                    public void onUnsubscribe(String channel, int subscribedChannels) {
    
                    }
    
                    @Override
                    public void onPUnsubscribe(String pattern, int subscribedChannels) {
    
                    }
    
                    @Override
                    public void onPSubscribe(String pattern, int subscribedChannels) {
    
                    }
                }, "liushijie");
            }
        }
    
    }
    
  • 相关阅读:
    JavaScript原型链详解
    Js作用域与闭包
    tjs 在嵌套函数中this关键字引用head对象
    NodeJS stream 一:Buffer
    NodeJS Stream 二:什么是 Stream
    NodeJS Stream 三:readable
    NodeJS Stream 四:Writable
    VSS又一次出错了,神出鬼没的
    VS2005的关于母版页嵌套的一个小技巧
    【转】SQL Server数据库开发的二十一条军规
  • 原文地址:https://www.cnblogs.com/liushijie/p/5094047.html
Copyright © 2011-2022 走看看