第一步:创建一个发布者
package work; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; /** * 发布 * author:songyan * date: 2019/10/17 **/ public class Publisher extends Thread { private final JedisPool jedisPool; private String chanelName; public Publisher(JedisPool jedisPool, String chanelName) { this.jedisPool = jedisPool; this.chanelName = chanelName; System.out.println("【发布者""+chanelName+""初始化成功】"); System.out.println("请输入要发布的消息:"); } @Override public void run() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); Jedis jedis = jedisPool.getResource(); while (true) { String line = null; try { line = reader.readLine(); if (!"quit".equals(line)) { System.out.println(chanelName+"发布消息成功"); jedis.publish(chanelName, line); } else { break; } } catch (IOException e) { e.printStackTrace(); } } } }
第二步:创建一个订阅者的监听器
package work; import redis.clients.jedis.JedisPubSub; /** * 监听 * author:songyan * date: 2019/10/17 **/ public class SubscriberListener extends JedisPubSub { private String subName; public SubscriberListener(String subName) { this.subName = subName; } // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { System.out.println(String.format("【"+subName + "接收到消息】频道:%s;消息:%s。" , channel , message)); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { System.out.println(String.format( "【"+subName + "订阅频道成功】频道:%s;频道数:%d。" , channel , subscribedChannels)); } // 取消订阅时候的处理 public void onUnsubscribe(String channelName, int subscribedChannels) { System.out.println(String.format( "【"+subName + "取消订阅】频道:%s;频道数:%d。",channelName , subscribedChannels)); } }
第三步:创建一个订阅者
package work; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * 订阅 * author:songyan * date: 2019/10/17 **/ public class Subscriber extends Thread { //jedis连接池 private final JedisPool jedisPool; private final SubscriberListener subscriberListener; private String channelName; public Subscriber(JedisPool jedisPool, SubscriberListener subscriberListener, String channelName) { super(); this.jedisPool = jedisPool; this.subscriberListener = subscriberListener; this.channelName = channelName; } @Override public void run() { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.subscribe(subscriberListener,channelName);// 通过jedis.subscribe()方法去订阅,入参是1.订阅者、2.频道名称 } catch (Exception e) { e.printStackTrace(); System.out.println(String.format("频道订阅失败:%s",e)); } finally { if (null != jedis) jedis.close(); } } }
第四步:测试(编写客户端)
(1)发布者客户端
package work.test; import redis.clients.jedis.JedisPool; import work.Publisher; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 发布测试 * author:songyan * date: 2019/10/17 **/ public class PublishClient { public static void main(String[] args) throws InterruptedException { //创建连接池 JedisPool jedisPool = new JedisPool("192.168.159.133"); //创建线程池,并设定线程数量 ExecutorService executorService = Executors.newFixedThreadPool(5); //创建一个发布者 Publisher publisher = new Publisher(jedisPool,"发布者1"); executorService.submit(publisher); executorService.shutdown(); executorService.awaitTermination(600, TimeUnit.SECONDS); } }
执行main方法,创建一个发布者。
(2)订阅者客户端
package work.test; import redis.clients.jedis.JedisPool; import work.Subscriber; import work.SubscriberListener; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 订阅者客户端 * author:songyan * date: 2019/10/17 **/ public class SubscriberClient { public static void main(String[] args) throws InterruptedException { //创建redis连接池 JedisPool jedisPool = new JedisPool("192.168.159.133"); //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); //创建订阅者 final SubscriberListener subscriberListener = new SubscriberListener("订阅者一号"); //订阅频道 Subscriber subscriber = new Subscriber(jedisPool, subscriberListener, "发布者1"); executorService.submit(subscriber); executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); //30s后取消订阅 Thread.sleep(3000); subscriberListener.onUnsubscribe("发布者1", 0); } }
执行main方法,创建一个订阅者(订阅上面发布者的频道)。
发布者发布信息:
订阅者接收到订阅信息:
订阅者取消订阅: