分布式锁是 redis 比较常见的应用之一;
问题场景:现在有一个一个简单用户的相关操作,一个线程取修改用户状态,首先从数据库读取用户信息,在到内存进行修改,修改完毕进行持久化,单线程这样操作没问题,但是在多线程中,由于读取,修改,持久化 是三个操作,不是原子操作,因此多线程中,可能会发生数据紊乱,对于这种问题可用使用分布式锁限制程序并发执行;分布式锁实现原理:第一个线程先占位,当后续线程进来时发现已被占用,会等待或稍后重试;在 Redis 中,占位一般使用 setnx 进行操作;先进来的线程先占位,线程操作执行完毕在带调用 del 删除指令,释放占位;
CaooWithJedis 接口
public interface CaooWithJedis { void callJedis(Jedis jedis); }
Redis 类
public class Redis { private JedisPool pool; GenericObjectPoolConfig conf = new GenericObjectPoolConfig(); public Redis(){ pool = new JedisPool(conf,"192.168.134.129",6379,30000,"wdh01"); } public void execute(CaooWithJedis caooWithJedis){ try(Jedis jedis = pool.getResource()){ caooWithJedis.callJedis(jedis); } } }
测试代码
public static void main(String[] args) { Redis redis = new Redis(); redis.execute(jedis -> { Long setnx = jedis.setnx("k1", "v1"); if(setnx == 1){ // 无人占位 String set = jedis.set("name", "wdh01"); String name = jedis.get("name"); System.out.println(name); jedis.del("k1"); }else{ //有人占位,停止/暂缓操作 } }); }
上面的代码存在一个问题:当代码执行 del 之前出现异常,则会导致 del 未执行,k1 则无法释放,后面的请求也会被阻塞,分布式锁也无法释放;为此 可用为 k1 设置过期时间,确保一定时间后 分布式锁会被释放;
public static void main(String[] args) { Redis redis = new Redis(); redis.execute(jedis -> { Long setnx = jedis.setnx("k1", "v1"); if(setnx == 1){ //给锁设置过期时间 10s,即使程序出现问题,10 s后 k1 将被释放 jedis.expire("k1",10); // 无人占位 String set = jedis.set("name", "wdh01"); String name = jedis.get("name"); System.out.println(name); jedis.del("k1"); }else{ //有人占位,停止/暂缓操作 } }); }
这样改造后还有一个问题,在获取锁和设置过期时间之间如果程序或服务器挂掉了,此时锁也将无法得到释放会造成死锁;获取锁和设置过期时间是两个操作;不具备原子性,为了解决此问题,自 Redis 2.8 开始 setnx 和 expire 可用通过一个命令一起执行,下面对上述代码做进一步改进
public static void main(String[] args) { Redis redis = new Redis(); redis.execute(jedis -> { String set = jedis.set("k1","v1",new SetParams().nx().ex(10)); if(set != null && "OK".equals(set)){ //给锁设置过期时间 10s,即使程序出现问题,10 s后 k1 将被释放 jedis.expire("k1",10); // 无人占位 jedis.set("name", "wdh01"); String name = jedis.get("name"); System.out.println(name); jedis.del("k1"); }else{ //有人占位,停止/暂缓操作 } }); }