日志记录:
调用同一个对象,分别记录错误日志和运行日志
自定义日志类:
class Logger(object):
    """Process-wide singleton that writes run logs and error logs to two files.

    Every ``Logger()`` call returns the same instance.  File handlers are
    attached only once, so the usual calling pattern ``Logger().log(...)``
    does not duplicate log lines or leak file handles.

    File locations are read from ``settings.RUN_LOG_FILE`` and
    ``settings.ERROR_LOG_FILE``.
    """

    __instance = None
    # Becomes True on the singleton once __init__ has completed successfully.
    __initialized = False

    def __new__(cls, *args, **kwargs):
        # Singleton: build the instance on first use, then keep returning it.
        # Extra arguments are deliberately not forwarded: object.__new__
        # raises TypeError on them when __new__ is overridden.
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        # Python calls __init__ on every Logger() even though __new__ hands
        # back the cached instance; re-running the setup would stack one more
        # FileHandler per call and write each message multiple times.
        if self.__initialized:
            return
        self.run_log_file = settings.RUN_LOG_FILE
        self.error_log_file = settings.ERROR_LOG_FILE
        self.run_logger = None
        self.error_logger = None
        self.initialize_run_log()
        self.initialize_error_log()
        self.__initialized = True

    @staticmethod
    def check_path_exist(log_abs_file):
        """Create the directory that will contain ``log_abs_file`` if missing.

        :param log_abs_file: path of the log file (the file itself is not created)
        """
        log_dir = os.path.dirname(log_abs_file)
        # A bare filename has no directory part; the current directory is used.
        if log_dir:
            # makedirs handles nested paths and tolerates an existing directory.
            os.makedirs(log_dir, exist_ok=True)

    @staticmethod
    def _attach_file_handler(logger, log_file):
        """Attach an appending UTF-8 FileHandler for ``log_file`` to ``logger`` once."""
        target = os.path.abspath(log_file)
        for existing in logger.handlers:
            if isinstance(existing, logging.FileHandler) and existing.baseFilename == target:
                return  # already attached; adding another would duplicate output
        handler = logging.FileHandler(log_file, 'a', encoding='utf-8')
        handler.setFormatter(
            logging.Formatter(fmt="%(asctime)s - %(levelname)s : %(message)s")
        )
        logger.addHandler(handler)

    def initialize_run_log(self):
        """Set up ``self.run_logger`` (INFO level) writing to ``self.run_log_file``."""
        self.check_path_exist(self.run_log_file)
        # getLogger returns the same shared logger for the same name on every
        # call, which is why the handler must only be attached once.
        run_logger = logging.getLogger('run_log')
        run_logger.setLevel(logging.INFO)
        self._attach_file_handler(run_logger, self.run_log_file)
        self.run_logger = run_logger

    def initialize_error_log(self):
        """Set up ``self.error_logger`` (ERROR level) writing to ``self.error_log_file``."""
        self.check_path_exist(self.error_log_file)
        error_logger = getattr(self, 'error_logger', None)
        if error_logger is None:
            # Instantiated directly instead of via getLogger, so this logger is
            # standalone and its name never collides with the run logger above.
            error_logger = logging.Logger('run_log', level=logging.ERROR)
        self._attach_file_handler(error_logger, self.error_log_file)
        self.error_logger = error_logger

    def log(self, message, mode=True):
        """Write one log record.

        :param message: text to log
        :param mode: True writes an INFO record to the run log,
                     False writes an ERROR record to the error log
        :return: None
        """
        if mode:
            self.run_logger.info(message)
        else:
            self.error_logger.error(message)
调用方式:
# Report a successful update on stdout, then send the server's message to the
# run log on success (mode=True) or to the error log otherwise (mode=False).
is_success = ret['code'] == 1000
if is_success:
    print(IP, '更新成功')
Logger().log(ret['message'], is_success)
API认证:
客户端将密钥与时间戳拼接后计算 MD5 签名,并把"签名|时间戳"放入请求头发送给服务端进行认证;时间戳超过超时时间则认证失败;同一个签名在有效期内只能使用一次(防止重放),已过期的认证记录会被清除。
客户端:
import hashlib
import time

import requests


