zoukankan      html  css  js  c++  java
  • Redis——发布和订阅

    发布与订阅(又称pub/sub),订阅者(listener)负责订阅频道(channel),发送者(publisher)负责向频道发送二进制字符串消息(binary string message).每当有消息被发送给指定频道的时候,频道都所有订阅者都会收到消息。

    Redis提供都5个发布订阅命令:

    命令描述
    Redis Psubscribe 命令 订阅一个或多个符合给定模式的频道。
    Redis Pubsub 命令 查看订阅与发布系统状态。
    Redis Publish 命令 将信息发送到指定的频道。
    Redis Punsubscribe 命令 退订所有给定模式的频道。
    Redis Subscribe 命令 订阅给定的一个或多个频道的信息。
    Redis Unsubscribe 命令 指退订给定的频道。

    使用实例:

      首先需要一个订阅者(listener)这里建立一个名为Subscriber的类:

    public class Subscriber extends JedisPubSub {
    
        public void onMessage(String channel, String message) {
            System.out.println("onMessage channel = " + channel+ "message =" + message);
        }
    
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                    channel, subscribedChannels));
        }
    
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                    channel, subscribedChannels));
    
        }
    }

    这个类继承自JedisPubSub,其中onMessage负责接收订阅频道消息后,业务处理逻辑,onSubscribe负责初始化订阅时候的处理,onUnsubscribe取消订阅时候的处理。

    然后在定义一个类起一个线程来进行subscribe操作,因为我们需要订阅者一直在线,当发布者一发送消息到相应的频道时,能做出反应

    public class SubThread extends Thread {
        JedisPool pool;
        private final Subscriber subscriber = new Subscriber();
    
        private final String channel = "xx";
    
        public SubThread( JedisPool pool) {
            super("SubThread");
            this.pool = pool;
        }
    
        @Override
        public void run() {
            System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
            Jedis jedis = pool.getResource();
                try {
                    jedis.subscribe(subscriber, channel);
                } catch (Exception e) {
                    System.out.println(String.format("subsrcibe channel error, %s", e));
                } finally {
                    if (jedis != null) {
                        jedis.close();
                    }
                }
        }
    }

    再然后就是发布者:

    public class Publisher {
    
        JedisPool pool;
    
        public Publisher( JedisPool pool) {
            this.pool = pool;
        }
    
        public void start() {
            Jedis jedis = pool.getResource();
            while(true) {
                jedis.publish("xx", "233");
                try{
                    Thread.sleep(5000);
                }catch (Exception e){
                    e.printStackTrace();
                }
    
            }
        }
    }

    再然后就是主函数的调用:

    public class test1 {
        public static void main(String[] args) throws Exception{
            //连接本地的 Redis 服务
            Jedis jedis = new Jedis("localhost");
            System.out.println("连接成功");
            //查看服务是否运行
            System.out.println("服务正在运行: "+jedis.ping());
    
    
            JedisPool pool = new JedisPool("localhost", 6379);
    
            SubThread subThread = new SubThread(pool);
            subThread.start();
    
            Publisher publisher = new Publisher(pool);
            publisher.start();
    
        }

    因为Jedis不是线程安全的,JedisPool是线程安全的,所以这里使用JedisPool。

    输出:

    连接成功
    服务正在运行: PONG
    subscribe redis, channel xx, thread will be blocked
    subscribe redis channel success, channel xx, subscribedChannels 1
    onMessage channel = xxmessage =233
    onMessage channel = xxmessage =233
  • 相关阅读:
    Apache Hadoop 英文官方参考文档及中文文档
    谷歌大数据那三篇论文-中文版
    Java学习笔记(一):基础概念和语法
    Java基础概念、知识点整理
    TensorFlow基础知识
    Kafka集群环境配置
    Sqoop数据迁移工具的使用
    HBase的安装和使用
    Flume日志采集框架的使用
    zookeeper的安装和使用
  • 原文地址:https://www.cnblogs.com/xxbbtt/p/7864953.html
Copyright © 2011-2022 走看看