zoukankan      html  css  js  c++  java
  • SpringData-Redis发布订阅自动重连分析

    SpringData-Redis发布订阅自动重连分析

    RedisMessageListenerContainer

    配置

    @Bean
    @Autowired
    RedisMessageListenerContainer redisContainer(JedisConnectionFactory redisConnectionFactory, RedisMessageListener a) {
        RedisMessageListenerContainer container
                = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        List<Topic> topics = Lists.newArrayList(new ChannelTopic(
                        CHANNEL),
                new ChannelTopic(CHANNEL)
        );
        container.addMessageListener(new MessageListenerAdapter(a), topics);
        return container;
    }
    
    

    启动分析

    添加频道监听

    //RedisMessageListenerContainer.java
    
    public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics) {
    	addListener(listener, topics);
    	lazyListen();
    }
    

    这个AddListener会 对Topic做一些记录,patternMapping, channelMapping,去重等等,然后最关键的一步:

    //RedisMessageListenerContainer.java
    //addListener
    // check the current listening state
    	if (listening) {
    		subscriptionTask.subscribeChannel(channels.toArray(new byte[channels.size()][]));
    		subscriptionTask.subscribePattern(patterns.toArray(new byte[patterns.size()][]));
    	}
    
    //RedisMessageListenerContainer.java
    
    void subscribeChannel(byte[]... channels) {
    	if (channels != null && channels.length > 0) {
    		if (connection != null) {
    			synchronized (localMonitor) {
    				Subscription sub = connection.getSubscription();
    				if (sub != null) {
    					sub.subscribe(channels);
    				}
    			}
    		}
    	}
    }
    
    //JedisSubscription.java
    	protected void doSubscribe(byte[]... channels) {
    		jedisPubSub.subscribe(channels);
    	}
    

    但是启动之前 这个listening=false。故该代码不生效。再看lazyListen方法:

    //RedisMessageListenerContainer.java
    private void lazyListen() {
    		boolean debug = logger.isDebugEnabled();
    		boolean started = false;
    
    		if (isRunning()) {
    			if (!listening) {
    				synchronized (monitor) {
    					if (!listening) {
    						if (channelMapping.size() > 0 || patternMapping.size() > 0) {
    							subscriptionExecutor.execute(subscriptionTask);
    							listening = true;
    							started = true;
    						}
    					}
    				}
    				if (debug) {
    					if (started) {
    						logger.debug("Started listening for Redis messages");
    					} else {
    						logger.debug("Postpone listening for Redis messages until actual listeners are added");
    					}
    				}
    			}
    		}
    	}
    
    

    调用addMessageListener的时候,isRunning()=false 也不生效。

    最后:当@Bean构造完成的时候 ,LifeCycle进入start的时候,该Container启动。

    //RedisMessageListenerContainer.java
    
        public void start() {
    		if (!running) {
    			running = true;
    			// wait for the subscription to start before returning
    			// technically speaking we can only be notified right before the subscription starts
    			synchronized (monitor) {
    				lazyListen();
    				if (listening) {
    					try {
    						// wait up to 5 seconds for Subscription thread
    						monitor.wait(initWait);
    					} catch (InterruptedException e) {
    						// stop waiting
    					}
    				}
    			}
    
    			if (logger.isDebugEnabled()) {
    				logger.debug("Started RedisMessageListenerContainer");
    			}
    		}
    	}
    

    这个时候,running=true了。
    然后调用 lazyListen(),确实比较Lazy。
    这个时候,启动子线程来执行订阅和监听。

    subscriptionExecutor.execute(subscriptionTask);
    

    这个subscriptionTask的构造如下:

    //RedisMessageListenerContainer.java
    public void run() {
    	synchronized (localMonitor) {
    		subscriptionTaskRunning = true;
    	}
    	try {
    		connection = connectionFactory.getConnection();
    		if (connection.isSubscribed()) {
    			throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
    		}
    
    		boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);
    
    		// NB: async drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
    		if (!asyncConnection) {
    			synchronized (monitor) {
    				monitor.notify();
    			}
    		}
    
    		SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();
    
    		if (asyncConnection) {
    			SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());
    
    			synchronized (monitor) {
    				monitor.notify();
    			}
    		}
    	} catch (Throwable t) {
    		handleSubscriptionException(t);
    	} finally {
    		// this block is executed once the subscription thread has ended, this may or may not mean
    		// the connection has been unsubscribed, depending on driver
    		synchronized (localMonitor) {
    			subscriptionTaskRunning = false;
    			localMonitor.notify();
    		}
    	}
    }
    

    这里connection 肯定不是subscribed。
    然后他根据Redis的客户端类型来判断是否是阻塞的
    如果是阻塞的类型,则唤醒一下被阻塞的Container线程。(???)

    然后,最关键的是:eventuallyPerformSubscription(),最终发起订阅的,并轮询订阅的是方法。

    //RDMLC
    
    private SubscriptionPresentCondition eventuallyPerformSubscription() {
    
    	SubscriptionPresentCondition condition = null;
    
    	if (channelMapping.isEmpty()) {
    
    		condition = new PatternSubscriptionPresentCondition();
    		connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
    	} else {
    
    		if (patternMapping.isEmpty()) {
    			condition = new SubscriptionPresentCondition();
    		} else {
    			// schedule the rest of the subscription
    			subscriptionExecutor.execute(new PatternSubscriptionTask());
    			condition = new PatternSubscriptionPresentCondition();
    		}
    
    		connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
    	}
    
    	return condition;
    }
    

    以connection.subscribe()为例:即将发起订阅,注意这里是利用DispatchMessageListener做的事件分发监听器。

    //JedisConnection.java
    
    public void subscribe(MessageListener listener, byte[]... channels) {
    	if (isSubscribed()) {
    		throw new RedisSubscribedConnectionException(
    				"Connection already subscribed; use the connection Subscription to cancel or add new channels");
    	}
    
    	if (isQueueing()) {
    		throw new UnsupportedOperationException();
    	}
    	if (isPipelined()) {
    		throw new UnsupportedOperationException();
    	}
    
    	try {
    		BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
    
    		subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
    		jedis.subscribe(jedisPubSub, channels);
    
    	} catch (Exception ex) {
    		throw convertJedisAccessException(ex);
    	}
    }
    
    //BinaryJedis.java
    
    public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
        client.setTimeoutInfinite();
        try {
          jedisPubSub.proceed(client, channels);
        } finally {
          client.rollbackTimeout();
        }
    }
    
    

    这里调用了BinaryJedisPubSub的proceed()。

    这里先提出两个问题?
    要订阅是不是要发起subscribe命令给Redis?发起 subscribe channel命令,然后Listener怎么办?

    这里调用是jedis.subscribe(jedisPubSub, channels);而一开始 subscibeChannels的实现却不太一样?

    下面看jedisPubSub:

      public void proceed(Client client, byte[]... channels) {
        this.client = client;
        client.subscribe(channels);
        client.flush();
        process(client);
      }
    

    这里subscribe是再次发起订阅请求,然后process轮询检查消息。

    异常处理

    再看看JedisConnection类subscribe方法的异常的处理:

    protected DataAccessException convertJedisAccessException(Exception ex) {
    
    	if (ex instanceof NullPointerException) {
    		// An NPE before flush will leave data in the OutputStream of a pooled connection
    		broken = true;
    	}
    
    	DataAccessException exception = EXCEPTION_TRANSLATION.translate(ex);
    	if (exception instanceof RedisConnectionFailureException) {
    		broken = true;
    	}
    
    	return exception;
    }
    

    EXCEPTION_TRANSLATION.translate(ex); 会调用:PassThroughExceptionTranslationStrategy的Convert。

    public class JedisExceptionConverter implements Converter<Exception, DataAccessException> {
    
    	public DataAccessException convert(Exception ex) {
    
    		if (ex instanceof DataAccessException) {
    			return (DataAccessException) ex;
    		}
    		if (ex instanceof JedisDataException) {
    			return new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
    		}
    		if (ex instanceof JedisConnectionException) {
    			return new RedisConnectionFailureException(ex.getMessage(), ex);
    		}
    		if (ex instanceof JedisException) {
    			return new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
    		}
    		if (ex instanceof UnknownHostException) {
    			return new RedisConnectionFailureException("Unknown host " + ex.getMessage(), ex);
    		}
    		if (ex instanceof IOException) {
    			return new RedisConnectionFailureException("Could not connect to Redis server", ex);
    		}
    
    		return null;
    	}
    }
    

    那么,当Jedis抛错:JedisConnectionException 服务器似乎断开了连接
    这个时候,subscribe 从而抛出RedisConnectionFailureException。

    最后,再看RedisMessageListenerContainerd的run方法内的异常处理:
    这个时候,

    protected void handleSubscriptionException(Throwable ex) {
    	listening = false;
    	subscriptionTask.closeConnection();
    	if (ex instanceof RedisConnectionFailureException) {
    		if (isRunning()) {
    			logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
    			sleepBeforeRecoveryAttempt();
    			lazyListen();
    		}
    	} else {
    		logger.error("SubscriptionTask aborted with exception:", ex);
    	}
    }
    
    

    到这个时候,isRunning还是true的(当且仅当LifeCycle进入close的时候,才会变成false),结果就会在 recoveryInterval ms之后,重启调用lazyListen(),再次启动订阅和监听。

    实际效果

    实际上,我在服务器上的错误日志中,我确实看到了

    Connection failure occurred. Restarting subscription task after 5000 ms

    总结

    SpringData-Redis,能够解决手动处理Redis pub/sub的订阅被意外断开,导致监听失败的问题。
    他能确保,服务持续监听,出现异常时,能够重新订阅并监听给定的频道。
    所以,还是用框架吧,比自己手写的发布订阅更可靠。

  • 相关阅读:
    微软教程:ASP.NET Core MVC 入门
    微软教程:ASP.NET Core Razor Pages 入门
    微软教程:ASP.NET Core Web API 入门
    微软教程:ASP.NET Core SignalR 入门(实时Web应用)
    Entity Framework Core 系列教程(翻译)
    视频教程:ASP.NET Core 3.x 构建 RESTful API(高级)
    C#(99):C# 9.0 新特性( NET Framework 5.0 与 Visual Studio ? )
    视频教程:VS Core 40分钟进行WebAPI开发和调用(入门级别)
    视频教程:ASP.NET Core 3.x 入门(包括MVC、Razor Page、Blazor、SignalR、gRPC)
    EntityFramework Core入门教程-12-在ASP.NET Core项目中配置EF Core
  • 原文地址:https://www.cnblogs.com/slankka/p/10028428.html
Copyright © 2011-2022 走看看