zoukankan      html  css  js  c++  java
  • Jedis实现频道的订阅,取消订阅

     第一步:创建一个发布者

    package work;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    
    /**
     * 发布
     * author:songyan
     * date: 2019/10/17
     **/
    public class Publisher  extends Thread  {
        private final JedisPool jedisPool;
        private String chanelName;
    
        public Publisher(JedisPool jedisPool, String chanelName) {
            this.jedisPool = jedisPool;
            this.chanelName = chanelName;
            System.out.println("【发布者""+chanelName+""初始化成功】");
            System.out.println("请输入要发布的消息:");
        }
    
        @Override
        public void run() {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            Jedis jedis = jedisPool.getResource();
            while (true) {
                String line = null;
                try {
                    line = reader.readLine();
                    if (!"quit".equals(line)) {
                        System.out.println(chanelName+"发布消息成功");
                        jedis.publish(chanelName, line);
                    } else {
                        break;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
            }
        }
        }

    第二步:创建一个订阅者的监听器

    package work;
    
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * 监听
     * author:songyan
     * date: 2019/10/17
     **/
    public class SubscriberListener  extends JedisPubSub {
        private String subName;
        public SubscriberListener(String subName) {
            this.subName = subName;
        }
    
        // 取得订阅的消息后的处理
        public void onMessage(String channel, String message) {
            System.out.println(String.format("【"+subName + "接收到消息】频道:%s;消息:%s。" , channel , message));
        }
    
        // 初始化订阅时候的处理
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format( "【"+subName + "订阅频道成功】频道:%s;频道数:%d。" , channel  , subscribedChannels));
        }
    
        // 取消订阅时候的处理
        public void onUnsubscribe(String channelName, int subscribedChannels) {
            System.out.println(String.format( "【"+subName + "取消订阅】频道:%s;频道数:%d。",channelName , subscribedChannels));
        }
    }

    第三步:创建一个订阅者

    package work;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    /**
     * 订阅
     * author:songyan
     * date: 2019/10/17
     **/
    public class Subscriber extends Thread {
        //jedis连接池
        private final JedisPool jedisPool;
    
        private final SubscriberListener subscriberListener;
    
        private String channelName;
    
        public Subscriber(JedisPool jedisPool, SubscriberListener subscriberListener, String channelName) {
            super();
            this.jedisPool = jedisPool;
            this.subscriberListener = subscriberListener;
            this.channelName = channelName;
        }
    
        @Override
        public void run() {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.subscribe(subscriberListener,channelName);// 通过jedis.subscribe()方法去订阅,入参是1.订阅者、2.频道名称
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(String.format("频道订阅失败:%s",e));
            } finally {
                if (null != jedis)
                    jedis.close();
            }
        }
    }

    第四步:测试(编写客户端)

    (1)发布者客户端

    package work.test;
    
    import redis.clients.jedis.JedisPool;
    import work.Publisher;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 发布测试
     * author:songyan
     * date: 2019/10/17
     **/
    public class PublishClient {
        public static void main(String[] args) throws InterruptedException {
            //创建连接池
            JedisPool jedisPool = new JedisPool("192.168.159.133");
            //创建线程池,并设定线程数量
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            //创建一个发布者
            Publisher publisher = new Publisher(jedisPool,"发布者1");
            executorService.submit(publisher);
            executorService.shutdown();
            executorService.awaitTermination(600, TimeUnit.SECONDS);
        }
    }

    执行main方法,创建一个发布者。

    (2)订阅者客户端

    package work.test;
    
    import redis.clients.jedis.JedisPool;
    import work.Subscriber;
    import work.SubscriberListener;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 订阅者客户端
     * author:songyan
     * date: 2019/10/17
     **/
    public class SubscriberClient {
    
        public static void main(String[] args) throws InterruptedException {
            //创建redis连接池
            JedisPool jedisPool = new JedisPool("192.168.159.133");
            //创建线程池
            ExecutorService executorService = Executors.newFixedThreadPool(5);
    
            //创建订阅者
            final SubscriberListener subscriberListener = new SubscriberListener("订阅者一号");
            //订阅频道
            Subscriber subscriber = new Subscriber(jedisPool, subscriberListener, "发布者1");
            executorService.submit(subscriber);
            executorService.shutdown();
            executorService.awaitTermination(60, TimeUnit.SECONDS);
    
            //30s后取消订阅
            Thread.sleep(3000);
            subscriberListener.onUnsubscribe("发布者1", 0);
        }
    }

    执行main方法,创建一个订阅者(订阅上面发布者的频道)。

    发布者发布信息:

     订阅者接收到订阅信息:

    订阅者取消订阅:

     

    扩展:ExecutorService

  • 相关阅读:
    JAVA中的流-简介(二)
    JAVA中的流-简介(一)
    Java中内部类简介
    应用小练习-自定义栈
    集合知识点(二)
    集合知识点(一)
    JAVA中的正则表达式简介
    从头文件中学习sfr和sbit
    PCB中实现元器件旋转一个角度放置
    DXP中插入LOGO字体方法(2)
  • 原文地址:https://www.cnblogs.com/excellencesy/p/11696580.html
Copyright © 2011-2022 走看看