简介
首先,我们翻译一下这个类的注释:
RedisMessageListenerContainer 为Redis消息侦听器 MessageListener 提供异步行为的容器。处理侦听、转换和消息分派的低级细节。
与低级别Redis(每个订阅一个连接)相反,容器只使用一个连接,该连接对所有注册的侦听器都是“多路复用”的,消息调度是通过任务执行器完成的。
注意:容器以惰性方式使用连接(仅当至少配置了一个侦听器时才使用连接)。
同时添加和删除侦听器会产生未定义的结果。强烈建议对这些方法进行相应的同步/排序。
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle
关于 InitializingBean, DisposableBean, SmartLifecycle ,建议阅读以下文章加以了解:
start()
// 调用start()方法后,当前对象状态为运行中;调用stop()方法后,当前对象状态为不在运行。
// whether the container is running (or not)
private volatile boolean running = false;
public void start() {
if (!running) {
running = true;
// wait 方法调用时,外层必须有 synchronized 关键字
// wait for the subscription to start before returning
// technically speaking we can only be notified right before the subscription starts
synchronized (monitor) {
// 惰性侦听
lazyListen();
// lazyListen 中会异步启动内部类SubscriptionTask的实例
// 这里加上 wait 同步,主要就是想等到SubscriptionTask实例也运行起来了,再打印出 "Started RedisMessageListenerContainer" 的日志
if (listening) {
try {
// wait up to 5 seconds for Subscription thread
monitor.wait(initWait);
} catch (InterruptedException e) {
// 如果调用 start() 方法的线程被设置打断标记导致wait方法被唤醒
// 那启动过程也就中止了,container 变为不在运行中的状态
// stop waiting
Thread.currentThread().interrupt();
running = false;
return;
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("Started RedisMessageListenerContainer");
}
}
}
lazyListen
/**
* 方法检查是否确实需要侦听消息(从而使用线程)并触发它。
* Method inspecting whether listening for messages (and thus using a thread) is actually needed and triggering
*/
private void lazyListen() {
boolean debug = logger.isDebugEnabled();
boolean started = false;
if (isRunning()) {
// “double check”:synchronized 之前之后都进行一次状态判定,并发知识,不多说了
if (!listening) {
synchronized (monitor) {
if (!listening) {
// 这里就对应上了类的注释:仅当至少配置了一个侦听器时才使用连接
if (channelMapping.size() > 0 || patternMapping.size() > 0) {
// 使用订阅线程池启动订阅任务
subscriptionExecutor.execute(subscriptionTask);
listening = true;
started = true;
}
}
}
if (debug) {
if (started) {
logger.debug("Started listening for Redis messages");
} else {
logger.debug("Postpone listening for Redis messages until actual listeners are added");
}
}
}
}
}
RedisMessageListenerContainer 容器的惰性侦听,有几点:
- 至少配置了一个channel或者pattern才侦听;
- 一个容器至多启动一个subscriptionTask;
- 有三种情况会调用 lazyListen 检查是否需要侦听并启动subscriptionTask:
- 容器启动时
- 调用 addMessageListener 动态添加订阅channel或者pattern时
- SubscriptionTask.run() 异常退出时
channel 指的是字符串类型的订阅“主题”,pattern 则是用通配符表示订阅一类“主题”
内部类SubscriptionTask
SubscriptionTask 实现了 Runnable 接口,在 subscriptionExecutor.execute(subscriptionTask); 调用之后,就会异步执行 run 方法。
run()
// Redis连接
private volatile @Nullable RedisConnection connection;
private final Object localMonitor = new Object();
private boolean subscriptionTaskRunning = false;
public void run() {
// 一个SubscriptionTask实例同时只允许一个线程执行接下来的代码
synchronized (localMonitor) {
subscriptionTaskRunning = true;
}
try {
// 获取Redis连接
connection = connectionFactory.getConnection();
if (connection.isSubscribed()) {
throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
}
// 如果ConnectionFactory是LettuceConnectionFactory及其子类,就返回true
boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);
// NB: sync drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
if (!asyncConnection) {
synchronized (monitor) {
monitor.notify();
}
}
// 订阅代码
SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();
if (asyncConnection) {
// 设置检验条件和超时时长的“自旋屏障”:
// 要么条件满足,结束自旋;
// 要么等待超时,结束自旋;
SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());
synchronized (monitor) {
monitor.notify();
}
}
} catch (Throwable t) {
// 订阅失败的处理
handleSubscriptionException(t);
} finally {
// 此块在订阅线程结束后执行
// this block is executed once the subscription thread has ended, this may or may not mean
// the connection has been unsubscribed, depending on driver
synchronized (localMonitor) {
subscriptionTaskRunning = false;
localMonitor.notify();
}
}
}
eventuallyPerformSubscription
/**
* Performs a potentially asynchronous registration of a subscription.
*
* @return #SubscriptionPresentCondition that can serve as a handle to check whether the subscription is ready.
*/
private SubscriptionPresentCondition eventuallyPerformSubscription() {
SubscriptionPresentCondition condition = null;
if (channelMapping.isEmpty()) {
condition = new PatternSubscriptionPresentCondition();
// 对应Redis命令中的 PSUBSCRIBE 订阅一个或多个符合给定模式的频道。
connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
} else {
if (patternMapping.isEmpty()) {
condition = new SubscriptionPresentCondition();
} else {
// channelMapping 和 patternMapping 都有数据时进入该分支
// schedule the rest of the subscription
// 模式订阅交给 PatternSubscriptionTask 去做
subscriptionExecutor.execute(new PatternSubscriptionTask());
condition = new PatternSubscriptionPresentCondition();
}
// 对应Redis命令中的 SUBSCRIBE 订阅给定的一个或多个频道的信息。
connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
}
return condition;
}
这里又多了两个需要了解的细节:
- *PatternSubscriptionTask 主要负责 模式订阅;
- DispatchMessageListener 主要负责 消息分发;
handleSubscriptionException
RedisMessageListenerContainer 的订阅任务具有重试机制。如果Redis发生宕机重启,定时重试,可以让WEB应用服务器在Redis恢复时,重新订阅channel或者pattern。
/**
* 处理订阅任务异常。如果异常是连接失败(例如,Redis重新启动),将尝试重新启动订阅。
* Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
* failure (for example, Redis was restarted).
*
* @param ex Throwable exception
*/
protected void handleSubscriptionException(Throwable ex) {
// 修改当前容器的侦听状态
listening = false;
// 释放Redis连接
subscriptionTask.closeConnection();
// 如果是Redis连接断开的情况
if (ex instanceof RedisConnectionFailureException) {
if (isRunning()) {
// 默认是睡眠 5 秒后重试
logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
sleepBeforeRecoveryAttempt();
// 重新执行“惰性侦听”
lazyListen();
}
} else {
logger.error("SubscriptionTask aborted with exception:", ex);
}
}
PatternSubscriptionTask
PatternSubscriptionTask 又是 SubscriptionTask的一个子类,也实现了 Runnable 接口,我们来看看它的 run 方法
run
public void run() {
// done 表示等待 connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet())); 订阅完成
// wait for subscription to be initialized
boolean done = false;
// 自旋三次,等待 Subscription 初始化完成,这里准备复用该对象
// wait 3 rounds for subscription to be initialized
for (int i = 0; i < ROUNDS && !done; i++) {
if (connection != null) {
synchronized (localMonitor) {
if (connection.isSubscribed()) {
done = true;
connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
} else {
// 每次自旋等待 500 毫秒
try {
Thread.sleep(WAIT);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
}
}
}
}
}
DispatchMessageListener
private class DispatchMessageListener implements MessageListener {
@Override
public void onMessage(Message message, @Nullable byte[] pattern) {
Collection<MessageListener> listeners = null;
// if it's a pattern, disregard channel
if (pattern != null && pattern.length > 0) {
listeners = patternMapping.get(new ByteArrayWrapper(pattern));
} else {
pattern = null;
// do channel matching first
listeners = channelMapping.get(new ByteArrayWrapper(message.getChannel()));
}
if (!CollectionUtils.isEmpty(listeners)) {
// 真正分发消息给容器管理的 MessageListener
dispatchMessage(listeners, message, pattern);
}
}
}
dispatchMessage
private void dispatchMessage(Collection<MessageListener> listeners, Message message, @Nullable byte[] pattern) {
byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
for (MessageListener messageListener : listeners) {
taskExecutor.execute(() -> processMessage(messageListener, message, source));
}
}
如果咱CRUD程序员没有自定义 subscriptionExecutor,那么 taskExecutor 其实和 subscriptionExecutor 是同一个引用:
// RedisMessageListenerContainer 的成员方法
public void afterPropertiesSet() {
if (taskExecutor == null) {
manageExecutor = true;
taskExecutor = createDefaultTaskExecutor();
}
if (subscriptionExecutor == null) {
// 如果执行到这里,那么两者就是同一个引用
subscriptionExecutor = taskExecutor;
}
initialized = true;
}
processMessage
处理消息时,有一层 catch,如果捕获到容器管理的 MessageListener 的异常,是可以通过容器的异常处理器来处理的。
// RedisMessageListenerContainer 的成员方法
protected void processMessage(MessageListener listener, Message message, byte[] pattern) {
executeListener(listener, message, pattern);
}
protected void executeListener(MessageListener listener, Message message, byte[] pattern) {
try {
listener.onMessage(message, pattern);
} catch (Throwable ex) {
handleListenerException(ex);
}
}
handleListenerException
可以通过给容器设置 ErrorHandler 实现对象,来统一处理捕获到他管理的的 MessageListener 的异常。
protected void handleListenerException(Throwable ex) {
if (isActive()) {
// Regular case: failed while active.
// Invoke ErrorHandler if available.
invokeErrorHandler(ex);
} else {
// Rare case: listener thread failed after container shutdown.
// Log at debug level, to avoid spamming the shutdown logger.
logger.debug("Listener exception after container shutdown", ex);
}
}
protected void invokeErrorHandler(Throwable ex) {
if (this.errorHandler != null) {
this.errorHandler.handleError(ex);
} else if (logger.isWarnEnabled()) {
logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
}
}