需求
最近负责的一个项目是微服务开发,我所开发的服务里有个定时任务,如果在集群环境下仅仅使用@scheduled,会发生定时任务幂等的问题。
可以采取的解决方案:
1.通过在配置文件设置定时任务开关,只允许一台开启定时任务。考虑到使用了nacos作为配置中心,单节点定时任务对单台服务器的压力以及单点故障问题,不采用。
2.搭建一套分布式任务调度系统。考虑到时间紧急及目前定时任务比较少,不采用。
3.使用分布式锁。由于项目本身就使用到redis,而且开发成本低,采用。
分布式锁介绍
什么是分布式锁?
分布式锁是控制分布式系统之间共同访问共享资源的⼀种锁实现
为什么要用分布式锁?
在单节点系统中,我们可以通过synchronized和lock保证同⼀个⽅法在同一个时刻被同一个线程调用。但是在分布式系统中,这就不管用了,因此,我们使用分布式锁的目的是保证在分布式部署的应⽤集群中,同⼀个⽅法在同⼀时刻只能被⼀台机器上的⼀个线程执⾏。就像需求中提到到,同一时刻只有一台机子在执行定时任务。
实现原理和思路
原理:通过Redis的setnx key value命令,当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
思路:
1.获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,避免宕机或网络原因发生导致锁不能释放发生死锁。
2.锁的value值为当前服务器的ip+端口(考虑在定时任务中使用),大家也可以用UUID,在释放锁的时候进行判断,避免释放到不属于自己的锁。
代码实现(基于SpringBoot2.0开发)
第一步:Maven依赖
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-data-redis</artifactId> 4 </dependency>
第二步:Redis参数配置
在application.properties中添加redis的链接地址
1 spring.redis.database = 0 2 spring.redis.host = 127.0.0.1 3 spring.redis.port = 6379 4 spring.redis.password =
创建RedisConfig
1 @Configuration 2 public class RedisConfig { 3 4 @Bean 5 public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { 6 RedisTemplate<String, String> redisTemplate = new RedisTemplate<>(); 7 redisTemplate.setConnectionFactory(factory); 8 return redisTemplate; 9 } 10 11 @Bean 12 public KeyGenerator simpleKeyGenerator() { 13 return (o, method, objects) -> { 14 StringBuilder stringBuilder = new StringBuilder(); 15 stringBuilder.append(o.getClass().getSimpleName()); 16 stringBuilder.append("."); 17 stringBuilder.append(method.getName()); 18 stringBuilder.append("["); 19 for (Object obj : objects) { 20 stringBuilder.append(obj.toString()); 21 } 22 stringBuilder.append("]"); 23 24 return stringBuilder.toString(); 25 }; 26 } 27 28 @Bean 29 public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) { 30 return new RedisCacheManager( 31 RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory), 32 this.getRedisCacheConfigurationWithTtl(600), 33 this.getRedisCacheConfigurationMap() 34 ); 35 } 36 37 private Map<String, RedisCacheConfiguration> getRedisCacheConfigurationMap() { 38 Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>(); 39 redisCacheConfigurationMap.put("UserInfoList", this.getRedisCacheConfigurationWithTtl(100)); 40 redisCacheConfigurationMap.put("UserInfoListAnother", this.getRedisCacheConfigurationWithTtl(18000)); 41 42 return redisCacheConfigurationMap; 43 } 44 45 private RedisCacheConfiguration getRedisCacheConfigurationWithTtl(Integer seconds) { 46 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); 47 ObjectMapper om = new ObjectMapper(); 48 om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); 49 om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); 50 jackson2JsonRedisSerializer.setObjectMapper(om); 51 52 RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig(); 53 redisCacheConfiguration = redisCacheConfiguration.serializeValuesWith( 54 RedisSerializationContext 55 .SerializationPair 56 .fromSerializer(jackson2JsonRedisSerializer) 57 ).entryTtl(Duration.ofSeconds(seconds)); 58 59 return redisCacheConfiguration; 60 } 61 62 }
第三步:创建RedisService,实现加锁与释放,加锁的过程不是原子操作,理论上会发生上锁成功但是没有设置过期时间的可能,这里改进的话可以使用Lua脚本来保证原子性操作。
1 @Service 2 public class RedisService { 3 4 @Autowired 5 private RedisTemplate redisTemplate; 6 7 8 /** 9 * 获取分布式锁 10 * 11 * @param lockName 锁名称 12 * @param lockValue 锁值 13 * @param lockValue 过期时间 14 * @return 15 */ 16 public boolean lock(String lockName, String lockValue, Long expireTime) { 17 boolean result = false; 18 try { 19 result = redisTemplate.opsForValue().setIfAbsent(lockName, lockValue); 20 if (result) { 21 redisTemplate.expire(lockName, expireTime, TimeUnit.SECONDS); 22 } 23 } catch (Exception e) { 24 e.printStackTrace(); 25 } 26 return result; 27 } 28 29 /** 30 * 释放分布式锁 31 * 32 * @param lockName 锁名称 33 * @param lockValue 锁值 34 * @return 35 */ 36 public boolean release(String lockName, String lockValue) { 37 boolean result = false; 38 try { 39 if (lockValue.equals(redisTemplate.opsForValue().get(lockName))) { 40 return redisTemplate.delete(lockName); 41 } 42 } catch (Exception e) { 43 e.printStackTrace(); 44 } 45 return result; 46 } 47 48 }
第四步:代码测试
1 @Service 2 @Slf4j 3 public class MyLockJob { 4 5 private static String LOCK_NAME = "my_lock"; 6 7 @Value("${server.port}") 8 private String serverPort; 9 @Autowired 10 private RedisService redisService; 11 12 13 @Scheduled(cron = "0/9 * * * * *") 14 public void lockJob() throws Exception { 15 boolean lockResult = false; 16 try { 17 18 String lockValue = InetAddress.getLocalHost().getHostAddress() + ":" + serverPort; 19 lockResult = redisService.lock(LOCK_NAME, lockValue, 10L); 20 21 // 获取锁成功 22 if (lockResult) { 23 log.info("地址:" + InetAddress.getLocalHost().getHostAddress() + ":" + serverPort + "获取锁成功!"); 24 Thread.sleep(5000); 25 return; 26 } 27 } catch (Exception e) { 28 log.error("lock error", e); 29 } finally { 30 // 根据锁的名称及锁的值释放锁 31 if (lockResult) { 32 String releaseValue = InetAddress.getLocalHost().getHostAddress() + ":" + serverPort; 33 boolean releaseResult = redisService.release(LOCK_NAME, releaseValue); 34 if (releaseResult) { 35 log.info(InetAddress.getLocalHost().getHostAddress() + ":" + serverPort + "释放锁成功!"); 36 } 37 } 38 } 39 } 40 41 }
结果展示
本地起了四台服务8080~8083,模拟多台服务器争抢锁及释放锁的过程,这里展示一分钟内获取锁和释放锁的情况。
8080端口获取锁:
8081端口获取锁:
8082端口有点倒霉,没拿到锁。
8083端口获取锁:
可以看到,同一时刻,只有一个服务能拿到锁,并且获取锁及释放锁为同一服务,需求完成。