客户端可以通过执行 subscribe 命令订阅一个或多个频道,每当有其他客户端向被订阅的频道发送消息时,频道所有的订阅者都会收到这条消息。
客户端还可以通过执行 psubscribe 命令订阅一个或多个模式,消息也会被发送给与频道相匹配模式的订阅者。
频道的订阅与退订
当一个客户端执行 subscribe 命令定于某个频道的时候,就与被订阅频道之间建立起了一种订阅关系。所有频道的订阅关系都保存在服务器状态的 pubsub_channels 字典中,字典的键是被订阅的频道,值是记录了所有订阅这个频道的客户端。
struct redisServer {
// ...
dict *pubsub_channels; /* Map channels to list of subscribed clients */
// ...
}
订阅频道
每当执行订阅命令时,服务器会将客户端与被订阅的频道在 pubsub_channels 字典中进行关联。频道中已经有其他订阅者,将客户端添加到订阅者链表的末尾;如果还没有订阅者,那么程序首先在 pubsub_channels 字典中为频道创建一个键,并为这个键的值设置为空链表,然后再将客户端添加到链表。
退订频道
订阅频道的逆向过程。从链表中删除客户端,如果链表为空了,则删除这个频道。
模式的订阅与退订
模式的订阅关系保存在 pubsub_patterns 链表属性里。
struct redisServer {
// ...
dict *pubsub_channels; /* Map channels to list of subscribed clients */
// ...
}
每个节点都包含着一个 pubsubPattern 结构:
typedef struct pubsubPattern {
redisClient *client;
robj *pattern;
} pubsubPattern;
client 属性记录了订阅模式的客户端,parttern 属性记录了被订阅的模式。
订阅模式
当客户端执行 psubscribe 命令订阅某个模式时,服务器会对每个订阅的模式新建一个 pubsubPattern 结构,将结构的 client 属性设置为订阅模式的客户端, pattern 设置为订阅的模式。然后将pubsubPattern 结构 添加到 pubsub_patterns 链表的表尾。
退订模式
订阅模式的反操作。在 pubsub_patterns 链表中查找并删除指定 pattern 和 client 的pubsubPattern 结构。
发送消息
当一个 Redis 客户端执行 publish channel message 命令发送给频道 channel 的时候,服务器会将消息 发送给 channel 频道的订阅者,然后发送给模式的订阅者。
发送给频道订阅者
在 pubsub_channels 字典里找到频道 channel 的订阅者名单,并将消息依次发送给所有客户端。
发送给模式订阅者
遍历整个 pubsub_patterns 链表,查找与 channel 频道想匹配的模式,并将消息发送给这些模式的客户端。
查看订阅消息
客户端可以通过 pubsub 这个命令查看频道或模式的相关信息,比如某个频道有多少订阅者,某个模式有多少订阅者。它有三个子命令:
pubsub channels
pubsub channels [pattern],用于返回服务器当前被订阅的频道。其中 pattern 是可选参数,不选默认为查询所有。
127.0.0.1:6379> pubsub channels
1) "liushijie"
pubsub numsub
pubsub numsub [channel-1 channel-2 ... channel-n],用于接收任意多个频道作为输入参数并返回这些频道订阅者的数量。必须要有一个频道,否则查出的是空值。
127.0.0.1:6379> pubsub numsub liushijie
1) "liushijie"
2) (integer) 4
pubsub numpat
pubsub numpat 命令用户返回服务器当前被订阅模式的数量。
127.0.0.1:6379> pubsub numpat
(integer) 0
java 实现
// 发布者
import redis.clients.jedis.Jedis;
public class Publish {
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
while (true) {
jedis.publish("liushijie", System.currentTimeMillis() + "");
Thread.sleep(2000);
}
}
}
// 订阅者
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class SubScribe {
public static void main(String[] args) {
new Thread(new SubscribeRunnable()).start();
new Thread(new SubscribeRunnable()).start();
new Thread(new SubscribeRunnable()).start();
}
private static class SubscribeRunnable implements Runnable {
@Override
public void run() {
Jedis receiver = new Jedis("127.0.0.1", 6379);
receiver.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println(channel + " got message " + message);
}
@Override
public void onPMessage(String pattern, String channel, String message) {
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
}
}, "liushijie");
}
}
}