zoukankan      html  css  js  c++  java
  • 使用蓝鲸平台登录态验证接入第三方系统

    使用蓝鲸平台登录态验证接入第三方系统

    蓝鲸智云平台登录态验证, 验证用户登录的状态,如验证登录token是否合法、过期、是否注销退出等。

    本文内容主要是探索蓝鲸bk_token源码验证流程,以及基于验证流程简单实现一个第三方系统接入蓝鲸开发的系统应用中。

    其实蓝鲸平台本身有接入其他系统的方式,但那是蓝鲸整个框架的接入方式,而不是其中一个应用的接入。现在框架维护不归我们管,所以。。。蓝鲸官方文档

    获取bk_token

    可直接通过请求request对象获取到bk_token:

    bk_token = request.COOKIES.get("bk_token", "")
    

    bk_token验证接口

    即蓝鲸bk_token合法性验证接口

    • url: ../login/accounts/is_login/

    • method: GET

    • params: bk_token

    • 接口返回:验证成功

      {
          "message": "User authentication succeeded",
          "code": "00",
          "data": {
              "username": "admin"
          },
          "result": True
      }
      
    • 接口返回: 验证失败

      {
          "message": "auth fail reason",
          "code": "1200",
          "data": {},
          "result": False
      }
      

    bk_token验证源码流程解析

    源码文件在项目下: ../blueapps/account/components/bk_token/backends.py

    1. 请求到达路由前,先经过中间件blueapps.account.middlewares.LoginRequiredMiddlwware, 会发现其实是走 blueapps.account.components.bk_token.middlewares.LoginRequiredMiddlwware 中间件。

      中间件源码(部分):

      class LoginRequiredMiddlwware(MiddlewareMixin):
          def process_view(self, request, view, args, kwargs):
              """
              Login paas by two ways
              1. views decorated with 'login_exempt' keyword
              2. User has logged in calling auth.login
              """
              if hasattr(request, 'login_exempt', False):
                  return None
              
              form = AuthenticationForm(request.COOKIES)		# 获取cookies并Form检验
              
              if form.is_valid():
                  bk_token = form.cleaned_data['bk_token']
                  
                  # ======== 进行bk_token验证,以及user对象封装,见第2步 ========
                  user = auth.authenticate(request=request, bk_token=bk_token)
                  
                  # 判断user对象,不成功
                  if user is not None and user.username != request.user.username:
                      auth.login(request, user)
                      
                  return None
              
              handler = ResponseHandler(ConfFixture, settings)
              return handler.build_401_response(request)
      
    2. 进行bk_token验证, 以及user对象封装

      authenticate源码:

      class TokenBackend(ModelBackend):
          def authenticate(self, request=None, bk_token=None):
              logger.debug(u"Enter in TokenBackend")
              # 判断是否传入验证所需的bk_token,没传入则返回None
              if not bk_token:
                  return None
      		# ======== bk_token验证,返回验证结果以及用户名,见第3步 ========
              verify_result, username = self.verify_bk_token(bk_token)
              
              # 判断bk_token是否验证通过,不通过则返回None
              if not verify_result:
                  return None
      		
              # ======== 验证用户,将用户信息、权限等注入user对象中 ========
              user_model = get_user_model()
              try:
                  user, _ = user_model.objects.get_or_create(username=username)
                  get_user_info_result, user_info = self.get_user_info(bk_token)
                  # 判断是否获取到用户信息,获取不到则返回None
                  if not get_user_info_result:
                      return None
                  user.set_property(key="qq", value=user_info.get("qq", ""))
                  user.set_property(key="language", value=user_info.get("language", ""))
                  user.set_property(key="time_zone", value=user_info.get("time_zone", ""))
                  user.set_property(key="role", value=user_info.get("role", ""))
                  user.set_property(key="phone", value=user_info.get("phone", ""))
                  user.set_property(key="email", value=user_info.get("email", ""))
                  user.set_property(key="wx_userid", value=user_info.get("wx_userid", ""))
                  user.set_property(key="chname", value=user_info.get("chname", ""))
      
                  # 用户如果不是管理员,则需要判断是否存在平台权限,如果有则需要加上
                  if not user.is_superuser and not user.is_staff:
                      role = user_info.get("role", "")
                      is_admin = True if str(role) == ROLE_TYPE_ADMIN else False
                      user.is_superuser = is_admin
                      user.is_staff = is_admin
                      user.save()
      
                  return user
      
    3. 真正验证bk_token源码:

      verify_bk_token源码:

      class TokenBackend(ModelBackend):
          
      	@staticmethod
          def verify_bk_token(bk_token):
              """
              请求VERIFY_URL,认证bk_token是否正确
              @param bk_token: "_FrcQiMNevOD05f8AY0tCynWmubZbWz86HslzmOqnhk"
              @type bk_token: str
              @return: False,None True,username
              @rtype: bool,None/str
              """
              # bk_token参数
              api_params = {"bk_token": bk_token}
      
              try:
                  # ======== 请求验证,返回JSON数据,见第4步 ========
                  response = send(ConfFixture.VERIFY_URL, "GET", api_params, verify=False)
              except Exception:  # pylint: disable=broad-except
                  logger.exception(u"Abnormal error in verify_bk_token...")
                  return False, None
              
      		# ========= 对结果进行解析,返回result,username ========
              if response.get("result"):
                  data = response.get("data")
                  username = data.get("username")
                  return True, username
              else:
                  error_msg = response.get("message", "")
                  error_data = response.get("data", "")
                  logger.error(
                      u"Fail to verify bk_token, error={}, ret={}".format(
                          error_msg, error_data
                      )
                  )
                  return False, None
      
    4. send源码:请求is_login/,返回结果JSON数据

      def send(url, method, params, timeout=None, **kwargs):
          """
          统一请求处理,定制化参数, GET 参数使用 form-data,POST 参数使用 json 字符串,返回内容
          要求为 JSON 格式
      
          @exception
              ApiResultError:非 JSON 返回,抛出 ApiResultError
              ApiNetworkError: 请求服务端超时
      
          @param url:string,请求 URL
          @param method:string,请求方法,目前仅支持 GET、POST
          @param params:dict,请求参数 KV 结构
          @param timeout: float,服务器在 timeout 秒内没有应答,将会引发一个异常
          """
          session = requests.session()
      	# ======== 请求is_login验证 ========
          try:
              if method.upper() == "GET":
                  response = session.request(
                      method="GET", url=url, params=params, timeout=timeout, **kwargs
                  )
              elif method.upper() == "POST":
                  session.headers.update({"Content-Type": "application/json; chartset=utf-8"})
                  response = session.request(
                      method="POST",
                      url=url,
                      data=json.dumps(params),
                      timeout=timeout,
                      **kwargs
                  )
              else:
                  raise Exception(_(u"异常请求方式,%s") % method)
          except requests.exceptions.Timeout:
              err_msg = _(u"请求超时,url=%s,method=%s,params=%s,timeout=%s") % (
                  url,
                  method,
                  params,
                  timeout,
              )
              raise ApiNetworkError(err_msg)
      
      	# ======== 返回JSON数据 =========
          try:
              return response.json()
          except Exception:  # pylint: disable=broad-except
              err_msg = _(
                  u"返回内容不符合 JSON 格式,url=%s,method=%s,params=%s,error=%s," u"response=%s"
              ) % (
                  url,
                  method,
                  json.dumps(params),
                  traceback.format_exc(),
                  response.text[:1000],
              )
              raise ApiResultError(err_msg)
      

    第三方系统接入到蓝鲸开发的应用示例

    将B系统(第三方)接入到蓝鲸应用A系统中,即用户登录了A系统,在前端点击按钮就能够直接跳转到B系统中,不需要再次登录B系统。思路是:蓝鲸应用A系统携带bk_token和username请求B系统,B系统会向蓝鲸平台请求验证bk_token是否合法,验证成功后跳转到B系统首页,失败跳转B系统登录页面。

    当然有一个前提: 蓝鲸应用A系统和三方B系统必须拥有相同的用户

    流程图如下:

    蓝鲸应用A系统

    • url: http://127.0.0.1:8000/get_b_server/

    • method: GET

    • view:

      def get_b_server(request):
          """B系统接入"""
          b_url = "http://127.0.0.1:8888/bk_server"	# b系统的验证URL
          username = request.user.username
          bk_token = request.COOKIES.get("bk_token", "")
          
          json_data = dict()
          if bk_token:
              json_data["status"] = 200
              json_data["message"] = "token get success"
          else:
              json_data["status"] = 500
              json_data["message"] = "token is None or get failed"
              
          json_data["data"] = {"b_url": b_url, "bk_token": bk_token, "username": username}
          
          return JsonResponse(json_data)
      
    • 接口返回说明:有bk_token

      {
          "status": 200,
          "message": "token get success",
          "data": {
              "b_url" : "http://127.0.0.1:8888/bk_server",
              "bk_token": "4jZo98ZQ5o2gdky...",
              "username": "admin"
          }
      }
      
    • 接口返回说明: 无bk_token

      {    
          "status": 500,    
          "message": "token is None or get failed",    
          "data": {        
              "b_url" : "http://127.0.0.1:8888/bk_server",        
              "bk_token": "",        
              "username": "admin"    
          }
      }
      
    • HTML中 Ajax 获取数据,成功后打开新标签页并向B系统发送请求

      $("#test").on("click", function () {    
          $.ajax({        
              url: "http://127.0.0.1:8000/get_b_server",        
              type: "get",        
              success: function (res) {            l
                  et data = res.data;            
                  if (res.status == 200){                
                      // 打开新页面                
                      window.open(data.b_url + "?username=" + data.username + "&bk_token=" + data.bk_token, "_blank")            
                  }else{                
                      layer.msg(res.message)            
                  }        
              }    
          })
        })
      

    B系统

    • url: http://127.0.0.1:8888/bk_server

    • method: GET

    • view:

      BK_VERIFY_URL = "蓝鲸平台/login/accounts/is_login/"		# 蓝鲸平台bk_token验证 URL
      def bk_server(request):    
          """蓝鲸平台登录态bk_token验证"""        
          bk_token = request.GET.get("bk_token")    
          username = request.GET.get("username")        
          res = requests.get(url=BK_VERIFY_URL, params={"bk_token": bk_token}, verify=False)	# 登录态bk_token验证,返回参数见验证接口解析    
          res = res.json()    
          result = res.get("result")    
          data = res.get("data")        
          # 结果验证    
          if result and data.get("username") == username:        
              # B系统用户验证        
              login_obj = User.objects.filter(username=username).values        
              if login_obj:            
                  request.session["user"] = login_obj[0]	# 设置session                        
                  return redirect("home")		# 首页            
          return redirect("login")	# 验证失败跳登录页            
      
  • 相关阅读:
    Introduction to Oracle9i: SQL left join 和 left outer join 的区别
    ORACLE10G RMAN 命令
    Oracle管理与维护.手工创建数据库以及脚本
    RMAN 备份基本用法
    ASM 常用概念解释
    oracle学习笔记之二:数据类型之DATETIME 收藏
    10g中表监控与statistics_level
    Oracle 学习笔记: RMAN常用命令
    Oracle 进程类别
    ORACLE TRUNC()函数
  • 原文地址:https://www.cnblogs.com/yzm1017/p/14871730.html
Copyright © 2011-2022 走看看