zoukankan      html  css  js  c++  java
  • Python 计算AWS4签名,Header鉴权与URL鉴权

    AWS4 版本签名计算参考

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    #  @Time: 2021/7/24 8:12
    #  @Author:zhangmingda
    #  @File: api_for_aws4_signature.py
    #  @Software: PyCharm
    #  Description:
    
    from urllib.request import quote
    import hashlib
    import hmac
    import datetime
    import requests
    import json
    import base64
    
    class KscClient(object):
        def __init__(self, ak, sk,service, domain, region, use_ssl=False):
            self.ak = ak
            self.sk = sk
            self.host = service + "." + domain
            self.region = region
            self.service = service
            self.protocol = 'https' if use_ssl else 'http'
            self.base_url = self.protocol + '://' + self.host + '/'
    
        # 创建规范化请求查询字符串方法
        def generate_canonical_querystring(self, **params):
            '''
            :param params: 查询字符串字典
            :return: url编码后的查询字符串key1=value1&key2=value2 ...
            '''
            sorted_params = ''
            sorted_param_keys = sorted(params.keys())
            for param in sorted_param_keys:
                # print(param)
                sorted_params += quote(param) + '=' + quote(params.get(param), safe='') + '&'  # 查询字符串中的值任何字符串都进行URL编码
                # print(sorted_params)
            return sorted_params[:-1]
    
        #  生成标准化请求头,请求头大写转小写,排序返回字符串
        def generate_canonical_headers(self, **headers):
            '''
            :param headers: 所有用来签名的请求头
            :return: 返回小写请求头排序后的key:value 字符串,每个key:value 之间
    换行
            '''
            sorted_headers = ''                                         # 准备标准请求头key+value字符串
            sorted_signed_headers = ''                                  # 准备签名的请求头key组合字符串
            header_keys = headers.keys()                                # 获取请求头所有key
            lower_headers = {}                                          # 准备存放小写请求头和值的字典
            for header in header_keys:                                  # 把大写请求头字典都转换为小写请求头字典
                lower_headers[header.lower()] = headers.get(header)
            sorted_lower_header_keys = sorted(lower_headers.keys())     # 把签名的请求头排序
    
            '''排序后的请求头和值进行组合字符串'''
            for lower_header in sorted_lower_header_keys:
                sorted_headers += lower_header + ':' + str(lower_headers[lower_header]).strip() + '
    '
                sorted_signed_headers += lower_header + ";"
            # print("============请求头排序==============")
            # print(sorted_headers)
            # print('------被签名的请求头----')
            # print(sorted_signed_headers)
            # print("==========请求头排序end============")
            sorted_lower_headers = {
                'sorted_headers': sorted_headers,                           # 签名的请求头key 和 value
                'signed_headers': sorted_signed_headers[:-1]                # 被签名的请求头字符串组合,最后一个分号; 不要
            }
            return sorted_lower_headers
    
        # 十六进制请求体Sha256Hash值--->请求体用
        def bytes_data_sha256_hex(self, bytes_data=''.encode()):
            '''
            :param binary_data: 字节码文件
            :return: 返回请求体内容的sha256 哈希值
            '''
            sha256 = hashlib.sha256()
            sha256.update(bytes_data)
            return sha256.hexdigest().lower()
    
        # 构建被签名字符串,把标准请求canonical_request 哈希取16进制用
        def str_sha256_hex(self, string=''):
            '''
            :param string:
            :return: 字符串的sha256哈希值
            '''
            sha256 = hashlib.sha256()
            sha256.update(string.encode())
            return sha256.hexdigest()
    
        # UTC时间字符串工具
        def get_utc_datetime(self):
            now_utc = datetime.datetime.utcnow()
            datetime_utc = now_utc.strftime('%Y%m%dT%H%M%SZ')
            date_utc = now_utc.strftime('%Y%m%d')
            return datetime_utc, date_utc
    
        # 生成规范化请求的字符串方法
        def generate_canonical_request(self, method, encode_uri, canonical_querystring, canonical_headers, signed_headers,payload_sha256_hex):
            ''' 1.1 规范化请求CanonicalRequest计算方法'''
            return method + '
    ' 
                   + encode_uri + '
    ' 
                   + canonical_querystring + '
    ' 
                   + canonical_headers + '
    ' 
                   + signed_headers + '
    ' 
                   + payload_sha256_hex  # 请求体不参与签名 时 : + 'UNSIGNED-PAYLOAD'
    
        # 生成被签名字符串的方法
        def generate_string_tosign(self, algorithm, datetime_utc, credentialscope, canonical_reques_sha256_hex ):
            ''' 1.2 被签名字符串计算方法'''
            return algorithm + '
    ' 
                            + datetime_utc + '
    ' 
                            + credentialscope + '
    ' 
                            + canonical_reques_sha256_hex
    
        # 第二步构建签名key的工具方法
        def encode_string_to_hmac_256_digest(self, encode_salt, msg, digestmod=hashlib.sha256):
            '''
            :param encode_salt: 盐
            :param msg: 字符串信息
            :param digestmod: 摘要算法
            :return: HMAC-SHA256 加盐哈希后的字节
            '''
            digest_maker = hmac.new(encode_salt, msg.encode(), digestmod=digestmod)
            return digest_maker.digest()
    
        # 创建签名KEY的方法
        def generate_signing_key(self, signature_version,sk, date_utc, region, service, termchar='aws4_request'):
            k_secret = signature_version + sk
            k_date = self.encode_string_to_hmac_256_digest(k_secret.encode(), date_utc)
            k_region = self.encode_string_to_hmac_256_digest(k_date, region)
            k_service = self.encode_string_to_hmac_256_digest(k_region, service)
            k_signing = self.encode_string_to_hmac_256_digest(k_service, termchar)
            return k_signing
    
        # 创建签名的方法
        def generate_signature(self, signing_key, string_tosign):
            digest_maker = hmac.new(signing_key, string_tosign.encode(), digestmod=hashlib.sha256)
            hex_signature = digest_maker.hexdigest()
            # print('hex_signature:', digest_maker.hexdigest())
            return hex_signature
    
        # 构建带签名的请求头,或签名url查询字符串
        def get_auth_data(self, method, query_params, append_headers={}, binary_payload=''.encode()):
            # ========================================1. 创建被签名字符串======================================
            # 1.1 构建标准化请求
            method = method             # 1.1.1 HTTP方法
            canonical_uri = '/'         # 1.1.2 资源URI
            encode_uri = quote(canonical_uri, safe="/")  # 1.1.2 将URI编码成%字符串格式 不包含? 后的url查询参数
            payload_sha256_hex = self.bytes_data_sha256_hex(binary_payload)  # 1.1.5 请求体sha256 十六进制HASH值
            datetime_utc, date_utc = self.get_utc_datetime()
            # 准备请求头
            signature_headers = {
                'Host': self.host,
                'X-amz-date': datetime_utc
            }
            signature_headers.update(append_headers)
            # 请求头排序格式化
            sorted_lower_headers = self.generate_canonical_headers(**signature_headers)
            canonical_headers = sorted_lower_headers.get('sorted_headers')      # 1.1.4 小写排序后的请求头key:value
            signed_headers = sorted_lower_headers.get('signed_headers')         # 1.1.5 小写排序后的签名头
            # 1.2 创建信任状
            credentialscope = date_utc + "/" + self.region + "/" + self.service + "/aws4_request"
            # 查询参数字典
            auth_params = {}
            # 签名放在URL中时计算签名传递参数
            if query_params:
                auth_params = {
                    'X-Amz-Algorithm': "AWS4-HMAC-SHA256",
                    'X-Amz-Credential': self.ak + '/' + credentialscope,
                    'X-Amz-Date': datetime_utc,
                    'X-Amz-SignedHeaders': signed_headers,
                }
                auth_params.update(query_params)
                # print('auth_params: ', auth_params)
            auth_in_header_canonical_querystring = self.generate_canonical_querystring(**query_params)      # 1.1.3 通过header传递签名的查询字符串
            auth_in_queryparam_canonical_querystring = self.generate_canonical_querystring(**auth_params)   # 1.1.3 通过URL传递签名的查询字符串
            canonical_request_auth_in_header = self.generate_canonical_request(method, encode_uri, auth_in_header_canonical_querystring, canonical_headers, signed_headers, payload_sha256_hex)
            canonical_request_auth_in_url = self.generate_canonical_request(method, encode_uri, auth_in_queryparam_canonical_querystring, canonical_headers, signed_headers, payload_sha256_hex)
            print("Header传递签名的规范化请求".center(50, "="))
            print(canonical_request_auth_in_header)
            print("canonical_request_auth_in_url".center(50, "*"))
            print(canonical_request_auth_in_header)
            print("规范化化请求done".center(50, "="))
    
            # 1.2 创建被签名字的符串StringToSign
            canonical_request_auth_in_header_sha256_hex = self.str_sha256_hex(canonical_request_auth_in_header)  # 规范化请求的256哈希值
            canonical_request_auth_in_url_sha256_hex = self.str_sha256_hex(canonical_request_auth_in_url)        # 规范化请求的256哈希值
            algorithm = 'AWS4-HMAC-SHA256'
            string_tosign_auth_in_header = self.generate_string_tosign(algorithm, datetime_utc, credentialscope, canonical_request_auth_in_header_sha256_hex)
            string_tosign_auth_in_url = self.generate_string_tosign(algorithm, datetime_utc, credentialscope, canonical_request_auth_in_url_sha256_hex)
    
            print('Header传递签名被签名的字符串'.center(50, '='))
            print(string_tosign_auth_in_header)
            print('URL传递签名被签名的字符串'.center(50, '*'))
            print(string_tosign_auth_in_url)
            print('被签名的字符串Done!'.center(50,'='))
    
            # ========================================2. 创建签名 ======================================
            # 2.1创建签名签名秘钥
            signature_version = 'AWS4'
            signing_key = self.generate_signing_key(signature_version, self.sk, date_utc, self.region, self.service)
            print("签名秘钥:",signing_key)
            # 2.2 创建签名
            signature_for_header = self.generate_signature(signing_key, string_tosign_auth_in_header)
            signature_for_url = self.generate_signature(signing_key, string_tosign_auth_in_url)
            print('Header鉴权的签名:', signature_for_header)
            print('url鉴权的签名:', signature_for_url)
            # ========================================3 请求头或URL 传递签名鉴权和参数 ======================================
            authorization_headers = {
                'Authorization': "AWS4-HMAC-SHA256 Credential="
                                 + self.ak + '/'
                                 + credentialscope + ", "
                                 + "SignedHeaders="
                                 + signed_headers + ', '
                                 + 'Signature=' + signature_for_header,
            }
            # 3.1请求头携带签名,返回带签名的请求头
            authorization_headers.update(signature_headers)
    
            # 3.2 url携带签名返回带签名的URL查询字符串
            request_query_params = {
                'X-Amz-Signature': signature_for_url
            }
            request_query_params.update(auth_params)
            authorization_params = self.generate_canonical_querystring(**request_query_params)
            # print('request_query_params: ', request_query_params)
            # print('authorization_params:',authorization_params)
            auth_data = {
                'authorization_headers': authorization_headers,
                'authorization_params': request_query_params,
                'signature_headers': signature_headers,  # get object 在URL里面传递签名 请求头要带的签名头
            }
            return auth_data
    
        #  通过标准请求头方式传递签名GET文件
        def get_test_auth_in_header(self, query_params={}, append_headers={}):
            method = 'GET'                                      # 1.1.1 HTTP方法
            auth_data = self.get_auth_data(method, query_params, append_headers)
            authorization_headers = auth_data.get('authorization_headers')
            req = requests.get(url=self.base_url, headers=authorization_headers, params=query_params)
            print('Header鉴权的URL:', req.url)
            print(req.status_code)
            print(req.text)
    
        def get_test_auth_in_query_param(self,query_params={}, append_headers={}):
            method = 'GET'  # 1.1.1 HTTP方法
            auth_data = self.get_auth_data(method, query_params, append_headers)
            auth_query_params = auth_data.get('authorization_params')
            signature_headers = auth_data.get('signature_headers')
            canonical_querystring = self.generate_canonical_querystring(**auth_query_params)
    
            # 通过URL传递鉴权=requests 传参方式1
            url_for_auth = self.base_url + "?" + canonical_querystring
            req2 = requests.get(url=url_for_auth, headers=signature_headers)
            print("手工拼接鉴权的URL:", url_for_auth)
            print(req2.status_code)
            print(req2.text)
            # 通过URL传递鉴权=requests 传参方式2
            req3 = requests.get(self.base_url, headers=signature_headers, params=auth_query_params)
            # req3 = requests.get(url=self.base_url, headers=signature_headers, params=canonical_querystring)
            print("requests自动拼接鉴权的URL:", req3.url)
            print(req3.status_code)
            print(req3.text)
    
    if __name__ == '__main__':
        ak = 'XXXXXXXXXXXXXXX'
        sk = 'XXXXXXXXXXXXXXXXXXXXXXXXX'
        service = 'kcm'
        domain = 'api.ksyun.com'
        region = 'cn-beijing-6'
        ksc = KscClient(ak, sk, service, domain, region)
        query_params = {
            'Action': 'GetDownloadLink',
            'Version': '2016-03-04',
            'CertificateId': 'kcm2021022216204501'
        }
        ksc.get_test_auth_in_header(query_params=query_params)
        # ksc.get_test_auth_in_query_param(query_params=query_params)

    Header 传递鉴权测试

     URL传递鉴权测试

  • 相关阅读:
    phpstorm操作集锦
    图片、音频获取二进制流或url的blob值
    sublime text 3 快捷键
    dd与sql 打印工具
    php生成二维码(可带logo)
    jQuery append加入的元素 绑定事件无效
    Linux运维架构师学习之路
    硬盘安装win7
    Composer安装与使用
    Js循环做法
  • 原文地址:https://www.cnblogs.com/zhangmingda/p/15054612.html
Copyright © 2011-2022 走看看