zoukankan      html  css  js  c++  java
  • python 带参装饰器 +reids ,处理高频操作

    因业务需要写了个python的装饰器与redis同时使用,用于处理高并发操作,防止记录重复操作

    装饰器部分代码

    import hashlib
    from functools import wraps
    from api.api_state import APIState
    from api.base_api import APIResponse
    from utils.redis import acquire_lock_with_timeout, release_lock
    
    
    def lock_operation(option_name: str, lock_key: str, option_type="GET", paramer_type="request"):
        """
        操作加锁防止同一记录暴击
        @param option_name: 操作名称
        @param paramer_type: 取参类型 : request
        @param option_type: 请求类型 :GET,POST
        @param lock_key: 取参关键字
        @return:
        """
    
        def wrapper(func):
            def redislock(request, *args, **kwargs):
                if paramer_type == "request":
                    if option_type == "GET":
                        key_paramer = request.request.query_params.get(lock_key, None)
                    else:
                        key_paramer = request.request.data.get(lock_key, None)
                else:
                    key_paramer = kwargs.get(lock_key, None)
    
                lock_name = hashlib.md5(f'{option_name}{key_paramer}'.encode()).hexdigest()
                identifier = acquire_lock_with_timeout(lock_name)
                if not identifier:
                    return APIResponse(status=APIState.PARAMTER_ERROR.value, msg='操作太快了,请稍后再试')
                try:
                    return func(request, *args, **kwargs)
                finally:
                    release_lock(lock_name, identifier)
    
            return redislock
    
        return wrapper
    
    

    reids 处理部分

    import json
    import logging
    import math
    import time
    import uuid
    from decimal import Decimal
    
    import redis
    from django.utils.timezone import now
    from django_redis import get_redis_connection
    
    from users.models import TCustomerinfo
    
    logger = logging.getLogger(__name__)
    
    conn = get_redis_connection("default")
    
    class DecimalEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, Decimal):
                return float(obj)
            return json.JSONEncoder.default(self, obj)
    
    
    def get_redis_value_by_key(key):
        return conn.get(key)
    
    
    def set_redis_value(key, value, expires):
        conn.set(key, value, expires)
    
    
    def set_cache_data(key, data, TOKEN_EXPIRED_TIME):
        cache_data = json.dumps(data, cls=DecimalEncoder)
        set_redis_value(key, cache_data, TOKEN_EXPIRED_TIME)
        return cache_data
    
    
    def acquire_lock_with_timeout(lockname, acquire_timeout=3, lock_timeout=5):
        logger.info(f'acquire_lock: {lockname}')
        # 128位随机标识符。
        identifier = str(uuid.uuid4())
        lockname = 'lock:' + lockname
        # 确保传给EXPIRE的都是整数。
        lock_timeout = int(math.ceil(lock_timeout))
    
        end = time.time() + acquire_timeout
        while time.time() < end:
            # 获取锁并设置过期时间。
            if conn.setnx(lockname, identifier):
                conn.expire(lockname, lock_timeout)
                return identifier
            # 检查过期时间,并在有需要时对其进行更新。
            elif not conn.ttl(lockname):
                conn.expire(lockname, lock_timeout)
    
            time.sleep(.001)
    
        logger.info(f'can not lock: {lockname}')
        return False
    
    
    def release_lock(lockname, identifier):
        logger.info(f'release_lock: {lockname}')
        pipe = conn.pipeline(True)
        lockname = 'lock:' + lockname
    
        while True:
            try:
                # 检查并确认进程还持有着锁。
                pipe.watch(lockname)
                #取得锁值
                lockname_value = pipe.get(lockname)
                if lockname_value:
                    lockname_value = lockname_value.decode()
    
                if lockname_value == identifier:
    
                    # 释放锁。
                    pipe.multi()
                    pipe.delete(lockname)
                    pipe.execute()
                    logger.info("释放结束")
                    return True
    
                pipe.unwatch()
                break
    
            # 有其他客户端修改了锁;重试。
            except redis.exceptions.WatchError:
                pass
    
        # 进程已经失去了锁。
        return False
    
    

    使用方式:

    class OrderLogisticsDetailsView(APIView):
        """
        get
            express_id:
            express_no:
        """
    
        @lock_operation(option_name="WL", paramer_type="kwargs", option_type="GET", lock_key="express_no")
        def get(self, request, *args, **kwargs):
            express_no = kwargs.get("express_no", None)
            express_id = kwargs.get("express_id", None)
            if (not express_id is None) and (not express_no is None):
                Logisticsinfo_ins = TLogisticsinfo.objects.filter(ls_id=express_id).first()
                ret = query_logistics_information(Logisticsinfo_ins, express_no)
    
                if ret:
                    return ret
    
            return APIResponse(status=APIState.PARAMTER_ERROR.value)
    

    参考链接:
    https://mp.weixin.qq.com/s/nPNowhyNPQah-eQBbsj29Q

  • 相关阅读:
    curl -L 跟随跳转
    Http报头Accept与Content-Type的区别
    curl 发送json请求
    IntelliJ 中类似于Eclipse ctrl+o的是ctrl+F12
    Spring AOP
    Windows下Nginx配置SSL实现Https访问(包含证书生成)
    @Retention n. 保留
    Exchanger使用
    Semaphore使用
    UVA12493
  • 原文地址:https://www.cnblogs.com/YQYC/p/14250184.html
Copyright © 2011-2022 走看看