认证源码分析
#1、APIAPIView #进来 #2、APIAPIView类中有很多方法,找到dispatch方法 ,请求来了肯定要走dispatch def dispatch(self, request, *args, **kwargs): request = self.initialize_request(request, *args, **kwargs) #request已经变了 self.initial(request, *args, **kwargs) #这个request是包装后的 #3、来到initial方法、权限、频率等都在该方法中 def initial(self, request, *args, **kwargs): #包装后的request self.perform_authentication(request) #认证 self.check_permissions(request) self.check_throttles(request) #4、来到perform_authentication, def perform_authentication(self, request): request.user #此时就要去包装后的request中找user #----------------------------------上边都是APIAPIView类中的方法 #----------------------------------下边看都是Request类中的方法 #5、点from rest_framework.request import Request进来,找到user方法 @property def user(self): self._authenticate() #执行_authenticate方法 #6、来到_authenticate def _authenticate(self): for authenticator in self.authenticators: #循环取值,authenticators是什么 user_auth_tuple = authenticator.authenticate(self) #7、它是实例化的时候传过来的,那么实例化是在什么时候完成的?是在第2步request包装的时候完成的,实例化的时候,此时来到第二步,找到request包装的过程 def __init__(self, request, parsers=None, authenticators=None, negotiator=None, parser_context=None): self.authenticators = authenticators or () #----------------------------又回到了APIAPIView中 #8、此时来到第二步,找到request包装的过程,点进去,注意此时来到了APIAPIView中 def initialize_request(self, request, *args, **kwargs): return Request( return Request( request, parsers=self.get_parsers(), #解析的 authenticators=self.get_authenticators(), #现在重点关注这,这里的self是谁?是APIAPIView,其实是我们写的类,我们继承了APIAPIView negotiator=self.get_content_negotiator(), #分页 parser_context=parser_context #解析 #9、自己写的类中没有get_authenticators方法,来它的父类APIAPIView找 def get_authenticators(self): return [auth() for auth in self.authentication_classes] #直接点进去 #10、同样是APIAPIView中的authentication_classes class APIView(View): authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES #11、先不管api_settings是什么,如果我们自己类中写一个authentication_classes, #如下 注意:来到了view.py class Auth(): pass class Login(APIView): authentication_classes=[Auth,] #那么第9步会从这里拿到Auth,并加括号实例化产生对象 def get(self,request,*args,**kwargs): ret = models.Book.objects.all() #12、往回看,第8步 authenticators=self.get_authenticators() #authenticators其实就是列表中包含一个一个对象 #[Auth对象,,,] #--------------------------回到 Request看 #13、看5,6步,先看第5步 for authenticator in self.authenticators: #把[Auth对象,,,]列表中的对象一个一个取出来 user_auth_tuple = authenticator.authenticate(self) #调用authenticator 的 authenticate方法,也就是Auth对象的方法, #所以在自己的Auth方法中把pass去掉,写authenticate方法,, #注意注意,这里的authenticate(self)带有self参数,正常情况下,authenticator对象,调用自己的方法,不需要传参,这里传了,这里的self。是谁?是Request对象,所以,我们在写Auth中的authenticate时也需要传参 class Auth(): def authenticate(self,request):
源码分析
def _authenticate(self): for authenticator in self.authenticators: #已经知道是一个一个对象 #执行authenticate(自己写的) user_auth_tuple = authenticator.authenticate(self) if user_auth_tuple is not None: # self._authenticator = authenticator self.user, self.auth = user_auth_tuple # return
自己手写验证
class UserInfo(models.Model): name = models.CharField(max_length=32) pwd = models.CharField(max_length=32) # 用户token class UserToken(models.Model): token = models.CharField(max_length=64) user = models.OneToOneField(to=UserInfo)
from django.shortcuts import render from rest_framework.views import APIView from app01 import models from django.core.exceptions import ObjectDoesNotExist import hashlib import time from django.http import JsonResponse from app01 import MySerializer # Create your views here. def get_token(name): #写一个生成token的方法 # 生成一个md5对象 md5 = hashlib.md5() # 往里添加值,必须是bytes格式 # time.time()生成时间戳类型,转成字符串,再encode转成bytes格式 md5.update(str(time.time()).encode('utf-8')) md5.update(name.encode('utf-8')) return md5.hexdigest() # 登录 class Login(APIView): def post(self, request, *args, **kwargs): response = {'status': 100, 'msg': '登录成功'} name = request.data.get('name') pwd = request.data.get('pwd') try: user = models.UserInfo.objects.get(name=name, pwd=pwd) # 校验通过,登录成功,生成一个随机字符串(身份标识)token token = get_token(name) # 保存到数据库 # update_or_create更新或者创建,因为不可能同一用户访问10次,生成10次 models.UserToken.objects.update_or_create(user=user, defaults={'token': token}) response['token'] = token except ObjectDoesNotExist as e: response['status'] = 101 response['msg'] = '用户名或密码错误' except Exception as e: response['status'] = 102 # response['msg']='未知错误' response['msg'] = str(e) return JsonResponse(response, safe=False)
class Books(APIView): def get(self, request, *args, **kwargs): response = {'status': 100, 'msg': '查询成功'} # 必须登录以后,才能获取数据 # 取出token,取数据库验证,是否登录 token = request.query_params.get('token') ret = models.UserToken.objects.filter(token=token) if ret: # 认证通过,是登录用户 ret = models.Book.objects.all() book_ser = MySerializer.BookSerializer(ret, many=True) response['data'] = book_ser.data else: response['status'] = 101 response['msg'] = '认证不通过' return JsonResponse(response, safe=False)
使用auth
1、认证类(在新建的py文件中)
rom rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication): #尽量继承,避免抛出异常 def authenticate(self, request): token = request.GET.get('token') token_obj = models.UserToken.objects.filter(token=token).first() if token_obj: return else: raise AuthenticationFailed('认证失败') def authenticate_header(self,request): pass
2、view层
def get_random(name): import hashlib import time md=hashlib.md5() md.update(bytes(str(time.time()),encoding='utf-8')) md.update(bytes(name,encoding='utf-8')) return md.hexdigest() class Login(APIView): def post(self,reuquest): back_msg={'status':1001,'msg':None} try: name=reuquest.data.get('name') pwd=reuquest.data.get('pwd') user=models.User.objects.filter(username=name,password=pwd).first() if user: token=get_random(name) models.UserToken.objects.update_or_create(user=user,defaults={'token':token}) back_msg['status']='1000' back_msg['msg']='登录成功' back_msg['token']=token else: back_msg['msg'] = '用户名或密码错误' except Exception as e: back_msg['msg']=str(e) return Response(back_msg) class Course(APIView): authentication_classes = [TokenAuth, ] def get(self, request): return HttpResponse('get') def post(self, request): return HttpResponse('post')
总结
#局部使用,只需要在视图类里加入: authentication_classes = [TokenAuth, ]
全局使用
#全局使用 #在setting中配置: REST_FRAMEWORK={ 'DEFAULT_AUTHENTICATION_CLASSES':['app01.MyAuth.LoginAuth',] } app01.MyAuth.LoginAuth认证的地址 一般情况下,认证不会写在view中,都是单独写在一个py文件中, #局部禁用,(不可能别人还没有登陆,就需要认证) 在需要认证的类中, authentication_classes = []
#authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES 在读源码的时候,有这么一句,当时的处理是自己写一个authentication_classes #如果步自己写的话,就会用api_settings中的, #一直点进来得到, 'DEFAULT_AUTHENTICATION_CLASSES': ( 'rest_framework.authentication.SessionAuthentication', 'rest_framework.authentication.BasicAuthentication'
不存数据库版的token
#存数据库耗资源,并且需要IO,耗费时间
def create_token(user_id): md5 = hashlib.md5() md5.update(user_id.encode('utf-8')) md5.update(settings.password.encode('utf-8')) #加密盐,在settings中写的字符串 hex = md5.hexdigest() token = hex + '|' + user_id print(token) return token # 登录 class Login(APIView): authentication_classes = [] #局部禁用auth认证,还没登陆,不能验证 def post(self, request, *args, **kwargs): #发送post请求, response = {'status': 100, 'msg': '登录成功'} name = request.data.get('name') pwd = request.data.get('pwd') try: user = models.UserInfo.objects.get(name=name, pwd=pwd) user_info_json = json.dumps({'name': user.name, 'id': user.pk}) # 生产dafgasdewf|{'id':user.pk}的token token = create_token(str(user.pk)) response['token'] = token except ObjectDoesNotExist as e: response['status'] = 101 response['msg'] = '用户名或密码错误' except Exception as e: response['status'] = 102 # response['msg']='未知错误' response['msg'] = str(e) return JsonResponse(response, safe=False)
自己写auth认证
#先看class LoginAuth(BaseAuthentication): #理念就是,把token取出来,切分后,按照原来的加密凡是加密,判断是否相同 def check_token(token): ret = True #两个变量 user_info=None try: ll = token.split('|') # "eef48b787e24381258aa71d0d53615c2,{"id": 1}" md5 = hashlib.md5() #切分后,把|后边的按照原来的加密方式,进行加密,判断是否与|前边的相同 md5.update(ll[1].encode('utf-8')) md5.update(settings.password.encode('utf-8')) #同样需要加密盐,更安全 hex = md5.hexdigest() if not hex == ll[0]: ret = False else: user_info=ll[1] except Exception as e: ret = False return ret,user_info class LoginAuth(BaseAuthentication): # 函数名一定要叫authenticate,接收必须两个参数,第二个参数是request对象 def authenticate(self, request): # 从request对象中取出token(也可以从其它地方取) token = request.query_params.get('token') ret, user_info = check_token(token) #拿到token,调用check_token if ret: return user_info, None # 如果查不到,抛异常 raise exceptions.APIException('您认证失败')