zoukankan      html  css  js  c++  java
  • 微信支付 jsapi v3 签名 和 验签

    import json
    import time
    import random
    import string
    import base64
    # 创建订单
    import pprint
    
    import requests
    from Cryptodome import Hash
    from Cryptodome.PublicKey import RSA
    from Cryptodome.Signature import PKCS1_v1_5
    from Cryptodome.Hash import SHA256
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    
    import config
    from app import logger
    
    
    # 签名验签相关
    class ToSign:
        # app/handler/wechat_app/
        try:
            with open("app/handler/wechat_app/apiclient_key.pem", 'r') as f:
                private_key = f.read()
        except Exception:
            with open("apiclient_key.pem", 'r') as f:
                private_key = f.read()
    
    
        @classmethod
        def set_default(cls):
            cls.timestamp = "%.f" % time.time()  # "%.f" % time.time()  # 时间戳
            cls.nonce_str = "".join(random.sample(string.ascii_letters + string.digits, 16))  # 随机字符串
    
        @classmethod
        def set_sign_data(cls, method: str, url: str, body: dict = None):
            """设置默认数据 """
            cls.method = method
            cls.url = url
            if body:
                cls.body = json.dumps(body)  # 转换为json字符串
            else:
                cls.body = ""
    
        @classmethod
        def sign_str(cls):
            """生成欲签名字符串"""
            return str("
    ".join([cls.method, cls.url,
                                  cls.timestamp, cls.nonce_str,
                                  cls.body])+"
    ")
    
        # 签名
        @classmethod
        def sign(cls, sign_str):
            """签名 """
            pkey = RSA.importKey(cls.private_key)
            h = SHA256.new(sign_str.encode('utf-8'))
            signature = PKCS1_v1_5.new(pkey).sign(h)
            sign = base64.b64encode(signature).decode()
            return sign
    
        # 获取请求头authorization
        @classmethod
        def authorization_str(cls):
            sign_ = cls.sign(cls.sign_str())
            """拼接header authorization"""
            authorization = 'WECHATPAY2-SHA256-RSA2048 ' 
                            'mchid="{mchid}",' 
                            'nonce_str="{nonce_str}",' 
                            'signature="{sign}",' 
                            'timestamp="{timestamp}",' 
                            'serial_no="{serial_no}"'. 
                format(mchid=config.mchid,
                       nonce_str=cls.nonce_str,
                       sign=sign_,
                       timestamp=cls.timestamp,
                       serial_no=config.serial_no
                       )
            return authorization
    
        # 验签
        @classmethod
        def check_sign(cls, plain_text: str, sign: str, certificate=None) -> bool:  # 明文、 密文
            # base64 解码
            sign_str = base64.b64decode(sign)
            # 这里采用的是从接获获得的证书,微信支付证书
            signature2 = RSA.importKey(certificate)
            verifier = PKCS1_v1_5.new(signature2)
            digest = Hash.SHA256.new()
            digest.update(plain_text.encode("utf8"))
            return verifier.verify(digest, sign_str)
    
    
    # 解密
    def decrypt(nonce, ciphertext, associated_data):
        """
        解密数据获取平台证书
        https://pay.weixin.qq.com/wiki/doc/apiv3/wechatpay/wechatpay5_1.shtml
        https://pay.weixin.qq.com/wiki/doc/apiv3/wechatpay/wechatpay4_2.shtml
        """
        key = config.Apiv3Key
        key_bytes = str.encode(key)
        nonce_bytes = str.encode(nonce)
        ad_bytes = str.encode(associated_data)
        data = base64.b64decode(ciphertext)
        aesgcm = AESGCM(key_bytes)
        # 解密出来的是加密字符串。取出你想要的数据
        return aesgcm.decrypt(nonce_bytes, data, ad_bytes)
    
    
    # 获取微信支付平台证书, 这个证书用于,验签 https://pay.weixin.qq.com/wiki/doc/apiv3/wechatpay/wechatpay4_1.shtml
    def get_credential():
        # 获取平台证书序列号
        ToSign.set_default()
        ToSign.set_sign_data("GET", "/v3/certificates")
        authorization = ToSign.authorization_str()
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": authorization
        }
        res = requests.get("https://api.mch.weixin.qq.com/v3/certificates", headers=headers)
        if res.status_code != 200:
            logger.error("获取获取微信支付平台证书失败")
            logger.error(res.json())
        plain_text, sign, res_data = get_attestation_about(res)
    
        # 进行解密, 获取证书
        # 这里可能会有很多证书,选启用时间最晚的一个
        result_data = max(res_data["data"], key=lambda x: x["effective_time"])
        nonce = result_data["encrypt_certificate"]["nonce"]
        ciphertext = result_data["encrypt_certificate"]["ciphertext"]
        associated_data = result_data["encrypt_certificate"]["associated_data"]
        # 明文串
        str_ = decrypt(nonce=nonce, ciphertext=ciphertext, associated_data=associated_data)
        check_bool = ToSign.check_sign(plain_text=plain_text, sign=sign, certificate=str_)
        if check_bool:
            return str_, result_data
        else:
            logger.error(f"获取支付证书验签失败,plain_text:{plain_text}, sign: {sign}")
            return "", {}
    
    
    # 获取眼前需要的数据
    def get_attestation_about(response):
        time_stamp = response.headers.get("Wechatpay-Timestamp")
        nonce = response.headers.get("Wechatpay-Nonce")
        sign = response.headers.get("Wechatpay-Signature")
        logger.info("time_stamp:"+time_stamp)
        logger.info("nonce:"+nonce)
        logger.info("sign:"+sign)
        try:
            req_data = json.dumps(response.json()).replace(" ", "")
        except Exception:
            req_data = response.json
            req_data = json.dumps(req_data, ensure_ascii=False).replace(" ", "")
        plain_text = f"{time_stamp}
    {nonce}
    {req_data}
    "
        return plain_text, sign, json.loads(req_data)  # 加密字符串, 签名, body->json
    

    调用

    # 签名
    ToSign.set_default()
    ToSign.set_sign_data("POST", "/v3/pay/transactions/jsapi", data)
    
    authorization_str = ToSign.authorization_str()
    headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": authorization_str
        }
    res_data = requests.post(url, json=data, headers=headers)
    if 'prepay_id' not in res_data.json():
        return {"success": False, "message": "调取微信支付失败"}
    
    # 验签  https://pay.weixin.qq.com/wiki/doc/apiv3/wechatpay/wechatpay5_1.shtml
    """
    1554209980
    c5ac7061fccab6bf3e254dcf98995b8c
    {"data":[{"serial_no":"5157F09EFDC096DE15EBE81A47057A7232F1B8E1","effective_time":"2018-03-26T11:39:50+08:00","expire_time":"2023-03-25T11:39:50+08:00","encrypt_certificate":{"algorithm":"AEAD_AES_256_GCM","nonce":"d215b0511e9c","associated_data":"certificate","ciphertext":"..."}}]}
    """
    ToSign.check_sign(ToSign.sign_str(), sign) # 第一个参数是根据微信返回信息生成的签名串, 第二个是你从微信得到的签名
    
    
    # 
    

    注意:
    1.调用ToSign 的时候先调用set_default来进行初始化时间戳和随机字符串
    2. json化是必须加ensure_ascii,否则会进行编码导致验签失败。

  • 相关阅读:
    part11-1 Python图形界面编程(Python GUI库介绍、Tkinter 组件介绍、布局管理器、事件处理)
    part10-3 Python常见模块(正则表达式)
    Cyclic Nacklace HDU
    模拟题 Right turn SCU
    状态DP Doing Homework HDU
    Dp Milking Time POJ
    区间DP Treats for the Cows POJ
    DP Help Jimmy POJ
    Dales and Hills Gym
    Kids and Prizes Gym
  • 原文地址:https://www.cnblogs.com/ShanCe/p/14748613.html
Copyright © 2011-2022 走看看