zoukankan      html  css  js  c++  java
  • API 接口认证与传输数据加密

    应用场景

    cmdb 这类项目的资产入库等操作,当agent 与server 端通过api 进行数据交互时,为了安全采取了两项安全措施:1、server 端需要对agent 端进行身份验证(避免有冒充agent 请况);2、当agent 采集数据提交到server 端过程需要对数据加密,数据传输到agent 端时再数据解密(防止数据传输过程被截取,泄露数据)。

    api 身份认证

    认证原理

    server 与 agent保存相同的字符串密钥,agent 通过密钥生成签名值,把数据与签名值发送给server 端,server 根据自己的配置文件保存的密钥生成签名值与agent 端签名值比对,两者一致则认证通过否则失败。

    安全加强(只针对数据传输中签名被截取安全隐患,暂不考虑server与agent 本地保存的字符串密钥安全隐患)

    签名值一次性

    agent 每次发送过来的签名值都会被server 记录,每次发送的签名值如果已经使用过则认证失败(防止传输中被截取后黑客可以永久使用被截取的签名密钥来认证)

    动态生成签名

    由于“签名值一次性”所以签名密钥必须动态生成,每次用不同的密钥。agent 每次生成签名前生成一个时间值,把时间与保存的字符串密钥组合在一起再生成一个签名值。签名值变成了动态。

    签名超时设置

    agent 每次发送签名与生成签名的时间到server ,server 端接收时会再本段生成一个时间,将两个时间值比较,当时间差超过10s ,则认证失败。(“签名值一次性”中对认证过的签名值记录在了字典中以{"client_time":"签名密钥"}形式存储,为了避免此字典无线增大占用内存,所以会对字典中的时间判断,超过10s 的数据进行删除,但是删除的签名数据再有相同的签名数据来认证时就不能通过“签名一次性”来排除了,所以“签名超时设置”解决了此问题)

     代码

    agent 端

    import requests
    import time
    import hashlib
    
    
    def gen_sign(ctime):
        key = "uiakjsdfasjdf898"                                 #此字符串在server端也保存一份   
        val = "%s|%s" %(key,ctime)                           #加入时间,动态生成签名密钥
        obj = hashlib.md5()                                        #生成一个对象
        obj.update(val.encode('utf-8'))                        #md5只能对字节形式的数据进行加密
        return obj.hexdigest()
    
    ctime = int(time.time()*1000)                              #*1000单位变ms,int 转化为整数(去掉小数点后的数字,而不是四舍五入)
    result = requests.post(
        url = 'http://127.0.0.1:8000/api/test/',              #注意路径结尾一定更要以/结尾
        params = {'sign':gen_sign(ctime),'ctime':ctime} #相当于在url = 'http://.../?key="alskdjflskdfjkjk"',也就是在url中以get形式发送数据
    )
    print(result.url,result.text)
    
    #认证逻辑:agent 把签名密钥与生成签名密钥的时间一并发送到server端,server 端通过自身保存的key与agent 的时间生成签名值,两个签名比对进行认证
    View Code

    server 端(view.py)

    import json
    import hashlib                                    #此模块里有md5的类
    import time
    from django.shortcuts import render,HttpResponse
    from django.views import View
    from django.conf import settings
    from rest_framework.views import APIView
    from rest_framework.response import Response
    
    #生成签名的函数
    def gen_sign(ctime):
        """
        生成签名
        :param ctime:
        :return:
        """
        val = '%s|%s' %(settings.URL_AUTH_KEY,ctime,)
        obj = hashlib.md5()                                #实力化一个md5对象
        obj.update(val.encode('utf-8'))                    #md5对象只能对bytes格式数据加密
        return obj.hexdigest()                            #返回生成的签名值
        
    #所有的验证成功的签名密钥都会记录在SIGN_RECORD 中
    SIGN_RECORD = {}
    
    class TestView(APIView):
        def post(self,request):
            print("请求来了")
            #由于继承了APIView所以此处的request 不是django原生的request,原生的为request._request
            client_sign = request._request.GET.get('sign')              #获取agent发来的签名密钥
            client_ctime = int(request._request.GET.get('ctime'))       #获取agent 发来的时间
            server_time = int(time.time() * 1000)                       #记录当前时间,*1000单位变成ms
            
            #认证第一关
            if server_time - client_ctime > 5000:                       #比对两个时间,超过5s,则认证失败
                return Response({'status': 'false', 'error': "路上时间太久了"})
            # 认证第二关
            if client_sign in SIGN_RECORD:                              #已经认证通过的签名会保存在SING_RECORD,新来密钥如果被认证过了就会认证失败
                return Response("签名已经被使用过了")
            #认证第三关
            server_sign = gen_sign(client_ctime)                        #server 端生成签名
            if client_sign != server_sign:                              #密钥比对来通过认证
                return Response({'status':'false','error':403})
            #认证通过进行以下操作
            SIGN_RECORD[client_sign] = client_ctime                     #通过认证的密钥存储起来,下次防止密钥重复使用
    
            #防止SIGN_RECORD无线增大,对于5s 之前的记录删除。对于已经认证通过并存储在该记录中的密钥,超过10s删除后,
            ## 那么认证第二关就失效了,所以认证第一关就是为此而设
            for k in list(SIGN_RECORD.keys()):
                v = SIGN_RECORD[k]
                if server_time - v  > 5000:
                    print("已经超过5s")
                    del SIGN_RECORD[k]
            return Response({'status':'true','data':666})
    View Code

    数据加密

    加密原理

    通过rsa 模块生成公钥和私钥分别保存在agent 与server 端,agent 通过公钥加密数据传输到server 端通过私钥解密。rsa 加密分为1024与2048两种,生成公私钥对象时指定参数即可 pub_key_obj, priv_key_obj = rsa.newkeys(1024) ,1024指的是能加密的数据的位数,换算成字节就是1024/8=128 bytes

    代码

    生成公私钥,分别保存在server/agent 两端

    # ######### 1. 生成公钥私钥 #########
    pub_key_obj, priv_key_obj = rsa.newkeys(1024)                 # 1024/8 = 128 ,128 - 11 = 117
    # 公钥字符串
    pub_key_str = pub_key_obj.save_pkcs1()
    pub_key_code = base64.standard_b64encode(pub_key_str)
    
    # 私钥字符串
    priv_key_str = priv_key_obj.save_pkcs1()
    priv_key_code = base64.standard_b64encode(priv_key_str)
    
    将生成的公私钥(经过base64.standard_b64encode处理的)分别保存在agent/server 配置文件settings.py中
    
    原生的公钥:
        b'-----BEGIN RSA PUBLIC KEY-----
    MIGJAoGBAJo2DEaukeIBTvc5vscIrh0gU79N+XRrf6NBGxGi6eOh7muzH3VV7UIn
    ZvfUE3Nxu97DiMAC1u2JEudM8iatMChSLxSh9qFNB36ejz7dCi9DrAH6Ce46JZ7h
    +iwlo9x7Qr4uLJrQsHhia4/i89aAooNV6I8Ne+WOe3V1PvEUs+PhAgMBAAE=
    -----END RSA PUBLIC KEY-----
    '
    base64.standard_b64encode编码处理后:
        b'LS0tLS1CRUdJTiBSU0EgUFVCTElDIEtFWS0tLS0tCk1JR0pBb0dCQUpvMkRFYXVrZUlCVHZjNXZzY0lyaDBnVTc5TitYUnJmNk5CR3hHaTZlT2g3bXV6SDNWVjdVSW4KWnZmVUUzTnh1OTdEaU1BQzF1MkpFdWRNOGlhdE1DaFNMeFNoOXFGTkIzNmVqejdkQ2k5RHJBSDZDZTQ2Slo3aAoraXdsbzl4N1FyNHVMSnJRc0hoaWE0L2k4OWFBb29OVjZJOE5lK1dPZTNWMVB2RVVzK1BoQWdNQkFBRT0KLS0tLS1FTkQgUlNBIFBVQkxJQyBLRVktLS0tLQo='
    View Code

    agent 端

    import rsa
    import base64
    from config import settings
    #数据加密函数
    def encrypt(value_bytes):
        """
        rsa 公钥加密
        :param value_bytes: 要加密的字节
        :return:
        """
        key_str = base64.standard_b64decode(settings.PUB_KEY)    #settings.py 中记录了公钥
        pk = rsa.PublicKey.load_pkcs1(key_str)
        #rsa 1024 能加密的数据大小为128(1024/8)字节,rsa 本身数据占用11字节,所以对大于128字节的数据分批次加密(每次加密117(128-11)字节)最后加密的数据拼接即可。
        data_list = []
        for i in range(0,len(value_bytes),117):
            chunk = value_bytes[i:i+117]
            result = rsa.encrypt(chunk, pk)
            data_list.append(result)
    
        return b''.join(data_list)
    
    #进行数据加密,并且发送到agent
    ctime = int(time.time()*1000)
    r1 = requests.post(
         url=self.asset_api,
         params = {'sign':gen_sign(ctime),'ctime':ctime},         #传输签名密钥
         data=encrypt(json.dumps(info).encode('utf-8')),          #传输数据rsa 加密
         headers={'Content-Type':'application/json'}
        )
    View Code

    server 端

    #使用私钥解密的函数
    import rsa
    import base64
    def decrypt(bytes_value):
        """
        rsa解密
        :param bytes_value: 要解密的数据为bytes 类型,因为rsa 只能对bytes 类型数据加密,所以解密数据为bytes类型
        :return: 解密完成的字节
        """
        key_str = base64.standard_b64decode(settings.PRIV_KEY)    #生成密钥后使用base64.standard_b64encode对公钥进行了编码处理,所以此处要用decode 解码
        pk = rsa.PrivateKey.load_pkcs1(key_str)                    #生成私钥证书    
        #rsa 1024 能加密的数据大小为128(1024/8)字节,所以需要对大于128字节的数据分批次加密(每次加密128字节)最后加密的数据拼接即可。所以解密时也是每次解密128字节,最后拼接即可
        result = []
        for i in range(0,len(bytes_value),128):
            chunk = bytes_value[i:i+128]                        
            val = rsa.decrypt(chunk, pk)                        #通过公钥证书对数据chunk 加密
            result.append(val)
        return b''.join(result)
    
    #利用私钥解密的函数解密数据
    class AssetView(APIAuthView):
        def post(self, request, *args, **kwargs):
            body = decrypt(request._request.body)           #通过decrypt 解密数据,body 中才是原生的数据
            asset_info = json.loads(body.decode('utf-8'))   #解密后的数据解码再序列化加载
    View Code
  • 相关阅读:
    简便的将DataSet导入到数据库中
    数据类型的小小研究:Access与SQL Server的数据类型
    【jxust acm 20120708】
    【D ECJTU_ACM 11级队员2012年暑假训练赛(2)】
    【hdu 2101 A + B Problem Too】
    【hdu 1014】
    【hdu 1164 Eddy's research I】
    【开始,安全编程】
    【hdu 1285 确定比赛名次】
    【hdu 1163 Eddy's digital Roots 】
  • 原文地址:https://www.cnblogs.com/fanggege/p/10425528.html
Copyright © 2011-2022 走看看