参考书籍:《redis深度历险:核心原理与应用实践》
分布式应用进行逻辑处理时经常会遇到并发问题,这个时候就要使用到分布式锁来限制程序的并发执行。
>set lock:codehole true ex 5 nx ...do something... >del lock:codehole
这个指令就是setnx和expire组合在一起的原子指令,他就是分布式锁的奥义所在。
但是redis的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的时间太长,以至于超出了锁的时间范围,就会导致临界区的逻辑还没有被第一个线程执行完,第二线程就提前重新持有了这把锁,导致临界区代码不能得到严格的串行执行。
如若客户端在处理请求时加锁没加成功。一般有3种策略来处理加锁失败:
1.直接抛出异常,通知用户稍后重试
这种方式比较适合由用户直接发起的请求,用户看到错误对话框后,再点击重试,这就是人工延时。它本质上是对当前请求的放弃,由用户决定是否发起新的请求。
2.sleep
sleep会阻塞当前的消息,处理线程,会导致队列的后续消息处理出现延迟。如果碰撞的比较频繁,或者队列里消息比较多,sleep可能并不合适。如果因为个别死锁的key导致加锁不成功,线程会彻底堵死,导致后续消息永远得不到及时处理
3.延时队列
这种方式比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避冲突
redis的分布式锁支持可重入,可重入性是指线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程多次加锁,那么这个锁就是可重入的。如果想要redis的锁支持可重入,需要对客户端的set方法进行包装,使用线程的Threadlocal变量存储当前持有锁的计数。
import redis import threading locks=threading.local() locks.redis={} def key_for(user_id): return "account_{}".format(user_id) def _lock(client,key): return bool(client.set(key,True,nx=True,ex=100)) def lock(client,user_id): key=key_for(user_id) if key in locks.redis: locks.redis[key]+=1 return True ok=_lock(client,key) if not ok: return False locks.redis[key]=1 return True def unlock(client,user_id): key=key_for(user_id) if key in locks.redis: locks.redis[key]-=1 if locks.redis[key]<0: del locks.redis[key] self._unlock(key) return True return False client=redis.StrictRedis() print("lock",lock(client,"codehole")) print("lock",lock(client,"codehole")) print("unlock",unlock(client,"codehole")) print("unlock",unlock(client,"codehole"))