zoukankan      html  css  js  c++  java
  • 【Django】:日志记录、API认证

    日志记录:

      调用同一个对象,分别记录错误日志和运行日志

    自定义日志类:

    class Logger(object):
        __instance = None           
    
        def __init__(self):
            self.run_log_file = settings.RUN_LOG_FILE
            self.error_log_file = settings.ERROR_LOG_FILE
            self.run_logger = None
            self.error_logger = None
    
            self.initialize_run_log()
            self.initialize_error_log()
    
        def __new__(cls, *args, **kwargs):              # 单例模式
            if not cls.__instance:
                cls.__instance = object.__new__(cls, *args, **kwargs)
            return cls.__instance
    
        @staticmethod
        def check_path_exist(log_abs_file):
            log_path = os.path.split(log_abs_file)[0]
            if not os.path.exists(log_path):
                os.mkdir(log_path)
    
        def initialize_run_log(self):
            self.check_path_exist(self.run_log_file)
            file_1_1 = logging.FileHandler(self.run_log_file, 'a', encoding='utf-8')
            fmt = logging.Formatter(fmt="%(asctime)s - %(levelname)s :  %(message)s")
            file_1_1.setFormatter(fmt)
            logger1 = logging.getLogger('run_log')          # 'run_log' 随意写
            logger1.setLevel(logging.INFO)                  # 效果与error里logging.Logger一样
            logger1.addHandler(file_1_1)
            self.run_logger = logger1
    
        def initialize_error_log(self):
            self.check_path_exist(self.error_log_file)
            file_1_1 = logging.FileHandler(self.error_log_file, 'a', encoding='utf-8')
            fmt = logging.Formatter(fmt="%(asctime)s  - %(levelname)s :  %(message)s")
            file_1_1.setFormatter(fmt)
            logger1 = logging.Logger('run_log', level=logging.ERROR)
            logger1.addHandler(file_1_1)
            self.error_logger = logger1
    
        def log(self, message, mode=True):
            """
            写入日志
            :param message: 日志信息
            :param mode: True表示运行信息,False表示错误信息
            :return:
            """
            if mode:
                self.run_logger.info(message)
            else:
                self.error_logger.error(message)
    

    调用方式: 

    if ret['code'] == 1000:
        print(IP,'更新成功')
        Logger().log(ret['message'], True)
    else:
        Logger().log(ret['message'], False)
    

      

    API认证: 

      密钥串+时间戳发送进行认证,超过超时时间认证失败,认证过的密钥过时清空

    客户端:

    import hashlib
    import time
    import requests
    
    class Client(object):
    
        def __init__(self):
            self.key = '299095cc-1330-11e5-b06a-a45e60bec08b'
            self.key_name = 'auth-key'
            self.asset_api = 'http://127.0.0.1:8000/api/'
    
        def auth_key(self):
            """
            接口认证
            """
            ha = hashlib.md5(self.key.encode('utf-8'))      # 用self.key 做加密盐
            time_span = time.time()
            ha.update(bytes("%s|%f" % (self.key, time_span), encoding='utf-8'))
            encryption = ha.hexdigest()
            result = "%s|%f" % (encryption, time_span)
            return {self.key_name: result}
    
        def get_asset(self):
            """
            post方式向街口提交资产信息
            """
            headers = {}
            headers.update(self.auth_key())
            response = requests.get(
                url=self.asset_api,
                headers=headers,
            )
    
            print(response.text)
    
    Client().get_asset()

    服务端:  

    from django.views import View
    import time
    import hashlib
    
    ASSET_AUTH_KEY = '299095cc-1330-11e5-b06a-a45e60bec08b'     #认证的KEY
    ASSET_AUTH_HEADER_NAME = 'HTTP_AUTH_KEY'        # 认证头
    ASSET_AUTH_TIME = 2             # 超时时间
    ENCRYPT_LIST  = []   #存放认证过的key
    
    def api_auth(request):
    
        auth_key = request.META.get(ASSET_AUTH_HEADER_NAME)
        if not auth_key:        # 请求认证头不正确
            return False
        sp = auth_key.split('|')
        if len(sp) != 2:        # 格式不正确
            return False
        encrypt, timestamp = sp
        timestamp = float(timestamp)    # str换成float
        limit_timestamp = time.time() - ASSET_AUTH_TIME
    
        if limit_timestamp > timestamp:     # 当前程序时间与客户端时间戳对比 超时
            return False
        ha = hashlib.md5(ASSET_AUTH_KEY.encode('utf-8'))
        ha.update(bytes("%s|%f" % (ASSET_AUTH_KEY, timestamp), encoding='utf-8'))
        result = ha.hexdigest()
        if encrypt != result:           # md5值校验
            return False
    
        exist = False
        del_keys = []
        for k, v in enumerate(ENCRYPT_LIST):   # 记录当前认证,如果在过期内已经认证过,则认证失败,另过期认证记录删除
            m = v['time']
            n = v['encrypt']
            if m < limit_timestamp:
                del_keys.append(k)
                continue
            if n == encrypt:
                exist = True
        for k in del_keys:
            del ENCRYPT_LIST[k]
    
        if exist:
            return False
        ENCRYPT_LIST.append({'encrypt': encrypt, 'time': timestamp})
        return True
    
    
    class Api(View):
    
        def get(self, request):
            result = api_auth(request)
            if result:
                return HttpResponse('认证成功')
            else:
                return HttpResponse('去你的吧')
    

      

  • 相关阅读:
    wc 统计程序
    读C#程序
    VS2013 单元测试
    android开发心得之知识的量变到质变
    大学第一篇博客
    团队作业七
    团队作业六
    团队作业五
    团队作业四
    团队作业三
  • 原文地址:https://www.cnblogs.com/lianzhilei/p/6513230.html
Copyright © 2011-2022 走看看