因业务需要写了个python的装饰器与redis同时使用,用于处理高并发操作,防止记录重复操作
装饰器部分代码
import hashlib
from functools import wraps
from api.api_state import APIState
from api.base_api import APIResponse
from utils.redis import acquire_lock_with_timeout, release_lock
def lock_operation(option_name: str, lock_key: str, option_type="GET", paramer_type="request"):
"""
操作加锁防止同一记录暴击
@param option_name: 操作名称
@param paramer_type: 取参类型 : request
@param option_type: 请求类型 :GET,POST
@param lock_key: 取参关键字
@return:
"""
def wrapper(func):
def redislock(request, *args, **kwargs):
if paramer_type == "request":
if option_type == "GET":
key_paramer = request.request.query_params.get(lock_key, None)
else:
key_paramer = request.request.data.get(lock_key, None)
else:
key_paramer = kwargs.get(lock_key, None)
lock_name = hashlib.md5(f'{option_name}{key_paramer}'.encode()).hexdigest()
identifier = acquire_lock_with_timeout(lock_name)
if not identifier:
return APIResponse(status=APIState.PARAMTER_ERROR.value, msg='操作太快了,请稍后再试')
try:
return func(request, *args, **kwargs)
finally:
release_lock(lock_name, identifier)
return redislock
return wrapper
reids 处理部分
import json
import logging
import math
import time
import uuid
from decimal import Decimal
import redis
from django.utils.timezone import now
from django_redis import get_redis_connection
from users.models import TCustomerinfo
logger = logging.getLogger(__name__)
conn = get_redis_connection("default")
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
return json.JSONEncoder.default(self, obj)
def get_redis_value_by_key(key):
return conn.get(key)
def set_redis_value(key, value, expires):
conn.set(key, value, expires)
def set_cache_data(key, data, TOKEN_EXPIRED_TIME):
cache_data = json.dumps(data, cls=DecimalEncoder)
set_redis_value(key, cache_data, TOKEN_EXPIRED_TIME)
return cache_data
def acquire_lock_with_timeout(lockname, acquire_timeout=3, lock_timeout=5):
logger.info(f'acquire_lock: {lockname}')
# 128位随机标识符。
identifier = str(uuid.uuid4())
lockname = 'lock:' + lockname
# 确保传给EXPIRE的都是整数。
lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end:
# 获取锁并设置过期时间。
if conn.setnx(lockname, identifier):
conn.expire(lockname, lock_timeout)
return identifier
# 检查过期时间,并在有需要时对其进行更新。
elif not conn.ttl(lockname):
conn.expire(lockname, lock_timeout)
time.sleep(.001)
logger.info(f'can not lock: {lockname}')
return False
def release_lock(lockname, identifier):
logger.info(f'release_lock: {lockname}')
pipe = conn.pipeline(True)
lockname = 'lock:' + lockname
while True:
try:
# 检查并确认进程还持有着锁。
pipe.watch(lockname)
#取得锁值
lockname_value = pipe.get(lockname)
if lockname_value:
lockname_value = lockname_value.decode()
if lockname_value == identifier:
# 释放锁。
pipe.multi()
pipe.delete(lockname)
pipe.execute()
logger.info("释放结束")
return True
pipe.unwatch()
break
# 有其他客户端修改了锁;重试。
except redis.exceptions.WatchError:
pass
# 进程已经失去了锁。
return False
使用方式:
class OrderLogisticsDetailsView(APIView):
"""
get
express_id:
express_no:
"""
@lock_operation(option_name="WL", paramer_type="kwargs", option_type="GET", lock_key="express_no")
def get(self, request, *args, **kwargs):
express_no = kwargs.get("express_no", None)
express_id = kwargs.get("express_id", None)
if (not express_id is None) and (not express_no is None):
Logisticsinfo_ins = TLogisticsinfo.objects.filter(ls_id=express_id).first()
ret = query_logistics_information(Logisticsinfo_ins, express_no)
if ret:
return ret
return APIResponse(status=APIState.PARAMTER_ERROR.value)