import json
import time
import random
import string
import base64
# 创建订单
import pprint
import requests
from Cryptodome import Hash
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5
from Cryptodome.Hash import SHA256
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import config
from app import logger
# 签名验签相关
class ToSign:
# app/handler/wechat_app/
try:
with open("app/handler/wechat_app/apiclient_key.pem", 'r') as f:
private_key = f.read()
except Exception:
with open("apiclient_key.pem", 'r') as f:
private_key = f.read()
@classmethod
def set_default(cls):
cls.timestamp = "%.f" % time.time() # "%.f" % time.time() # 时间戳
cls.nonce_str = "".join(random.sample(string.ascii_letters + string.digits, 16)) # 随机字符串
@classmethod
def set_sign_data(cls, method: str, url: str, body: dict = None):
"""设置默认数据 """
cls.method = method
cls.url = url
if body:
cls.body = json.dumps(body) # 转换为json字符串
else:
cls.body = ""
@classmethod
def sign_str(cls):
"""生成欲签名字符串"""
return str("
".join([cls.method, cls.url,
cls.timestamp, cls.nonce_str,
cls.body])+"
")
# 签名
@classmethod
def sign(cls, sign_str):
"""签名 """
pkey = RSA.importKey(cls.private_key)
h = SHA256.new(sign_str.encode('utf-8'))
signature = PKCS1_v1_5.new(pkey).sign(h)
sign = base64.b64encode(signature).decode()
return sign
# 获取请求头authorization
@classmethod
def authorization_str(cls):
sign_ = cls.sign(cls.sign_str())
"""拼接header authorization"""
authorization = 'WECHATPAY2-SHA256-RSA2048 '
'mchid="{mchid}",'
'nonce_str="{nonce_str}",'
'signature="{sign}",'
'timestamp="{timestamp}",'
'serial_no="{serial_no}"'.
format(mchid=config.mchid,
nonce_str=cls.nonce_str,
sign=sign_,
timestamp=cls.timestamp,
serial_no=config.serial_no
)
return authorization
# 验签
@classmethod
def check_sign(cls, plain_text: str, sign: str, certificate=None) -> bool: # 明文、 密文
# base64 解码
sign_str = base64.b64decode(sign)
# 这里采用的是从接获获得的证书,微信支付证书
signature2 = RSA.importKey(certificate)
verifier = PKCS1_v1_5.new(signature2)
digest = Hash.SHA256.new()
digest.update(plain_text.encode("utf8"))
return verifier.verify(digest, sign_str)
# 解密
def decrypt(nonce, ciphertext, associated_data):
"""
解密数据获取平台证书
https://pay.weixin.qq.com/wiki/doc/apiv3/wechatpay/wechatpay5_1.shtml
https://pay.weixin.qq.com/wiki/doc/apiv3/wechatpay/wechatpay4_2.shtml
"""
key = config.Apiv3Key
key_bytes = str.encode(key)
nonce_bytes = str.encode(nonce)
ad_bytes = str.encode(associated_data)
data = base64.b64decode(ciphertext)
aesgcm = AESGCM(key_bytes)
# 解密出来的是加密字符串。取出你想要的数据
return aesgcm.decrypt(nonce_bytes, data, ad_bytes)
# 获取微信支付平台证书, 这个证书用于,验签 https://pay.weixin.qq.com/wiki/doc/apiv3/wechatpay/wechatpay4_1.shtml
def get_credential():
# 获取平台证书序列号
ToSign.set_default()
ToSign.set_sign_data("GET", "/v3/certificates")
authorization = ToSign.authorization_str()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": authorization
}
res = requests.get("https://api.mch.weixin.qq.com/v3/certificates", headers=headers)
if res.status_code != 200:
logger.error("获取获取微信支付平台证书失败")
logger.error(res.json())
plain_text, sign, res_data = get_attestation_about(res)
# 进行解密, 获取证书
# 这里可能会有很多证书,选启用时间最晚的一个
result_data = max(res_data["data"], key=lambda x: x["effective_time"])
nonce = result_data["encrypt_certificate"]["nonce"]
ciphertext = result_data["encrypt_certificate"]["ciphertext"]
associated_data = result_data["encrypt_certificate"]["associated_data"]
# 明文串
str_ = decrypt(nonce=nonce, ciphertext=ciphertext, associated_data=associated_data)
check_bool = ToSign.check_sign(plain_text=plain_text, sign=sign, certificate=str_)
if check_bool:
return str_, result_data
else:
logger.error(f"获取支付证书验签失败,plain_text:{plain_text}, sign: {sign}")
return "", {}
# 获取眼前需要的数据
def get_attestation_about(response):
time_stamp = response.headers.get("Wechatpay-Timestamp")
nonce = response.headers.get("Wechatpay-Nonce")
sign = response.headers.get("Wechatpay-Signature")
logger.info("time_stamp:"+time_stamp)
logger.info("nonce:"+nonce)
logger.info("sign:"+sign)
try:
req_data = json.dumps(response.json()).replace(" ", "")
except Exception:
req_data = response.json
req_data = json.dumps(req_data, ensure_ascii=False).replace(" ", "")
plain_text = f"{time_stamp}
{nonce}
{req_data}
"
return plain_text, sign, json.loads(req_data) # 加密字符串, 签名, body->json
调用
# 签名
ToSign.set_default()
ToSign.set_sign_data("POST", "/v3/pay/transactions/jsapi", data)
authorization_str = ToSign.authorization_str()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": authorization_str
}
res_data = requests.post(url, json=data, headers=headers)
if 'prepay_id' not in res_data.json():
return {"success": False, "message": "调取微信支付失败"}
# 验签 https://pay.weixin.qq.com/wiki/doc/apiv3/wechatpay/wechatpay5_1.shtml
"""
1554209980
c5ac7061fccab6bf3e254dcf98995b8c
{"data":[{"serial_no":"5157F09EFDC096DE15EBE81A47057A7232F1B8E1","effective_time":"2018-03-26T11:39:50+08:00","expire_time":"2023-03-25T11:39:50+08:00","encrypt_certificate":{"algorithm":"AEAD_AES_256_GCM","nonce":"d215b0511e9c","associated_data":"certificate","ciphertext":"..."}}]}
"""
ToSign.check_sign(ToSign.sign_str(), sign) # 第一个参数是根据微信返回信息生成的签名串, 第二个是你从微信得到的签名
#
注意:
1.调用ToSign 的时候先调用set_default来进行初始化时间戳和随机字符串
2. json化是必须加ensure_ascii,否则会进行编码导致验签失败。