zoukankan      html  css  js  c++  java
  • redis 实现发布/订阅模式

     

    类似于MQ的主题模式-只能消费订阅之后发布的消息,一个消息可以被多个订阅者消费)

    1.客户端发布/订阅

    1.1   普通的发布/订阅

       除了实现任务队列外,redis还提供了一组命令可以让开发者实现"发布/订阅"(publish/subscribe)模式。"发布/订阅"模式同样可以实现进程间的消息传递,其原理如下:

      "发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。

    (1)发布消息

      发布者发布消息的命令是  publish,用法是 publish channel message,如向 channel1.1说一声hi

    127.0.0.1:6379> publish channel:1 hi
    (integer) 0

    这样消息就发出去了。返回值表示接收这条消息的订阅者数量。发出去的消息不会被持久化,也就是有客户端订阅channel:1后只能接收到后续发布到该频道的消息,之前的就接收不到了。

    (2)订阅频道

      订阅频道的命令是 subscribe,可以同时订阅多个频道,用法是 subscribe channel1 [channel2 ...],例如新开一个客户端订阅上面频道:(不会收到消息,因为不会收到订阅之前就发布到该频道的消息)

    127.0.0.1:6379> subscribe channel:1
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "channel:1"
    3) (integer) 1

    执行上面命令客户端会进入订阅状态,处于此状态下客户端不能使用除subscribe、unsubscribe、psubscribe和punsubscribe这四个属于"发布/订阅"之外的命令,否则会报错。

      进入订阅状态后客户端可能收到3种类型的回复。每种类型的回复都包含3个值,第一个值是消息的类型,根据消类型的不同,第二个和第三个参数的含义可能不同。

    消息类型的取值可能是以下3个:

      (1)subscribe。表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。

      (2)message。表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。

      (3)unsubscribe。表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。

    (3)第一个客户端重新向channel:1发送一条消息

    127.0.0.1:6379> publish channel:1 hi
    (integer) 1

    返回值表示订阅此频道的数量

    c

    上面订阅的客户端:

    127.0.0.1:6379> subscribe channel:1
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "channel:1"
    3) (integer) 1
    1) "message"
    2) "channel:1"
    3) "hi"

    红字部分表示成功的收到消息(依次是消息类型,频道,消息内容)

    1.2   按照规则发布/订阅

      除了可以使用subscribe命令订阅指定的频道外,还可以使用psubscribe命令订阅指定的规则。规则支持通配符格式。命令格式为      psubscribe pattern [pattern ...]订阅多个模式的频道。

      通配符中?表示1个占位符,*表示任意个占位符(包括0),?*表示1个以上占位符。

    例如:

    (1)订阅者订阅三个通配符频道

    127.0.0.1:6379> psubscribe c? b* d?*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "c?"
    3) (integer) 1
    1) "psubscribe"
    2) "b*"
    3) (integer) 2
    1) "psubscribe"
    2) "d?*"
    3) (integer) 3

    (2)新开一个客户端发送到指定频道

    C:Usersliqiang>redis-cli
    127.0.0.1:6379> publish c m1
    (integer) 0
    127.0.0.1:6379> publish c1 m1
    (integer) 1
    127.0.0.1:6379> publish c11 m1
    (integer) 0
    127.0.0.1:6379> publish b m1
    (integer) 1
    127.0.0.1:6379> publish b1 m1
    (integer) 1
    127.0.0.1:6379> publish b11 m1
    (integer) 1
    127.0.0.1:6379> publish d m1
    (integer) 0
    127.0.0.1:6379> publish d1 m1
    (integer) 1
    127.0.0.1:6379> publish d11 m1
    (integer) 1

    上面返回值为1表示被订阅者所接受,可以匹配上面的通配符。

    订阅者客户端:

    127.0.0.1:6379> psubscribe c? b* d?*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "c?"
    3) (integer) 1
    1) "psubscribe"
    2) "b*"
    3) (integer) 2
    1) "psubscribe"
    2) "d?*"
    3) (integer) 3
    1) "pmessage"
    2) "c?"
    3) "c1"
    4) "m1"
    1) "pmessage"
    2) "b*"
    3) "b"
    4) "m1"
    1) "pmessage"
    2) "b*"
    3) "b1"
    4) "m1"
    1) "pmessage"
    2) "b*"
    3) "b11"
    4) "m1"
    1) "pmessage"
    2) "d?*"
    3) "d1"
    4) "m1"
    1) "pmessage"
    2) "d?*"
    3) "d11"
    4) "m1"

    注意:

    (1)使用psubscribe命令可以重复订阅同一个频道,如客户端执行了psubscribe c? c?*。这时向c1发布消息客户端会接受到两条消息,而同时publish命令的返回值是2而不是。.同样的,如果有另一个客户端执行了subscribe c1 和psubscribe c?*的话,向c1发送一条消息该客户顿也会受到两条消息(但是是两种类型:message和pmessage),同时publish命令也返回2.

    (2)punsubscribe命令可以退订指定的规则,用法是: punsubscribe [pattern [pattern ...]],如果没有参数则会退订所有规则。

    (3)使用punsubscribe只能退订通过psubscribe命令订阅的规则,不会影响直接通过subscribe命令订阅的频道;同样unsubscribe命令也不会影响通过psubscribe命令订阅的规则。另外需要注意punsubscribe命令退订某个规则时不会将其中的通配符展开,而是进行严格的字符串匹配,所以punsubscribe * 无法退订c*规则,而是必须使用punsubscribe c*才可以退订。

    2.Java程序实现发布者订阅者模式

    1.生产者

    import redis.clients.jedis.Jedis;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 21:29 2018/10/9
     */
    public class MessageProducer extends Thread {
        public static final String CHANNEL_KEY = "channel:1";
        private volatile int count;
    
        public void putMessage(String message) {
            Jedis jedis = JedisPoolUtils.getJedis();
            Long publish = jedis.publish(CHANNEL_KEY, message);//返回订阅者数量
            System.out.println(Thread.currentThread().getName() + " put message,count=" + count+",subscriberNum="+publish);
            count++;
        }
    
        @Override
        public synchronized void run() {
            for (int i = 0; i < 5; i++) {
                putMessage("message" + count);
            }
        }
    
        public static void main(String[] args) {
            MessageProducer messageProducer = new MessageProducer();
            Thread t1 = new Thread(messageProducer, "thread1");
            Thread t2 = new Thread(messageProducer, "thread2");
            Thread t3 = new Thread(messageProducer, "thread3");
            Thread t4 = new Thread(messageProducer, "thread4");
            Thread t5 = new Thread(messageProducer, "thread5");
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t5.start();
        }
    }

    结果:

    thread1 put message,count=0,subscriberNum=0
    thread1 put message,count=1,subscriberNum=0
    thread1 put message,count=2,subscriberNum=0
    thread1 put message,count=3,subscriberNum=0
    thread1 put message,count=4,subscriberNum=0
    thread4 put message,count=5,subscriberNum=0
    thread4 put message,count=6,subscriberNum=0
    thread4 put message,count=7,subscriberNum=0
    thread4 put message,count=8,subscriberNum=0
    thread4 put message,count=9,subscriberNum=0
    thread5 put message,count=10,subscriberNum=0
    thread5 put message,count=11,subscriberNum=0
    thread5 put message,count=12,subscriberNum=0
    thread5 put message,count=13,subscriberNum=0
    thread5 put message,count=14,subscriberNum=0
    thread2 put message,count=15,subscriberNum=0
    thread2 put message,count=16,subscriberNum=0
    thread2 put message,count=17,subscriberNum=0
    thread2 put message,count=18,subscriberNum=0
    thread2 put message,count=19,subscriberNum=0
    thread3 put message,count=20,subscriberNum=0
    thread3 put message,count=21,subscriberNum=0
    thread3 put message,count=22,subscriberNum=0
    thread3 put message,count=23,subscriberNum=0
    thread3 put message,count=24,subscriberNum=0
    View Code

    2.消费者

    (1)subscribe实现订阅消费消息(开启两个线程订阅消息)

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 22:34 2018/10/9
     */
    public class MessageConsumer implements Runnable {
        public static final String CHANNEL_KEY = "channel:1";//频道
    
        public static final String EXIT_COMMAND = "exit";//结束程序的消息
    
        private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//处理接收消息
    
        public void consumerMessage() {
            Jedis jedis = JedisPoolUtils.getJedis();
            jedis.subscribe(myJedisPubSub, CHANNEL_KEY);//第一个参数是处理接收消息,第二个参数是订阅的消息频道
        }
    
        @Override
        public void run() {
            while (true) {
                consumerMessage();
            }
        }
    
        public static void main(String[] args) {
            MessageConsumer messageConsumer = new MessageConsumer();
            Thread t1 = new Thread(messageConsumer, "thread5");
            Thread t2 = new Thread(messageConsumer, "thread6");
            t1.start();
            t2.start();
        }
    }
    
    /**
     * 继承JedisPubSub,重写接收消息的方法
     */
    class MyJedisPubSub extends JedisPubSub {
        @Override
        /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
         * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
         * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
         * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
         **/
        public void onMessage(String channel, String message) {
            System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);
            //接收到exit消息后退出
            if (MessageConsumer.EXIT_COMMAND.equals(message)) {
                System.exit(0);
            }
        }
    }

    我们再次启动生产者生产消息,生产者控制台:

    thread5 put message,count=0,subscriberNum=2
    thread5 put message,count=1,subscriberNum=2
    thread5 put message,count=2,subscriberNum=2
    thread5 put message,count=3,subscriberNum=2
    thread5 put message,count=4,subscriberNum=2
    thread3 put message,count=5,subscriberNum=2
    thread3 put message,count=6,subscriberNum=2
    thread3 put message,count=7,subscriberNum=2
    thread3 put message,count=8,subscriberNum=2
    thread3 put message,count=9,subscriberNum=2
    thread2 put message,count=10,subscriberNum=2
    thread2 put message,count=11,subscriberNum=2
    thread2 put message,count=12,subscriberNum=2
    thread2 put message,count=13,subscriberNum=2
    thread2 put message,count=14,subscriberNum=2
    thread4 put message,count=15,subscriberNum=2
    thread4 put message,count=16,subscriberNum=2
    thread4 put message,count=17,subscriberNum=2
    thread4 put message,count=18,subscriberNum=2
    thread4 put message,count=19,subscriberNum=2
    thread1 put message,count=20,subscriberNum=2
    thread1 put message,count=21,subscriberNum=2
    thread1 put message,count=22,subscriberNum=2
    thread1 put message,count=23,subscriberNum=2
    thread1 put message,count=24,subscriberNum=2
    
    Process finished with exit code 0

    消费者控制台:

    thread6-接收到消息:channel=channel:1,message=message0
    thread5-接收到消息:channel=channel:1,message=message0
    thread5-接收到消息:channel=channel:1,message=message1
    thread6-接收到消息:channel=channel:1,message=message1
    thread5-接收到消息:channel=channel:1,message=message2
    thread6-接收到消息:channel=channel:1,message=message2
    thread5-接收到消息:channel=channel:1,message=message3
    thread6-接收到消息:channel=channel:1,message=message3
    thread5-接收到消息:channel=channel:1,message=message4
    thread6-接收到消息:channel=channel:1,message=message4
    thread5-接收到消息:channel=channel:1,message=message5
    thread6-接收到消息:channel=channel:1,message=message5
    thread5-接收到消息:channel=channel:1,message=message6
    thread6-接收到消息:channel=channel:1,message=message6
    thread5-接收到消息:channel=channel:1,message=message7
    thread6-接收到消息:channel=channel:1,message=message7
    thread5-接收到消息:channel=channel:1,message=message8
    thread6-接收到消息:channel=channel:1,message=message8
    thread5-接收到消息:channel=channel:1,message=message9
    thread6-接收到消息:channel=channel:1,message=message9
    thread5-接收到消息:channel=channel:1,message=message10
    thread6-接收到消息:channel=channel:1,message=message10
    thread5-接收到消息:channel=channel:1,message=message11
    thread6-接收到消息:channel=channel:1,message=message11
    thread5-接收到消息:channel=channel:1,message=message12
    thread6-接收到消息:channel=channel:1,message=message12
    thread5-接收到消息:channel=channel:1,message=message13
    thread6-接收到消息:channel=channel:1,message=message13
    thread5-接收到消息:channel=channel:1,message=message14
    thread6-接收到消息:channel=channel:1,message=message14
    thread5-接收到消息:channel=channel:1,message=message15
    thread6-接收到消息:channel=channel:1,message=message15
    thread5-接收到消息:channel=channel:1,message=message16
    thread6-接收到消息:channel=channel:1,message=message16
    thread5-接收到消息:channel=channel:1,message=message17
    thread6-接收到消息:channel=channel:1,message=message17
    thread5-接收到消息:channel=channel:1,message=message18
    thread6-接收到消息:channel=channel:1,message=message18
    thread5-接收到消息:channel=channel:1,message=message19
    thread6-接收到消息:channel=channel:1,message=message19
    thread5-接收到消息:channel=channel:1,message=message20
    thread6-接收到消息:channel=channel:1,message=message20
    thread5-接收到消息:channel=channel:1,message=message21
    thread6-接收到消息:channel=channel:1,message=message21
    thread5-接收到消息:channel=channel:1,message=message22
    thread6-接收到消息:channel=channel:1,message=message22
    thread5-接收到消息:channel=channel:1,message=message23
    thread6-接收到消息:channel=channel:1,message=message23
    thread5-接收到消息:channel=channel:1,message=message24
    thread6-接收到消息:channel=channel:1,message=message24

    (2)psubscribe实现订阅消费消息(开启两个线程订阅消息)

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 22:34 2018/10/9
     */
    public class MessageConsumer implements Runnable {
        public static final String CHANNEL_KEY = "channel*";//频道
    
        public static final String EXIT_COMMAND = "exit";//结束程序的消息
    
        private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//处理接收消息
    
        public void consumerMessage() {
            Jedis jedis = JedisPoolUtils.getJedis();
            jedis.psubscribe(myJedisPubSub, CHANNEL_KEY);//第一个参数是处理接收消息,第二个参数是订阅的消息频道
        }
    
        @Override
        public void run() {
            while (true) {
                consumerMessage();
            }
        }
    
        public static void main(String[] args) {
            MessageConsumer messageConsumer = new MessageConsumer();
            Thread t1 = new Thread(messageConsumer, "thread5");
            Thread t2 = new Thread(messageConsumer, "thread6");
            t1.start();
            t2.start();
        }
    }
    
    /**
     * 继承JedisPubSub,重写接收消息的方法
     */
    class MyJedisPubSub extends JedisPubSub {
        @Override
        public void onPMessage(String pattern, String channel, String message) {
            System.out.println(Thread.currentThread().getName()+"-接收到消息:pattern="+pattern+",channel=" + channel + ",message=" + message);
            //接收到exit消息后退出
            if (MessageConsumer.EXIT_COMMAND.equals(message)) {
                System.exit(0);
            }
        }
    }

    重写JedisPubSub 的onPMessage方法即可

    启动生产者生产消息之后查看消费者控制台:

    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message0
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message0
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message1
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message1
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message2
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message2
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message3
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message3
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message4
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message4
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message5
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message5
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message6
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message6
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message7
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message7
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message8
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message8
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message9
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message9
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message10
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message10
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message11
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message11
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message12
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message12
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message13
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message13
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message14
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message14
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message15
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message15
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message16
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message16
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message17
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message17
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message18
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message18
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message19
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message19
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message20
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message20
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message21
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message21
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message22
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message22
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message23
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message23
    thread5-接收到消息:pattern=channel*,channel=channel:1,message=message24
    thread6-接收到消息:pattern=channel*,channel=channel:1,message=message24
    View Code

    补充:订阅的时候subscribe()和psubscribe()的第二个参数支持可变参数,也就是可以实现订阅多个频道。

      至此实现了两种方式的消息队列:

        redis自带的list类型(lpush和rpop或者brpop,rpush和lpop或者blpop)---blpop和brpop是阻塞读取。

        "发布/订阅"模式(publish channel message 和 subscribe channel [channel ...] 或者 psubscribe pattern [pattern ...] 通配符订阅多个频道)

    补充:

    1.发布订阅执行订阅之后该线程处于阻塞状态,线程不会终止,如果终止线程需要退订,需要调用JedisPubSub的unsubscribe()方法

    例如:

    package plainTest;
    
    import cn.xm.redisChat.util.JedisPoolUtils;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 23:36 2018/10/13
     */
    public class Test111 {
        public static void main(String[] args) {
            Jedis jedis = JedisPoolUtils.getJedis();
            System.out.println("订阅前");
            jedis.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    super.onMessage(channel, message);
                }
            }, "c1");
            System.out.println("订阅后");
        }
    }

    结果只会打印订阅前,而且线程不会终止。

    为了使线程可以停止,必须退订,而且退订只能调用  JedisPubSub.unsubscribe()方法,例如:收到quit消息之后会退订,线程会回到主线程打印订阅后。

    package plainTest;
    
    import cn.xm.redisChat.util.JedisPoolUtils;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 23:36 2018/10/13
     */
    public class Test111 {
        public static void main(String[] args) {
            Jedis jedis = JedisPoolUtils.getJedis();
            System.out.println("订阅前");
            jedis.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    if("quit".equals(message)){
                        unsubscribe("c1");
                    }
                    System.out.println(message);
                }
    
                @Override
                public void unsubscribe(String... channels) {
                    super.unsubscribe(channels);
                }
            }, "c1");
            System.out.println("订阅后");
        }
    }

    2.BRPOP:当给定列表内没有任何元素可供弹出的时候,连接将被BRPOP命令阻塞,直到等待超时或发现可弹出元素为止。(每次只弹出一个元素,当没有元素的时候处于阻塞,当弹出一个元素之后就会解除阻塞)

    package plainTest;
    
    import cn.xm.redisChat.util.JedisPoolUtils;
    import redis.clients.jedis.Jedis;
    
    import java.util.List;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 23:36 2018/10/13
     */
    public class Test111 {
        public static void main(String[] args) {
            Jedis jedis = JedisPoolUtils.getJedis();
            System.out.println("brpop之前");
            List<String> messages = jedis.brpop(0,"list1");
            System.out.println(messages);
            System.out.println("brpop之后");
        }
    }

    没有元素的时候只会打印brpop之前。

    原文:https://www.cnblogs.com/qlqwjy/p/9763754.html

  • 相关阅读:
    ThreadLocal分析学习
    探究.NET的bin引用程序集运行机制看.NET程序集部署原理
    ASP.NET网页代码模型分析
    JBPM与设计模式之职责链模式
    根据webform页面大小的变化动态调整控件的大小
    jbpm binding类深入解析
    JBPM与软件架构模式之命令模式
    JBPM对象主键生成机制
    主键思维定势导致的惨案
    电脑安装windows server 2008 导致磁盘分区消失解决方法
  • 原文地址:https://www.cnblogs.com/yrjns/p/12501085.html
Copyright © 2011-2022 走看看