class Client(object):
    """Minimal API client that signs each request with a key + timestamp header."""

    def __init__(self):
        # NOTE(review): the shared secret is hard-coded here and must match the
        # key configured on the server; consider loading it from configuration.
        self.key = '299095cc-1330-11e5-b06a-a45e60bec08b'
        self.key_name = 'auth-key'
        self.asset_api = 'http://127.0.0.1:8000/api/'

    def auth_key(self):
        """Build the authentication header.

        The header value is ``"<md5 hex digest>|<timestamp>"``.  The digest is
        MD5 seeded with the key and then updated with ``"<key>|<timestamp>"``,
        so the receiver can recompute it from the timestamp and its own copy
        of the key.

        :return: dict mapping the header name to the signed value
        """
        time_span = time.time()
        # The key is fed in first and acts as the salt for the digest.
        ha = hashlib.md5(self.key.encode('utf-8'))
        ha.update(("%s|%f" % (self.key, time_span)).encode('utf-8'))
        encryption = ha.hexdigest()
        return {self.key_name: "%s|%f" % (encryption, time_span)}

    def get_asset(self, timeout=5):
        """Send a signed GET request to the asset API and print the response body.

        :param timeout: seconds to wait for the server before requests raises
                        a timeout error; without it the call could block forever
        """
        response = requests.get(
            url=self.asset_api,
            headers=self.auth_key(),
            timeout=timeout,
        )
        print(response.text)


Client().get_asset()
服务端:
import hashlib
import hmac
import time

from django.http import HttpResponse
from django.views import View

ASSET_AUTH_KEY = '299095cc-1330-11e5-b06a-a45e60bec08b'  # shared secret used for signing
ASSET_AUTH_HEADER_NAME = 'HTTP_AUTH_KEY'  # request.META key carrying the signed value
ASSET_AUTH_TIME = 2  # seconds a signed timestamp stays valid
ENCRYPT_LIST = []  # signatures accepted within the validity window: {'encrypt', 'time'}


def api_auth(request):
    """Validate the signed authentication header of ``request``.

    The header value must be ``"<md5 hex digest>|<timestamp>"``.  The request
    is accepted only when the timestamp parses as a number, is not older than
    ``ASSET_AUTH_TIME`` seconds, the digest matches the one recomputed from
    ``ASSET_AUTH_KEY`` and the timestamp, and the same digest has not already
    been accepted within the validity window.

    :param request: object exposing the request headers through ``request.META``
    :return: True when the request is authenticated, False otherwise
    """
    auth_key = request.META.get(ASSET_AUTH_HEADER_NAME)
    if not auth_key:
        # Header missing or empty.
        return False

    parts = auth_key.split('|')
    if len(parts) != 2:
        # Malformed value.
        return False
    encrypt, raw_timestamp = parts

    try:
        timestamp = float(raw_timestamp)
    except ValueError:
        # A non-numeric timestamp is a failed authentication, not a server error.
        return False

    limit_timestamp = time.time() - ASSET_AUTH_TIME
    if limit_timestamp > timestamp:
        # Signed too long ago.
        return False

    ha = hashlib.md5(ASSET_AUTH_KEY.encode('utf-8'))
    ha.update(("%s|%f" % (ASSET_AUTH_KEY, timestamp)).encode('utf-8'))
    expected = ha.hexdigest()
    # Constant-time comparison; operands are encoded to bytes because
    # compare_digest rejects str values containing non-ASCII characters.
    if not hmac.compare_digest(encrypt.encode('utf-8'), expected.encode('utf-8')):
        return False

    # Drop expired records.  The list is rebuilt in place (same list object)
    # rather than deleted by index, because deleting by ascending index shifts
    # the remaining items and removes the wrong records.
    ENCRYPT_LIST[:] = [
        record for record in ENCRYPT_LIST if not record['time'] < limit_timestamp
    ]

    # A digest that is still inside the window has been used already: replay.
    if any(record['encrypt'] == encrypt for record in ENCRYPT_LIST):
        return False

    ENCRYPT_LIST.append({'encrypt': encrypt, 'time': timestamp})
    return True


class Api(View):
    """View that reports whether the request passed ``api_auth``."""

    def get(self, request):
        """Return a success or failure message depending on the authentication result."""
        if api_auth(request):
            return HttpResponse('认证成功')
        return HttpResponse('去你的吧')