zoukankan      html  css  js  c++  java
  • Redis订阅广播实现多级缓存

    Redis应用场景很多,现在介绍一下它的几大特性之一   发布订阅(pub/sub)

    特性介绍:

      什么是redis的发布订阅(pub/sub)?   Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。 

         同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,  Redis作为一个pub/sub的server, 在订阅者和发布者之间起到了消息路由的功能。

      上面的都是概念,不知道没关系,其实我也看不懂。

    简单来讲,这里面还有个channel的概念,这里就是频道的意思,比如你订阅了银行的频道,当你的资金发生变动时,银行就会通过它的频道给你发送信息,在这里,你是属于被动接收的,而不是向银行索要信息,这个例子中,你就是sub(订阅者),而银行就是pub(发布者)。

     

    代码:

    先引入依赖

    1. <dependency>  
    2.     <groupId>redis.clients</groupId>  
    3.     <artifactId>jedis</artifactId>  
    4.     <version>2.9.0</version>  
    5. </dependency>  

    新建一个发布者

    1. public class Publisher extends Thread{  
    2.     private final JedisPool jedisPool;  
    3.     
    4.     public Publisher(JedisPool jedisPool) {  
    5.         this.jedisPool = jedisPool;  
    6.     }  
    7.     
    8.     @Override  
    9.     public void run() {  
    10.         BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));  
    11.         Jedis jedis = jedisPool.getResource();   //连接池中取出一个连接  
    12.         while (true) {  
    13.             String line = null;  
    14.             try {  
    15.                 line = reader.readLine();  
    16.                 if (!"quit".equals(line)) {  
    17.                     jedis.publish("mychannel", line);   // mychannel 的频道上推送消息  
    18.                 } else {  
    19.                     break;  
    20.                 }  
    21.             } catch (IOException e) {  
    22.                 e.printStackTrace();  
    23.             }  
    24.         }  
    25.     }  
    26. }  

    新建一个订阅者

    1. public class Subscriber extends JedisPubSub {  
    2.     
    3.     public Subscriber(){}  
    4.     @Override  
    5.     public void onMessage(String channel, String message) {       //收到消息会调用  
    6.         System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));  
    7.     }  
    8.     @Override  
    9.     public void onSubscribe(String channel, int subscribedChannels) {    //订阅了频道会调用  
    10.         System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",  
    11.                 channel, subscribedChannels));  
    12.     }  
    13.     @Override  
    14.     public void onUnsubscribe(String channel, int subscribedChannels) {   //取消订阅 会调用  
    15.         System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",  
    16.                 channel, subscribedChannels));  
    17.     
    18.     }  
    19. }

    订阅频道

    1. public class SubThread extends Thread {  
    2.     private final JedisPool jedisPool;  
    3.     private final Subscriber subscriber = new Subscriber();  
    4.     
    5.     private final String channel = "mychannel";  
    6.     
    7.     public SubThread(JedisPool jedisPool) {  
    8.         super("SubThread");  
    9.         this.jedisPool = jedisPool;  
    10.     }  
    11.     
    12.     @Override  
    13.     public void run() {  
    14.         System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));  
    15.         Jedis jedis = null;  
    16.         try {  
    17.             jedis = jedisPool.getResource();   //取出一个连接  
    18.             jedis.subscribe(subscriber, channel);    //通过subscribe api去订阅,入参是订阅者和频道名  
    19.         } catch (Exception e) {  
    20.             System.out.println(String.format("subsrcibe channel error, %s", e));  
    21.         } finally {  
    22.             if (jedis != null) {  
    23.                 jedis.close();  
    24.             }  
    25.         }  
    26.     }  
    27. }  
    28.    

    测试下

    1. public class PubSubDemo {  
    2.     public static void main( String[] args )  
    3.     {  
    4.         // 连接redis服务端  
    5.         JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "172.31.12.18"7379);  
    6.     
    7.         System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d""127.0.0.1"7379));  
    8.     
    9.         SubThread subThread = new SubThread(jedisPool);  //订阅者  
    10.         subThread.start();  
    11.     
    12.         Publisher publisher = new Publisher(jedisPool);    //发布者  
    13.         publisher.start();  
    14.     }  
    15. }  

     

    打印信息

     

     

     

  • 相关阅读:
    第03组 Alpha冲刺(2/4)
    第03组 Alpha冲刺
    第09组 Beta版本演示
    第09组 Beta冲刺(4/4)
    第09组 Beta冲刺(3/4)
    第09组 Beta冲刺(2/4)
    第09组 Beta冲刺(1/4)
    第09组 Alpha事后诸葛亮
    第09组 Alpha冲刺(4/4)
    第09组 Alpha冲刺(3/4)
  • 原文地址:https://www.cnblogs.com/ytxiao/p/9915765.html
Copyright © 2011-2022 走看看