一、场景
项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本])。那么问题来了,mq中加入包含了两个部署消息 dm1 和 dm2,dm2的push_seq > dm1的push_seq,在分布式的情况下,dm1 和 dm2可能会分别被消费(也就是并行),那么在同步数据库的时候可能会发生 dm1 的数据保存 后于 dm2的数据保存,导致保存项目的部署信息发生异常。
二、解决思路
将mq消息的并行消费变成串行消费,这里借助redis分布式锁来完成。同一个服务在分布式的状态下,监听到mq消息后,触发方法的执行,执行之前(通过spring aop around来做的)首先获得redis的一个分布式锁,获取锁成功之后才能执行相关的逻辑以及数据库的保存,最后释放锁。
三、主要代码
import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author: hujunzheng * @create: 17/9/29 下午2:49 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface RedisLock { /** * redis的key * @return */ String value(); /** * 持锁时间,单位毫秒,默认一分钟 */ long keepMills() default 60000; /** * 当获取失败时候动作 */ LockFailAction action() default LockFailAction.GIVEUP; public enum LockFailAction{ /** * 放弃 */ GIVEUP, /** * 继续 */ CONTINUE; } /** * 睡眠时间,设置GIVEUP忽略此项 * @return */ long sleepMills() default 500; }
import java.lang.reflect.Method; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author: hujunzheng * @create: 17/9/29 下午2:49 */ @Component @Aspect public class RedisLockAspect { private static final Log log = LogFactory.getLog(RedisLockAspect.class); @Autowired private RedisCacheTemplate.RedisLockOperation redisLockOperation; @Pointcut("execution(* com.hjzgg..StargateDeployMessageConsumer.consumeStargateDeployMessage(..))" + "&& @annotation(me.ele.api.portal.service.redis.RedisLock)") private void lockPoint(){} @Around("lockPoint()") public Object arround(ProceedingJoinPoint pjp) throws Throwable{ MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Method method = methodSignature.getMethod(); RedisLock lockInfo = method.getAnnotation(RedisLock.class); /* String lockKey = lockInfo.value(); if (method.getParameters().length == 1 && pjp.getArgs()[0] instanceof DeployMessage) { DeployMessage deployMessage = (DeployMessage) pjp.getArgs()[0]; lockKey += deployMessage.getEnv(); System.out.println(lockKey); } */ boolean lock = false; Object obj = null; while(!lock){ long timestamp = System.currentTimeMillis()+lockInfo.keepMills(); lock = setNX(lockInfo.value(), timestamp); //得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争) long now = System.currentTimeMillis(); if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){ log.info("得到redis分布式锁..."); obj = pjp.proceed(); if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){ releaseLock(lockInfo.value()); } }else{ if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){ log.info("稍后重新请求redis分布式锁..."); Thread.currentThread().sleep(lockInfo.sleepMills()); }else{ log.info("放弃redis分布式锁..."); break; } } } return obj; } private boolean setNX(String key,Long value){ return redisLockOperation.setNX(key, value); } private long getLock(String key){ return redisLockOperation.get(key); } private Long getSet(String key,Long value){ return redisLockOperation.getSet(key, value); } private void releaseLock(String key){ redisLockOperation.delete(key); } @Pointcut(value = "execution(* me.ele..StargateBuildMessageConsumer.consumeStargateBuildMessage(me.ele.api.portal.service.mq.dto.BuildMessage)) && args(buildMessage)" + "&& @annotation(me.ele.api.portal.service.redis.RedisLock)", argNames = "buildMessage") private void buildMessageLockPoint(BuildMessage buildMessage){} @Around(value = "buildMessageLockPoint(buildMessage)", argNames = "pjp,buildMessage") public Object buildMessageAround(ProceedingJoinPoint pjp, BuildMessage buildMessage) throws Throwable { final String LOCK = buildMessage.getAppId() + buildMessage.getPushSequence(); Lock lock = redisLockRegistry.obtain(LOCK); try { lock.lock(); return pjp.proceed(); } finally { try { lock.unlock(); } catch (Exception e) { log.error("buildMessage={}, Lock {} unlock failed. {}", buildMessage, lock, e); } } } }
四、遇到的问题
开始是将锁加到deploy的方法上的,但是一直aop一直没有作用,换到consumeStargateDeployMessage方法上就可以了。考虑了一下是因为 @Transactional的原因。这里注意下。
在一篇文章中找到了原因:SpringBoot CGLIB AOP解决Spring事务,对象调用自己方法事务失效.
只要脱离了Spring容器管理的所有对象,对于SpringAOP的注解都会失效,因为他们不是Spring容器的代理类,SpringAOP,就切入不了。也就是说是 @Transactional注解方法的代理对象并不是spring代理对象。
参考: 关于proxy模式下,@Transactional标签在创建代理对象时的应用
五、使用spring-redis中的RedisLockRegistry
import java.util.concurrent.locks.Lock; import org.springframework.integration.redis.util.RedisLockRegistry; @Bean public RedisLockRegistry redisLockRegistry(@Value("${xxx.xxxx.registry}") String redisRegistryKey, RedisTemplate redisTemplate) { return new RedisLockRegistry(redisTemplate.getConnectionFactory(), redisRegistryKey, 200000); } Lock lock = redisLockRegistry.obtain(appId); lock.tryLock(180, TimeUnit.SECONDS); .... lock.unlock();
六、参考
其他工具类,请参考这里。
七、springboot LockRegistry
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.integration.redis.util.RedisLockRegistry; import redis.clients.jedis.JedisShardInfo; @Ignore public class RedisLockTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockTest.class); private static final String LOCK = "xxx.xxx"; private RedisLockRegistry redisLockRegistry; @Before public void setUp() { JedisShardInfo shardInfo = new JedisShardInfo("127.0.0.1"); JedisConnectionFactory factory = new JedisConnectionFactory(shardInfo); redisLockRegistry = new RedisLockRegistry(factory, "test", 50L); } private class TaskA implements Runnable { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Lock lock = redisLockRegistry.obtain(LOCK); try { lock.lock(); LOGGER.info("Lock {} is obtained", lock); Thread.sleep(10); lock.unlock(); LOGGER.info("Lock {} is unlocked", lock); } catch (Exception ex) { LOGGER.error("Lock {} unlock failed", lock, ex); } } } private class TimeoutTask implements Runnable { @Override public void run() { Lock lock = redisLockRegistry.obtain(LOCK); try { lock.lock(); LOGGER.info("Lock {} is obtained", lock); Thread.sleep(5000); lock.unlock(); LOGGER.info("Lock {} is unlocked", lock); } catch (Exception ex) { LOGGER.error("Lock {} unlock failed", lock, ex); } } } @Test public void test() throws InterruptedException, TimeoutException { ExecutorService service = Executors.newFixedThreadPool(2); service.execute(new TimeoutTask()); service.execute(new TaskA()); service.shutdown(); if (!service.awaitTermination(1, TimeUnit.MINUTES)) { throw new TimeoutException(); } } }