zoukankan      html  css  js  c++  java
  • Redis 订阅发布

    Redis 订阅发布 - Jedis实现

    我想到使用Redis的订阅发布模式是用来解决推送问题的~。

    对于概念性的叙述,多多少少还是要提一下的:

    什么是Redis发布订阅?Redis发布订阅是一种消息通信模式,发送者通过通道A发送消息message,订阅过通道A的客户端就可以接收到消息message。嗯度娘上面的解释要比我所说的好多了,而我所理解的就是:所谓的订阅发布模式,其实和我们看电视,听广播差不多,在我们没有调台(换频道)的时候,那个频道也是在传递消息的(发布)。我们换到那个频道上(订阅)就能接收到消息了。是的,虽然可能有些不恰当~

    说明

    ​ 本文中示例采用了三个客户端,以“品”字形排列,由上至下,由左至右分别为客户端1(c1),客户端2(c2),客户端3(c3).特此说明。

    Redis订阅与发布命令###

    首先声明,有关Redis服务器的搭建工作,请自行查阅相关资料进行环境抢建

    听说Redis中发布与订阅只有简单的6个命令。即:

    • PSUBSCRIBE pattern [pattern ...]
      • 订阅一个或者多个符合pattern格式的频道
    • PUBLISH channel message
      • 发布消息到chanel中
    • PUBSUB subcommand [argument [argument ...]]
      • 查看订阅与发布系统状态
    • PUNSUBSCRIBE [pattern [pattern ...]]
      • 退订所有符合格式的频道
    • SUBSCRIBE channel [channel ...]
      • 订阅一个或者多个频道
    • UNSUBSCRIBE [channel [channel ...]]
      • 取消订阅频道

    例1 - SUBSCRIBE####

    连接redis后键入命令

    ​ SUBSCRIBE study

    subscribe

    这样便订阅了一个名为study的频道。

    接下来study频道要发消息啦。~~


    例2 - PUBLISH

    另开启一个客户端,我使用的是品字形布局的最上面那个做为发布者,键入

    ​ PUBLISH study "message1-go go go"

    publish

    可以看到,当客户端1在study频道发布消息时,客户端2(已订阅study频道)可以接收到c1发布的消息,而客户端3由于没有订阅study频道,所以接收不到c1发送的消息。


    例3 - PSUBSCRIBE

    现在,跟着博主左手,右手一个慢动作。在c3中键入

    ​ PSUBSCRIBE study*

    psubscribe

    OK,现在在c1中键入

    ​ PUBLISH study "message2"

    上结果图:

    PSUBSCRIBE1

    c3采用的通配符的形式,也将study频道给订阅成功了。

    接下来,在c1中继续键入命令:

    ​ PUBLISH study:java "I hate java forever"

    PSUBSCRIBE2

    可以看到,使用psubscribe不仅将study频道订阅了,而且将以study为首的频道也订阅了。


    例4 - PUBSUB

    在c1中键入pubsub channel,可以获得:

    ​ 127.0.0.1:6379> PUBSUB channels
    ​ 1) "study"

    意为当前正在活跃的频道。


    Jedis实现订阅发布者模式

    ​ 好了,上面通过命令行熟悉了一下Redis中有关订阅发布者模式的相关命令。下面我们要将redis的订阅与发布者嵌入到项目中。

    ​ 首先,我们使用jedis先订阅一个名为:study的频道

    订阅study的频道

    然后我们先从命令行处进行消息发布:

    命令行处进行消息发布

    之后 ,我们使用jedis在项目中进行消息发布:

    项目中进行消息发布

    我们可以进行正常的通信 ~噢耶~


    核心代码:

    PublishMessage.java 用于开启一个发布消息的线程###

    private Logger logger = LoggerFactory.getLogger(PublishMessage.class);
    
    @Resource
    private JedisCluster jedisCluster;
    
    /**
     * 发布消息
     *
     * @param channel 频道
     * @param message 信息
     */
    public void sendMessage(final String channel, final String message) {
        Thread thread = new Thread(() -> {
            Long publish = jedisCluster.publish(channel, message);
            logger.info("服务器在: {} 频道发布消息{} - {}", channel, message, publish);
        });
        logger.info("发布线程启动:");
        thread.setName("publishThread");
        thread.start();
    }
    

    ChatSubscribe.java用于处理订阅相关事件,继承自JedisPubSub

    private Logger logger = LoggerFactory.getLogger(ChatSubscribe.class);
    
    // 取得订阅的消息后的处理
    @Override
    public void onMessage(String channel, String message) {
        logger.info("订阅成功,接收到的消息为:频道-{},消息-{}", channel, message);
        RedisString.message = message;
    }
    
    // 取得按表达式的方式订阅的消息后的处理
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("-----取得按表达式的方式订阅的消息后的处理-----");
        System.out.println(pattern + "=" + channel + "=" + message);
    }
    
    // 初始化按表达式的方式订阅时候的处理
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("-----初始化按表达式的方式订阅时候的处理-----");
        System.out.println(pattern + "=" + subscribedChannels);
    }
    
    // 取消按表达式的方式订阅时候的处理
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("-----取消按表达式的方式订阅时候的处理-----");
        System.out.println(pattern + "=" + subscribedChannels);
    }
    
    @Override
    public void onPong(String pattern) {
        super.onPong(pattern);
    }
    
    // 初始化订阅时候的处理
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        logger.info("初始化订阅信息:频道-{},订阅频道-{}", channel, subscribedChannels);
    }
    
    // 取消订阅时候的处理
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        logger.info("已取消订阅频道{}", channel);
    }
    

    SubScribeMessage.java 订阅频道,取消频道等动作类

    private Logger logger = LoggerFactory.getLogger(SubScribeMessage.class);
    
    private ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    
    @Resource
    private JedisCluster jedisCluster;
    /**
         * 订阅频道
         *
         * @param channel          频道
         * @param roomSubListerner
         */
        public void subscribeChannel(final String channel, final ChatSubscribe roomSubListerner) {
    
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    jedisCluster.subscribe(roomSubListerner, channel);
                }
            });
        }
    

    ​ jedisCluster是否封装工具类,取自各位看官,核心代码已给出,请各位看官根据自身业务与逻辑,自行更改与优化代码。

    ​ 本次示例程序采用tomcat 9.0 + spring + springmvc

    ​ 使用了诸如:@RestController,@GetMapping等相关注解,便于开发,有兴趣可自行查阅spring相关资料。

  • 相关阅读:
    the-backdoor-factory安装
    python 实验普通IO和多路复用IO
    vue学习随笔(一)
    CentOS最小安装
    CentOS7安装ELK实践(二)
    CentOS7安装ELK实践(一)
    【转】mysql 多主多从配置,自增id解决方案
    Istio组件解析
    Kubernetes NetworkPolicy 插件支持
    在Kubernetes集群上安装Metrics Server
  • 原文地址:https://www.cnblogs.com/tdg-yyx/p/7048967.html
Copyright © 2011-2022 走看看