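  • Synchronizing machine-local caches across a cluster with Redis publish/subscribe

    Background:

    When the local cache on one machine in a cluster changes, the change has to be propagated to every other machine in the cluster.

    Redis subscription setup: a method annotated with @EventListener(ContextRefreshedEvent.class) initializes the Redis subscription automatically when the project starts.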

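    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.context.event.EventListener;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;

    @Slf4j // generates a logger field named "log" by default
    @Component
    public class RedisSubInitializer {
    	@Autowired
    	private RedisConnection redisConnection;
    	@Autowired
    	private RedisPubsubListener redisPubsubListener;

    	// @Async keeps a blocking subscribe call off the startup thread;
    	// it only takes effect when @EnableAsync is configured.
    	@Async
    	@EventListener(ContextRefreshedEvent.class)
    	public void onApplicationEvent(ContextRefreshedEvent event) {
    		// Subscribe to the channel. MSG_CHANNEL is a channel-name constant
    		// defined elsewhere in the project (not shown in the original).
    		try {
    			log.info("Subscribing to Redis channel, pubSub={}, channel={}", redisPubsubListener, MSG_CHANNEL);
    			redisConnection.subscribe(redisPubsubListener, MSG_CHANNEL);
    		} catch (Exception e) {
    			log.error("Subscribe failed, channel={}", MSG_CHANNEL, e);
    		}
    	}
    }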
    

    Publishing the event

    In the business code, trigger the publish wherever the cached data changes:

    msgPublisher.publish(MSG_CHANNEL, messageVo);
    

    The publisher class

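    import com.alibaba.fastjson.JSON; // assumed: JSON.toJSONString comes from fastjson
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Slf4j // generates a logger field named "log" by default
    @Service
    public class MsgPublisher {
    	@Autowired
    	private RedisConnection redisConnection;

    	/**
    	 * Publishes a message to a Redis channel.
    	 * @param channel channel name
    	 * @param message payload; a String is sent as-is, anything else is serialized to JSON
    	 */
    	public void publish(String channel, Object message) {
    		try {
    			if (message instanceof String) {
    				redisConnection.publish(channel, (String) message);
    			} else {
    				redisConnection.publish(channel, JSON.toJSONString(message));
    			}
    			log.info("message publish, channel={}, message={}", channel, message);
    		} catch (Exception e) {
    			log.error("message pub error, channel={}, message={}", channel, message, e);
    		}
    	}
    }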
    
    

    The listener class, which implements the org.springframework.data.redis.connection.MessageListener interface

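    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.stereotype.Component;

    @Slf4j // generates a logger field named "log" by default
    @Component
    public class RedisPubsubListener implements MessageListener {
    	@Autowired
    	private RedisConnection redisConnection;

    	@Override
    	public void onMessage(Message message, byte[] pattern) {
    		// Channel and body both arrive as raw bytes; decode them as UTF-8 strings.
    		String channel = redisConnection.deserializekey(message.getChannel());
    		String body = redisConnection.deserializekey(message.getBody());
    		log.info("channel={}, message={}", channel, body);
    		try {
    			if (MSG_CHANNEL.equals(channel)) {
    				// Handle the event: synchronize the cache by reloading from the database.
    				// toHandle(...) is not shown in the original post.
    				toHandle(body);
    			}
    		} catch (Exception e) {
    			log.error("channel={}, message={}", channel, body, e);
    		}
    	}
    }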
    
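    The listener delegates to toHandle(body), whose implementation is not shown. A minimal sketch of what it might look like, assuming the message body is simply the key of the changed entry and that the database lookup can be represented as a function. The class name LocalCacheRefresher, the dbLoader parameter, and the message format are all hypothetical and are not taken from the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical local cache that is refreshed from the database
 * whenever a sync message arrives. Names are illustrative only.
 */
class LocalCacheRefresher {
    // The machine-local cache that has to follow changes made on other machines.
    private final Map<String, String> localCache = new ConcurrentHashMap<>();
    // Stand-in for a DAO lookup; returns null when the row no longer exists.
    private final Function<String, String> dbLoader;

    LocalCacheRefresher(Function<String, String> dbLoader) {
        this.dbLoader = dbLoader;
    }

    /** Assumed message format: the body is the key of the entry that changed. */
    void toHandle(String body) {
        if (body == null || body.isEmpty()) {
            return; // nothing to refresh
        }
        String fresh = dbLoader.apply(body);
        if (fresh == null) {
            localCache.remove(body);     // the entry was deleted at the source
        } else {
            localCache.put(body, fresh); // overwrite with the current value
        }
    }

    String get(String key) {
        return localCache.get(key);
    }
}
```

    Because the handler always reloads the current value instead of applying a delta, processing the same message twice leaves the cache in the same state.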

    The RedisConnection class, which extends org.redisson.spring.data.connection.RedissonConnection

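    import java.nio.charset.StandardCharsets;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RedissonClient;
    import org.redisson.spring.data.connection.RedissonConnection;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.serializer.RedisSerializer;

    @Slf4j // generates a logger field named "log" by default
    public class RedisConnection extends RedissonConnection {
    	private RedisSerializer keySerializer;
    	private RedisSerializer valueSerializer;
    	private RedissonClient client;

    	// The constructor, field initialization and getRedisKey(...) are
    	// omitted in the original post.

    	/** Decodes raw bytes (channel name or message body) as a UTF-8 string. */
    	public String deserializekey(byte[] bytes) {
    		if (bytes == null) {
    			return null;
    		}
    		return new String(bytes, StandardCharsets.UTF_8);
    	}

    	/** Publishes a string message; returns 0 if the publish fails. */
    	public Long publish(String channel, String message) {
    		try {
    			return publish(getKey(channel), getKey(message));
    		} catch (Exception e) {
    			log.error("Redis publish ops error, channel={}, message={}", channel, message, e);
    		}
    		return 0L;
    	}

    	/** Subscribes the listener to the given channels by name. */
    	public void subscribe(MessageListener listener, String... channels) {
    		try {
    			subscribe(listener, getKeys(channels));
    		} catch (Exception e) {
    			log.error("Redis subscribe ops error, listener={}, channels={}", listener, channels, e);
    		}
    	}

    	private byte[] getKey(String key) {
    		return keySerializer.serialize(getRedisKey(key));
    	}

    	private byte[][] getKeys(String... keys) {
    		byte[][] bs = new byte[keys.length][];
    		for (int i = 0; i < keys.length; i++) {
    			bs[i] = getKey(keys[i]);
    		}
    		return bs;
    	}
    }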
    

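    Summary

    Local cache synchronization can be implemented with Redis publish/subscribe. Note that the publishing machine also receives the messages it publishes itself, so message handling must be idempotent.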
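    One way to deal with a machine receiving its own messages is to tag every message with the ID of the node that published it and skip messages carrying the local ID. The sketch below illustrates this under assumptions that are not part of the original code: the envelope format "originId|key", the class name SelfAwareSyncHandler, and the reload counter are all hypothetical, and the node ID is assumed not to contain the "|" separator.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical handler that ignores sync messages published by its own node.
 * Assumed envelope format: "originId|key".
 */
class SelfAwareSyncHandler {
    // Identifies this machine; must be unique per node and must not contain '|'.
    private final String nodeId;
    // Counts reloads so the effect of a message can be observed.
    private final AtomicInteger reloads = new AtomicInteger();

    SelfAwareSyncHandler(String nodeId) {
        this.nodeId = nodeId;
    }

    /** Builds the body to publish: this node's ID, a separator, then the cache key. */
    String wrap(String key) {
        return nodeId + "|" + key;
    }

    /** Returns true if the message triggered a reload, false if it was skipped. */
    boolean onBody(String body) {
        int sep = (body == null) ? -1 : body.indexOf('|');
        if (sep < 0) {
            return false; // malformed message, ignore it
        }
        String origin = body.substring(0, sep);
        if (nodeId.equals(origin)) {
            return false; // published by this machine, local cache is already current
        }
        reload(body.substring(sep + 1));
        return true;
    }

    // Placeholder for the real cache reload (for example, re-reading the key from the database).
    private void reload(String key) {
        reloads.incrementAndGet();
    }

    int reloadCount() {
        return reloads.get();
    }
}
```

    With two handlers created as new SelfAwareSyncHandler("node-a") and new SelfAwareSyncHandler("node-b"), a body produced by node-a's wrap(...) is skipped by node-a and reloaded by node-b. A random UUID generated at startup would be one possible source for the node ID. Filtering by origin only removes redundant work; the reload itself should still be safe to repeat, since other nodes may see duplicate messages.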

  • Original article: https://www.cnblogs.com/bb-ben99/p/14012536.html