zoukankan      html  css  js  c++  java
  • springdataredis之RedisMessageListenerContainer源码解析

    简介

    首先,我们翻译一下这个类的注释:

    RedisMessageListenerContainer 为Redis消息侦听器 MessageListener 提供异步行为的容器。处理侦听、转换和消息分派的低级细节。
    与低级别Redis(每个订阅一个连接)相反,容器只使用一个连接,该连接对所有注册的侦听器都是“多路复用”的,消息调度是通过任务执行器完成的。
    注意:容器以惰性方式使用连接(仅当至少配置了一个侦听器时才使用连接)。
    同时添加和删除侦听器会产生未定义的结果。强烈建议对这些方法进行相应的同步/排序。

    public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle
    

    关于 InitializingBean, DisposableBean, SmartLifecycle ,建议阅读以下文章加以了解:

    start()

    // 调用start()方法后,当前对象状态为运行中;调用stop()方法后,当前对象状态为不在运行。
    // whether the container is running (or not)
    private volatile boolean running = false;
    
    public void start() {
      if (!running) {
        running = true;
        // wait 方法调用时,外层必须有 synchronized 关键字
        // wait for the subscription to start before returning
        // technically speaking we can only be notified right before the subscription starts
        synchronized (monitor) {
          // 惰性侦听
          lazyListen();
          // lazyListen 中会异步启动内部类SubscriptionTask的实例
          // 这里加上 wait 同步,主要就是想等到SubscriptionTask实例也运行起来了,再打印出 "Started RedisMessageListenerContainer" 的日志
          if (listening) {
            try {
              // wait up to 5 seconds for Subscription thread
              monitor.wait(initWait);
            } catch (InterruptedException e) {
              // 如果调用 start() 方法的线程被设置打断标记导致wait方法被唤醒
              // 那启动过程也就中止了,container 变为不在运行中的状态
              // stop waiting
              Thread.currentThread().interrupt();
              running = false;
              return;
            }
          }
        }
    
        if (logger.isDebugEnabled()) {
          logger.debug("Started RedisMessageListenerContainer");
        }
      }
    }
    

    lazyListen

    /**
     * 方法检查是否确实需要侦听消息(从而使用线程)并触发它。
     * Method inspecting whether listening for messages (and thus using a thread) is actually needed and triggering
     */
    private void lazyListen() {
      boolean debug = logger.isDebugEnabled();
      boolean started = false;
      if (isRunning()) {
        // “double check”:synchronized 之前之后都进行一次状态判定,并发知识,不多说了
        if (!listening) {
          synchronized (monitor) {
            if (!listening) {
              // 这里就对应上了类的注释:仅当至少配置了一个侦听器时才使用连接
              if (channelMapping.size() > 0 || patternMapping.size() > 0) {
                // 使用订阅线程池启动订阅任务
                subscriptionExecutor.execute(subscriptionTask);
                listening = true;
                started = true;
              }
            }
          }
          if (debug) {
            if (started) {
              logger.debug("Started listening for Redis messages");
            } else {
              logger.debug("Postpone listening for Redis messages until actual listeners are added");
    	}
          }
        }
      }
    }
    

    RedisMessageListenerContainer 容器的惰性侦听,有几点:

    1. 至少配置了一个channel或者pattern才侦听;
    2. 一个容器至多启动一个subscriptionTask;
    3. 有三种情况会调用 lazyListen 检查是否需要侦听并启动subscriptionTask:
      • 容器启动时
      • 调用 addMessageListener 动态添加订阅channel或者pattern时
      • SubscriptionTask.run() 异常退出时

    channel 指的是字符串类型的订阅“主题”,pattern 则是用通配符表示订阅一类“主题”

    内部类SubscriptionTask

    SubscriptionTask 实现了 Runnable 接口,在 subscriptionExecutor.execute(subscriptionTask); 调用之后,就会异步执行 run 方法。

    run()

    // Redis连接
    private volatile @Nullable RedisConnection connection;
    private final Object localMonitor = new Object();
    private boolean subscriptionTaskRunning = false;
    public void run() {
      // 一个SubscriptionTask实例同时只允许一个线程执行接下来的代码
      synchronized (localMonitor) {
        subscriptionTaskRunning = true;
      }
      try {
        // 获取Redis连接
        connection = connectionFactory.getConnection();
        if (connection.isSubscribed()) {
          throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
        }
        // 如果ConnectionFactory是LettuceConnectionFactory及其子类,就返回true
        boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);
        // NB: sync drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
        if (!asyncConnection) {
          synchronized (monitor) {
            monitor.notify();
          }
        }
        // 订阅代码
        SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();
        if (asyncConnection) {
          // 设置检验条件和超时时长的“自旋屏障”:
          // 要么条件满足,结束自旋;
          // 要么等待超时,结束自旋;
          SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());
          synchronized (monitor) {
            monitor.notify();
          }
        }
      } catch (Throwable t) {
        // 订阅失败的处理
        handleSubscriptionException(t);
      } finally {
        // 此块在订阅线程结束后执行
        // this block is executed once the subscription thread has ended, this may or may not mean
        // the connection has been unsubscribed, depending on driver
        synchronized (localMonitor) {
          subscriptionTaskRunning = false;
          localMonitor.notify();
        }
      }
    }
    

    eventuallyPerformSubscription

    /**
     * Performs a potentially asynchronous registration of a subscription.
     *
     * @return #SubscriptionPresentCondition that can serve as a handle to check whether the subscription is ready.
     */
    private SubscriptionPresentCondition eventuallyPerformSubscription() {
      SubscriptionPresentCondition condition = null;
      if (channelMapping.isEmpty()) {
        condition = new PatternSubscriptionPresentCondition();
        // 对应Redis命令中的 PSUBSCRIBE 订阅一个或多个符合给定模式的频道。
        connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
      } else {
        if (patternMapping.isEmpty()) {
          condition = new SubscriptionPresentCondition();
        } else {
          // channelMapping 和 patternMapping 都有数据时进入该分支
          // schedule the rest of the subscription
          // 模式订阅交给 PatternSubscriptionTask 去做
          subscriptionExecutor.execute(new PatternSubscriptionTask());
          condition = new PatternSubscriptionPresentCondition();
        }
        // 对应Redis命令中的 SUBSCRIBE 订阅给定的一个或多个频道的信息。
        connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
      }
      return condition;
    }
    

    这里又多了两个需要了解的细节:

    • *PatternSubscriptionTask 主要负责 模式订阅
    • DispatchMessageListener 主要负责 消息分发

    handleSubscriptionException

    RedisMessageListenerContainer 的订阅任务具有重试机制。如果Redis发生宕机重启,定时重试,可以让WEB应用服务器在Redis恢复时,重新订阅channel或者pattern。

    /**
     * 处理订阅任务异常。如果异常是连接失败(例如,Redis重新启动),将尝试重新启动订阅。
     * Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
     * failure (for example, Redis was restarted).
     *
     * @param ex Throwable exception
     */
    protected void handleSubscriptionException(Throwable ex) {
      // 修改当前容器的侦听状态
      listening = false;
      // 释放Redis连接
      subscriptionTask.closeConnection();
      // 如果是Redis连接断开的情况
      if (ex instanceof RedisConnectionFailureException) {
        if (isRunning()) {
          // 默认是睡眠 5 秒后重试
          logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
          sleepBeforeRecoveryAttempt();
          // 重新执行“惰性侦听”
          lazyListen();
        }
      } else {
        logger.error("SubscriptionTask aborted with exception:", ex);
      }
    }
    

    PatternSubscriptionTask

    PatternSubscriptionTask 又是 SubscriptionTask的一个子类,也实现了 Runnable 接口,我们来看看它的 run 方法

    run

    public void run() {
      // done 表示等待 connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet())); 订阅完成
      // wait for subscription to be initialized
      boolean done = false;
      // 自旋三次,等待 Subscription 初始化完成,这里准备复用该对象 
      // wait 3 rounds for subscription to be initialized
      for (int i = 0; i < ROUNDS && !done; i++) {
        if (connection != null) {
          synchronized (localMonitor) {
            if (connection.isSubscribed()) {
              done = true;
    	  connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
            } else {
              // 每次自旋等待 500 毫秒
              try {
                Thread.sleep(WAIT);
              } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return;
              }
            }
          }
        }
      }
    }
    

    DispatchMessageListener

    private class DispatchMessageListener implements MessageListener {
      @Override
      public void onMessage(Message message, @Nullable byte[] pattern) {
        Collection<MessageListener> listeners = null;
        // if it's a pattern, disregard channel
        if (pattern != null && pattern.length > 0) {
          listeners = patternMapping.get(new ByteArrayWrapper(pattern));
        } else {
          pattern = null;
          // do channel matching first
          listeners = channelMapping.get(new ByteArrayWrapper(message.getChannel()));
        }
        if (!CollectionUtils.isEmpty(listeners)) {
          // 真正分发消息给容器管理的 MessageListener
          dispatchMessage(listeners, message, pattern);
        }
      }
    }
    

    dispatchMessage

    private void dispatchMessage(Collection<MessageListener> listeners, Message message, @Nullable byte[] pattern) {
      byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
      for (MessageListener messageListener : listeners) {
        taskExecutor.execute(() -> processMessage(messageListener, message, source));
      }
    }
    

    如果咱CRUD程序员没有自定义 subscriptionExecutor,那么 taskExecutor 其实和 subscriptionExecutor 是同一个引用:

    // RedisMessageListenerContainer 的成员方法
    public void afterPropertiesSet() {
      if (taskExecutor == null) {
        manageExecutor = true;
        taskExecutor = createDefaultTaskExecutor();
      }
      if (subscriptionExecutor == null) {
        // 如果执行到这里,那么两者就是同一个引用
        subscriptionExecutor = taskExecutor;
      }
      initialized = true;
    }
    

    processMessage

    处理消息时,有一层 catch,如果捕获到容器管理的 MessageListener 的异常,是可以通过容器的异常处理器来处理的。

    // RedisMessageListenerContainer 的成员方法
    protected void processMessage(MessageListener listener, Message message, byte[] pattern) {
      executeListener(listener, message, pattern);
    }
    
    protected void executeListener(MessageListener listener, Message message, byte[] pattern) {
      try {
        listener.onMessage(message, pattern);
      } catch (Throwable ex) {
        handleListenerException(ex);
      }
    }
    

    handleListenerException

    可以通过给容器设置 ErrorHandler 实现对象,来统一处理捕获到他管理的的 MessageListener 的异常。

    protected void handleListenerException(Throwable ex) {
      if (isActive()) {
        // Regular case: failed while active.
        // Invoke ErrorHandler if available.
        invokeErrorHandler(ex);
      } else {
        // Rare case: listener thread failed after container shutdown.
        // Log at debug level, to avoid spamming the shutdown logger.
        logger.debug("Listener exception after container shutdown", ex);
      }
    }
    
    protected void invokeErrorHandler(Throwable ex) {
      if (this.errorHandler != null) {
        this.errorHandler.handleError(ex);
      } else if (logger.isWarnEnabled()) {
        logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
      }
    }
    
  • 相关阅读:
    笔记类产品会不会衰落?
    ios 性能优化概述
    一个简单的旋转加载动画设计的思路
    常用算法的C++实现
    [转]解决Ionic2 innerHTML 无法嵌入HTML
    How to debug an Angular 2 application with Chrome and VS Code
    Angularjs2-下拉列表实现(父子组件通信)
    Angular 2 HTTP Requests with Observables
    Ionic2 beta8后更新的内容
    ionic2中如何使用自动生成器
  • 原文地址:https://www.cnblogs.com/kendoziyu/p/15807337.html
Copyright © 2011-2022 走看看