https://www.cnblogs.com/xiaonq/p/12328934.html
1.1 redis事物介绍
1.redis事物是可以一次执行多个命令,本质是一组命令的集合。 2.一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入 **作用:**一个队列中,一次性、顺序性、排他性的执行一系列命令
# Multi 命令用于标记一个事务块的开始事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性( atomic )地执行 > multi # 开始一个redis事物 incr books incr books > exec # 执行事物 > discard # 丢弃事物
1.2 watch乐观锁
实质:WATCH 只会在数据被其他客户端抢先修改了的情况下通知执行命令的这个客户端(通过 WatchError 异常)但不会阻止其他客户端对数据的修改 1)原理 1.当用户购买时,通过 WATCH 监听用户库存,如果库存在watch监听后发生改变,就会捕获异常而放弃对库存减一操作 2.如果库存没有监听到变化并且数量大于1,则库存数量减一,并执行任务 2)弊端 1.Redis 在尝试完成一个事务的时候,可能会因为事务的失败而重复尝试重新执行 2.保证商品的库存量正确是一件很重要的事情,但是单纯的使用 WATCH 这样的机制对服务器压力过大
使用reids的 watch + multi 指令实现超卖
#! /usr/bin/env python # -*- coding: utf-8 -*- import redis def sale(rs): while True: with rs.pipeline() as p: try: p.watch('apple') # 监听key值为apple的数据数量改变 count = int(rs.get('apple')) print('拿取到了苹果的数量: %d' % count) p.multi() # 事务开始 if count> 0 : # 如果此时还有库存 p.set('apple', count - 1) p.execute() # 执行事务 p.unwatch() break # 当库存成功减一或没有库存时跳出执行循环 except Exception as e: # 当出现watch监听值出现修改时,WatchError异常抛出 print('[Error]: %s' % e) continue # 继续尝试执行 rs = redis.Redis(host='127.0.0.1', port=6379) # 连接redis rs.set('apple',1000) # 首先在redis中设置某商品apple 对应数量value值为1000 sale(rs)
1.3 setnx分布式锁
1.分布式锁本质是占一个坑,当别的进程也要来占坑时发现已经被占,就会放弃或者稍后重试 2.占坑一般使用 setnx(set if not exists)指令,只允许一个客户端占坑 3.先来先占,用完了在调用del指令释放坑 > setnx lock:codehole true .... do something critical .... > del lock:codehole
1.3.1 setnx+watch+multi解决超卖问题
#! /usr/bin/env python # -*- coding: utf-8 -*- import redis import uuid import time # 1.初始化连接函数 def get_conn(host,port=6379): rs = redis.Redis(host=host, port=port) return rs # 2. 构建redis锁 def acquire_lock(rs, lock_name, expire_time=10): ''' rs: 连接对象 lock_name: 锁标识 acquire_time: 过期超时时间 return -> False 获锁失败 or True 获锁成功 ''' identifier = str(uuid.uuid4()) end = time.time() + expire_time while time.time() < end: # 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False if rs.setnx(lock_name, identifier): # 尝试取得锁 return identifier time.sleep(0.001) return False # 3. 释放锁 def release_lock(rs, lockname, identifier): ''' rs: 连接对象 lockname: 锁标识 identifier: 锁的value值,用来校验 ''' pipe = rs.pipeline(True) try: pipe.watch(lockname) if rs.get(lockname).decode() == identifier: # 防止其他进程同名锁被误删 pipe.multi() # 开启事务 pipe.delete(lockname) pipe.execute() return True # 删除锁 pipe.unwatch() # 取消事务 except Exception as e: pass return False # 删除失败 '''在业务函数中使用上面的锁''' def sale(rs): start = time.time() # 程序启动时间 with rs.pipeline() as p: ''' 通过管道方式进行连接 多条命令执行结束,一次性获取结果 ''' while True: lock = acquire_lock(rs, 'lock') if not lock: # 持锁失败 continue try: count = int(rs.get('apple')) # 取量 p.set('apple', count-1) # 减量 p.execute() print('当前库存量: %s' % count) break finally: release_lock(rs, 'lock', lock) print('[time]: %.2f' % (time.time() - start)) rs = redis.Redis(host='127.0.0.1', port=6379) # 连接redis rs.set('apple',1000) # # 首先在redis中设置某商品apple 对应数量value值为1000 sale(rs)
1.3.2 优化:给分布式锁加超时时间防止死锁
def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10): ''' rs: 连接对象 lock_name: 锁标识 acquire_time: 过期超时时间 locked_time: 锁的有效时间 return -> False 获锁失败 or True 获锁成功 ''' identifier = str(uuid.uuid4()) end = time.time() + expire_time while time.time() < end: # 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False if rs.setnx(lock_name, identifier): # 尝试取得锁 # print('锁已设置: %s' % identifier) rs.expire(lock_name, locked_time) return identifier time.sleep(.001) return False