zoukankan      html  css  js  c++  java
  • Django上自建一个token校验机制

    场景介绍

    自己开发的接口有些需要实名认证,不能直接让匿名用户访问,那如果使用django的login_require装饰器设置在接口方法前面 会直接返回登录页面,所以在这个时候,可以考虑自己开发一个验证机制了,防止匿名用户调用。

    验证流程

    1. 先让用户提交用户密码到获取token的接口
    2. 拿到用户名密码后去后台验证是否有效,如果有效就判断这个用户是否有token在数据库中,有的话就更新这个token,否则就新建一条记录在数据库,然后返回这个新建好的token的给请求者。
    3. 用户拿到刚才请求到的token后,在请求头或者请求参数里面携带这个token。然后再去请求对应的接口。
    4. 请求过来以后,首先自建的验证机制会对token进行校验,去请求参数里面找,参数也没有就返回校验失败,找到token以后就去判断这个token是否存在和过期,token无效、过期的话,返回校验失败,只有校验成功以后,才会放行请求到接口上,此时接口才会真正处理用户的请求。

    需要做的事

    1. 新建一个表
    2. 新建一个创建token的方法
    3. 新建一个校验token的装饰器
    4. 接口上使用装饰器即可

    以上在你的项目中一个models和views编写即可,编写完以后,哪里需要使用,导入这个装饰器即可。

    实操步骤

    以下都运行在python3.5.6 和 django 1.11.13 上,无问题。

    创建数据库
    from django.db import models
    
    # Create your models here.
    from pisces import settings
    from datetime import timedelta,datetime
    from django.contrib.auth.models import User
    import pytz
    import random
    #tzutc_8=pytz.FixedOffset(480)
    #tzutc_8=pytz.timezone("UTC")
    tzutc_8=pytz.timezone('Asia/Shanghai')
    
    class user_token(models.Model):
        user = models.OneToOneField(User,related_name='user_profile')
        utype = models.CharField(max_length=15, default="guest") # ?????? guest access admin
        utoken = models.CharField(max_length=256, null=True,blank=True)
        utokenCreateTime = models.DateTimeField(null=True,blank=True)
        utokenExpire = models.DateTimeField(null=True,blank=True)
    
        def getRandomChar(self,start=None,end=None,length=30):
            '''
            get a random int and then translate to char
            :param start:
            :param end:
            :param length:
            :return:  return a token
            '''
            if ( start and not start.isdigit() ) or not start:
                start = 65
            if ( end and not end.isdigit() ) or not end:
                end = 122
            randomChar = ""
            for i in range(length):
                x = random.randint(start,end)
                # >>> chr(91)
                # '['
                # >>> chr(92)
                # '\'
                # >>> chr(93)
                # ']'
                # >>> chr(94)
                # '^'
                # >>> chr(95)
                # '_'
                # >>> chr(96)
                # '`'
    
                if x == 91 or x == 92 or x == 93 or x == 94 or x == 95 or x == 96:
                    continue
                randomChar = randomChar + chr(x)
            return randomChar
    
        def updateToken(self):
            '''
            update the user's token ,if the token is exist,it will be update ,else the token is not exist,it will be create
            :return:
            '''
            utokenDuration = settings.UTOKEN_DURATION
            if self.utokenCreateTime:  # means it has created,
                if not  self.utokenExpire:
                    self.utokenExpire = datetime.now() + timedelta(seconds=utokenDuration)
                    self.save()
                differenceTime = self.utokenExpire - self.utokenCreateTime
                if differenceTime.seconds - utokenDuration <= 60: # update the token before 1 min
                    randomChar = self.getRandomChar()
                    self.utoken = randomChar
                    self.utokenExpire = datetime.now()   + timedelta(seconds=utokenDuration)
                    self.save()
                    return {"code":200,"msg":"update utoken successfully!"}
                else:
                    return {"code":301,"msg":"the utoken is not expire!so won't update! "}
            else:
                randomChar = self.getRandomChar()
                self.utoken = randomChar
                self.utokenCreateTime = datetime.now()
                self.utokenExpire = datetime.now()  + timedelta(seconds=utokenDuration)
                self.save()
                return {"code":200,"msg":"create utoken successfully!"}
    
        def checkUtokenIsValid(self,utoken):
            '''
            check the utoken weather is valid or not!
            :return:
            '''
            if self.utokenCreateTime and self.utokenExpire:  # means it has created,
                #print(self.utokenCreateTime.astimezone(tzutc_8),  self.utokenCreateTime, datetime.now())
                if self.utokenExpire.astimezone(tzutc_8) <=  pytz.utc.localize(datetime.now()): # update the token before 1 min
                    return {"code":400,"msg":"the utoken was expire! please update it! exipre time:%s , system time:%s"%(self.utokenExpire.astimezone(tzutc_8).strftime("%Y-%m-%d %H:%M:%S"),pytz.utc.localize(datetime.now()))}
                else:
                    if utoken == self.utoken:
                        return {"code":200,"msg":"correct"}
                    else:
                        return {"code":401,"msg":"not correct"}
            else:
                return {"code":400,"msg":"the utoken is not create! please authenticate! "}
    
    
        def __str__(self):
            return "%s-%s"%(self.user.username,self.utype)
    
    
    编写装饰器与创建token的方法
    创建token的方法
    from django.shortcuts import render,redirect
    from django.contrib.auth import authenticate
    from django.contrib.auth.models import User
    from center.models import user_token
    from django.http import JsonResponse
    from django.contrib.auth import login as auth_login, logout as auth_logout
    from django.views.decorators.csrf import csrf_exempt
    from pisces.mylog import writeDebug,writeInfo,writeErr
    
    @csrf_exempt
    def get_utoken(request):
        '''
        get the user's token
        :param request:
        :return:
        '''
        if request.method == 'POST':
            username = request.POST.get('username', '')
            password = request.POST.get('password', '')
            user = authenticate(username=username, password=password)
            if user:
                writeDebug("center/views","get_utoken","the user[%s] login successfully"%username)
                auth_login(request, user, )
                userObj = User.objects.filter(username=username)
                if not userObj:
                    writeDebug("center/views","get_utoken","no userObj[%s],begin to create the user"%username)
                    ux = User.objects.create_user(username,"%s@chanjet.com"%username,password)
                    utk = user_token(user=ux)
                    utk.save()
                    utkobj = user_token.objects.filter(user=ux).first()
                    updateTokenMsg = utkobj.updateToken()
                    utoken = utkobj.utoken
                else:
                    utkobj = user_token.objects.filter(user=userObj)
                    if not utkobj:
                        utkobj_save = user_token(user=userObj.first())
                        utkobj_save.save()
                        #utkObj = user_token.objects.filter(user=userObj)
                    updateTokenMsg = utkobj.first().updateToken()
                    utkobj.first().save()
    
                    utoken = utkobj.first().utoken
                writeInfo("frame/views","login","the username[ {username} ] login and create token successfully!".format(username=username))
                return JsonResponse({"utoken":utoken,"code":200,"msg":updateTokenMsg.get("msg")})
            writeInfo("frame/views","login","the username[ {username} ] or password is invalied,can't login!".format(username=username))
            return JsonResponse({"code":401,"msg":"the username or password is not correct!"})
        else:
            return JsonResponse({"code":402,"msg":"only post method is support"})
    
    校验token的装饰器
    from django.shortcuts import render,redirect
    from django.contrib.auth import authenticate
    from django.contrib.auth.models import User
    from center.models import user_token
    from django.http import JsonResponse
    from django.contrib.auth import login as auth_login, logout as auth_logout
    from django.views.decorators.csrf import csrf_exempt
    from pisces.mylog import writeDebug,writeInfo,writeErr
    def checkUserToken(func):
        '''
        check if the user has the token
        :param func:
        :return:
        '''
        def warper(*args,**kwargs):
            try:
                utoken = args[0].POST.get("utoken")
                if utoken is None:
                    utoken = args[0].GET.get("utoken")
            except BaseException as e:
                try:
                    utoken = args[0].GET.get("utoken")
                except BaseException as e:
                    return JsonResponse({"code":400,"msg":"can't get utoken! please check params"})
    
            if not utoken:
                return JsonResponse({"code":400,"msg":"can't get utoken! please check params"})
                
            userObj = user_token.objects.filter(utoken=utoken)
            if userObj:
                ux = userObj.first()
                result = ux.checkUtokenIsValid(utoken)
                if result.get("code") == 200:
                    kwargs['username'] = ux.user.username
                    return func(*args,**kwargs)
                else:
                    return JsonResponse(result)
            else:
                return JsonResponse({"code":404,"msg":"can't find the user by utoken--> %s"%(utoken)})
        return warper
    

    使用装饰器

    
    from center.views import checkUserToken
    from django.http.response import JsonResponse
    
    @checkUserToken
    @csrf_exempt
    def create_zk_with_zkjob(request):
        '''
        
        :param request:
        :return:
        '''
        // 请求参数该怎么获取就怎么获取,并没有改变参数的获取方式
        // 以下替换成你的逻辑代码即可。
        post_env = request.POST.get("env")
        post_cxxx = request.POST.get("cxxx")
        post_project_name = request.POST.get("project_enname")
        zkop = core.zkopreation(post_env)
        zknodename = "%s%s_%s/foo"%(zkop.zkRootPath,post_cxxx,post_project_name)
        zkop_result = zkop.createPath(zknodename)
        return JsonResponse(zkop_result)
    
  • 相关阅读:
    BSS Audio® Introduces Full-Bandwidth Acoustic Echo Cancellation Algorithm for Soundweb London Conferencing Processors
    转:虚拟运营商颠覆八大行业 170号码将成主流?
    移动通信调制技术的进展 转
    转:瑞利信道,莱斯信道和高斯信道模型
    转:Android开发之旅:环境搭建及HelloWorld
    web端视频直播网站的弊端和优势
    频域分辨率与DFT,DCT,MDCT理解
    转:超声波支付
    谈音频算法技术研发团队建立
    转:HTML5标准与性能之四:asm.js
  • 原文地址:https://www.cnblogs.com/liaojiafa/p/12547584.html
Copyright © 2011-2022 走看看