zoukankan      html  css  js  c++  java
  • 最通俗易懂的Redis发布订阅及代码实战

    发布订阅简介

    除了使用List实现简单的消息队列功能以外,Redis还提供了发布订阅的消息机制。在这种机制下,消息发布者向指定频道(channel)发布消息,消息订阅者可以收到指定频道的消息,同一个频道可以有多个消息订阅者,如下图:

    在这里插入图片描述

    Redis也提供了一些命令支持这个机制,接下来我们详细介绍一下这些命令。

    欢迎关注微信公众号:万猫学社,每周一分享Java技术干货。

    发布订阅相关命令

    在Redis中,发布订阅相关命令有:

    1. 发布消息
    2. 订阅频道
    3. 取消订阅
    4. 按照模式订阅
    5. 按照模式取消订阅
    6. 查询订阅信息

    发布消息

    发布消息的命令是publish,语法是:

    publish 频道名称 消息
    

    比如,要向channel:one-more-study:demo频道发布一条消息“I am One More Study.”,命令如下:

    > publish channel:one-more-study:demo "I am One More Study."
    (integer) 0
    

    返回的结果是订阅者的个数,上例中没有订阅者,所以返回结果为0。

    订阅消息

    订阅消息的命令是subscribe,订阅者可以订阅一个或者多个频道,语法是:

    subscribe 频道名称 [频道名称 ...]
    

    比如,订阅一个channel:one-more-study:demo频道,命令如下:

    > subscribe channel:one-more-study:demo
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "channel:one-more-study:demo"
    3) (integer) 1
    

    返回结果中有3条,分别表示:返回值的类型(订阅成功)、订阅的频道名称、目前已订阅的频道数量。当订阅者接受到消息时,就会显示:

    1) "message"
    2) "channel:one-more-study:demo"
    3) "I am One More Study."
    

    同样也是3条结果,分别表示:返回值的类型(信息)、消息来源的频道名称、消息内容。

    新开启的订阅者,是无法收到该频道之前的历史消息的,因为Redis没有对发布的消息做持久化。

    取消订阅

    取消订阅的命令是unsubscribe,可以取消一个或者多个频道的订阅,语法是:

    unsubscribe [频道名称 [频道名称 ...]]
    

    比如,取消订阅channel:one-more-study:demo频道,命令如下:

    > unsubscribe channel:one-more-study:demo
    1) "unsubscribe"
    2) "channel:one-more-study:demo"
    3) (integer) 0
    

    返回结果中有3条,分别表示:返回值的类型(取消订阅成功)、取消订阅的频道名称、目前已订阅的频道数量。

    欢迎关注微信公众号:万猫学社,每周一分享Java技术干货。

    按模式订阅消息

    按模式订阅消息的命令是psubscribe,订阅一个或多个符合给定模式的频道,语法是:

    psubscribe 模式 [模式 ...]
    

    每个模式以 * 作为匹配符,比如 channel* 匹配所有以 channel 开头的频道,命令如下:

    > psubscribe channel:*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "channel*"
    3) (integer) 1
    

    返回结果中有3条,分别表示:返回值的类型(按模式订阅成功)、订阅的模式、目前已订阅的模式数量。当订阅者接受到消息时,就会显示:

    1) "pmessage"
    2) "channel*"
    3) "channel:one-more-study:demo"
    4) "I am One More Study."
    

    返回结果中有4条,分别表示:返回值的类型(信息)、消息匹配的模式、消息来源的频道名称、消息内容。

    按模式取消订阅

    按模式取消订阅的命令是punsubscribe,可以取消一个或者多个模式的订阅,语法是:

    punsubscribe [模式 [模式 ...]]
    

    每个模式以 * 作为匹配符,比如 channel:* 匹配所有以 channel 开头的频道,命令如下:

    1> punsubscribe channel:*
    1) "punsubscribe"
    2) "channel:*"
    3) (integer) 0
    

    返回结果中有3条,分别表示:返回值的类型(按模式取消订阅成功)、取消订阅的模式、目前已订阅的模式数量。

    欢迎关注微信公众号:万猫学社,每周一分享Java技术干货。

    查询订阅信息

    查看活跃频道

    活跃频道指的是至少有一个订阅者的频道,语法是:

    pubsub channels [模式]
    

    比如:

    > pubsub channels
    1) "channel:one-more-study:test"
    2) "channel:one-more-study:demo"
    3) "channel:demo"
    > pubsub channels *demo
    1) "channel:one-more-study:demo"
    2) "channel:demo"
    > pubsub channels *one-more-study*
    1) "channel:one-more-study:test"
    2) "channel:one-more-study:demo"
    
    查看频道订阅数
    pubsub numsub [频道名称 ...]
    

    比如:

    > pubsub numsub channel:one-more-study:demo
    1) "channel:one-more-study:demo"
    2) (integer) 1
    
    查看模式订阅数
    > pubsub numpat
    (integer) 1
    

    代码实战

    光说不练假把式,我们使用Java语言写一个简单的发布订阅示例。

    Jedis集群示例

    Jedis是Redis官方推荐的Java连接开发工具,我们使用Jedis写一个简单的集群示例。

    package onemore.study;
    
    import redis.clients.jedis.HostAndPort;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisPoolConfig;
    
    import java.util.HashSet;
    import java.util.Set;
    
    /**
     * Jedis集群
     *
     * @author 万猫学社
     */
    public enum Cluster {
        INSTANCE;
    
        //为了简单,把IP和端口直接写在这里,实际开发中写在配置文件会更好。
        private final String hostAndPorts = "192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379";
        private JedisCluster jedisCluster;
    
        Cluster() {
            JedisPoolConfig poolConfig = new JedisPoolConfig();
    
            //最大连接数
            poolConfig.setMaxTotal(20);
            //最大空闲数
            poolConfig.setMaxIdle(10);
            //最小空闲数
            poolConfig.setMinIdle(2);
    
            //从jedis连接池获取连接时,校验并返回可用的连接
            poolConfig.setTestOnBorrow(true);
            //把连接放回jedis连接池时,校验并返回可用的连接
            poolConfig.setTestOnReturn(true);
    
            Set<HostAndPort> nodes = new HashSet<>();
            String[] hosts = hostAndPorts.split(";");
            for (String hostport : hosts) {
                String[] ipport = hostport.split(":");
                String ip = ipport[0];
                int port = Integer.parseInt(ipport[1]);
                nodes.add(new HostAndPort(ip, port));
            }
            jedisCluster = new JedisCluster(nodes, 1000, poolConfig);
        }
    
        public JedisCluster getJedisCluster() {
            return jedisCluster;
        }
    }
    

    欢迎关注微信公众号:万猫学社,每周一分享Java技术干货。

    发布者示例

    package onemore.study;
    
    import redis.clients.jedis.JedisCluster;
    
    /**
     * 发布者
     *
     * @author 万猫学社
     */
    public class Publisher implements Runnable {
        private final String CHANNEL_NAME = "channel:one-more-study:demo";
        private final String QUIT_COMMAND = "quit";
    
        @Override
        public void run() {
            JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
            for (int i = 1; i <= 3; i++) {
                String message = "第" + i + "消息";
                System.out.println(Thread.currentThread().getName() + " 发布:" + message);
                jedisCluster.publish(CHANNEL_NAME, message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("------------------");
            }
            jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);
        }
    }
    

    订阅者示例

    package onemore.study;
    
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * 订阅者
     *
     * @author 万猫学社
     */
    public class Subscriber implements Runnable {
        private final String CHANNEL_NAME = "channel:one-more-study:demo";
        private final String QUIT_COMMAND = "quit";
    
        private final JedisPubSub jedisPubSub = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                System.out.println(Thread.currentThread().getName() + " 接收:" + message);
                if (QUIT_COMMAND.equals(message)) {
                    unsubscribe(CHANNEL_NAME);
                }
            }
        };
    
        @Override
        public void run() {
            JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
            jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);
        }
    }
    

    欢迎关注微信公众号:万猫学社,每周一分享Java技术干货。

    综合示例

    package onemore.study;
    
    public class App {
        public static void main(String[] args) throws InterruptedException {
            //创建3个订阅者
            new Thread(new Subscriber()).start();
            new Thread(new Subscriber()).start();
            new Thread(new Subscriber()).start();
            Thread.sleep(1000);
    
            //创建发布者
            new Thread(new Publisher()).start();
        }
    }
    

    运行结果如下:

    Thread-6 发布:第1消息
    Thread-0 接收:第1消息
    Thread-1 接收:第1消息
    Thread-2 接收:第1消息
    ------------------
    Thread-6 发布:第2消息
    Thread-0 接收:第2消息
    Thread-1 接收:第2消息
    Thread-2 接收:第2消息
    ------------------
    Thread-6 发布:第3消息
    Thread-0 接收:第3消息
    Thread-2 接收:第3消息
    Thread-1 接收:第3消息
    ------------------
    Thread-0 接收:quit
    Thread-1 接收:quit
    Thread-2 接收:quit
    

    微信公众号:万猫学社

    微信扫描二维码

    获得更多Java技术干货

  • 相关阅读:
    【Windows】Windows服务管家婆之Service Control Manager
    【Python】python文件名和文件路径操作
    【Python】python 调用c语言函数
    IIS 之 HTTP Error 404.2 – Not Found(ISAPI 和 CGI 限制)
    IIS 之 HTTP错误 404.17
    IIS 之 HTTP 错误 404.3
    IIS 之 HTTP 错误 403.14
    IIS 之 打开/关闭 Internet 信息服务
    Wcf 之 配置文件解析
    Web Service 之 开发、部署
  • 原文地址:https://www.cnblogs.com/heihaozi/p/12779069.html
Copyright © 2011-2022 走看看