zoukankan      html  css  js  c++  java
  • 微信支付V3

    微信支付V3

    1、微信开放平台申请微信支付

    https://open.weixin.qq.com/cgi-bin/index
    

    2、微信开发平台文档中心JSAPI下单

    https://pay.weixin.qq.com/wiki/doc/apiv3/apis/chapter3_1_1.shtml
    

    3、业务流程图

    4、代码开发

    • 4.1 文件架构

      ├── WX
      │   ├── wxpay.py: 微信支付逻辑
      │   ├── wxpay_keys: 微信支付证书相关
      │   │   ├── apiclient_cert.p12
      │   │   ├── apiclient_cert.pem
      │   │   ├── apiclient_key.pem
      
    • 4.2 wxpay.py

      • 4.2.1 包安装

        pip install requests
        pip install cryptography
        pip install pycryptodomex
        
      • 4.2.2 导包

        import json
        import random
        import string
        import os
        import base64
        import time
        import requests
        from base64 import b64encode
        from Cryptodome.PublicKey import RSA
        from Cryptodome.Signature import pkcs1_15
        from Cryptodome.Hash import SHA256
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM
        
      • 4.2.3 微信支付相关数据

        # WeChat Pay settings (placeholders - replace with your own values)
        WXPAY_DATA = {
            'APP_ID': 'wx*********',  # your application ID
            'API_KEY': '*****************',  # your merchant-platform API key
            'MCH_ID': '***********',  # your direct merchant number (MCH_ID)
            'PAY_KEY': '***********',  # your merchant-platform PAY_KEY (used as the AES-GCM key when decrypting callbacks)
            'REQUEST_URL': "https://api.mch.weixin.qq.com/v3/pay/transactions/jsapi",  # JSAPI order endpoint, fixed
            'SERIAL_NO': '******************',  # certificate serial number; must match the key/cert/p12 files below
        }

        # Certificate paths. The serial number, key, cert and p12 must all be
        # applied for together and belong to the same certificate.
        # The wxpay_keys directory sits next to this file (see the file layout),
        # so the base directory is this file's own directory, not its parent.
        basedir = os.path.dirname(os.path.abspath(__file__))
        KEY_PATH = os.path.join(basedir, "wxpay_keys", "apiclient_key.pem")
        CERT_PATH = os.path.join(basedir, "wxpay_keys", "apiclient_cert.pem")
        P12_CERT_PATH = os.path.join(basedir, "wxpay_keys", "apiclient_cert.p12")
        
      • 4.2.4 微信支付通用类

        class WXPayUtil:
            """
            Helper methods for the WeChat Pay API v3 JSAPI flow.

            All methods are static. They read the module-level ``WXPAY_DATA``
            settings and the merchant private key stored at ``KEY_PATH``.
            """

            @staticmethod
            def get_sign(sign_str: str) -> str:
                """
                Sign a string with the merchant private key.

                :param sign_str: the message to sign
                :return: Base64 text of the RSA PKCS#1 v1.5 signature over the
                         SHA-256 digest of ``sign_str``
                """
                # Use a context manager so the key file handle is always closed.
                with open(KEY_PATH) as key_file:
                    rsa_key = RSA.importKey(key_file.read())
                signer = pkcs1_15.new(rsa_key)
                digest = SHA256.new(sign_str.encode('utf8'))
                return b64encode(signer.sign(digest)).decode('utf-8')

            @staticmethod
            def decrypt(nonce: str, ciphertext: str, associated_data: str):
                """
                Decrypt an AES-GCM ciphertext with ``WXPAY_DATA['PAY_KEY']``.

                :param nonce: nonce used for the encryption
                :param ciphertext: Base64-encoded ciphertext
                :param associated_data: additional authenticated data
                :return: the decrypted plaintext
                """
                key_bytes = str.encode(WXPAY_DATA['PAY_KEY'])
                nonce_bytes = str.encode(nonce)
                ad_bytes = str.encode(associated_data)
                data = base64.b64decode(ciphertext)
                aes = AESGCM(key_bytes)
                return aes.decrypt(nonce_bytes, data, ad_bytes)

            @staticmethod
            def create_out_trade_no(user_id: int) -> str:
                """
                Build a merchant order number for a user.

                The result is ``wx`` + local time as ``yymmddHHMMSS`` + the user
                id left-padded with zeros to 7 digits, so it only differs between
                calls made for the same user in different seconds.

                :param user_id: numeric user id
                :return: the order number
                """
                local_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
                padded_user_id = '{:0>7d}'.format(user_id)
                return f'wx{local_time[2:]}{padded_user_id}'

            @staticmethod
            def get_random_str() -> str:
                """
                Generate a random 32-character string of uppercase letters and digits.

                :return: the random string
                """
                return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))

            @staticmethod
            def get_headers(random_str: str, sign: str, time_stamps: str) -> dict:
                """
                Build the HTTP headers for a signed request.

                :param random_str: nonce string used in the signature
                :param sign: Base64 signature of the request
                :param time_stamps: timestamp used in the signature
                :return: headers including the ``Authorization`` value
                """
                headers = {
                    'Content-Type': 'application/json; charset=UTF-8',
                    'Accept': 'application/json',
                    'User-Agent': '*/*',
                    'Authorization': 'WECHATPAY2-SHA256-RSA2048 ' +
                                     f'mchid="{WXPAY_DATA["MCH_ID"]}",nonce_str="{random_str}",'
                                     f'signature="{sign}",timestamp="{time_stamps}",'
                                     f'serial_no="{WXPAY_DATA["SERIAL_NO"]}"'
                }
                return headers

            @staticmethod
            def get_data(out_trade_no, notify_url, total_fee, open_id):
                """
                Build the JSON request body for the order.

                :param out_trade_no: merchant order number
                :param notify_url: callback URL for the payment result
                :param total_fee: amount in yuan
                :param open_id: the payer's openid
                :return: the request body serialized as a JSON string
                """
                data = {
                    'appid': WXPAY_DATA['APP_ID'],  # appid
                    'mchid': WXPAY_DATA['MCH_ID'],  # merchant number
                    'description': '商品描述',  # product description
                    'out_trade_no': out_trade_no,  # unique order number in our system
                    'notify_url': notify_url,  # payment result callback url
                    "amount": {
                        # The amount is expressed in fen (1 = one fen), hence * 100.
                        # Round to an int so float input (e.g. 19.99) neither
                        # serializes as a float nor loses a fen to binary rounding.
                        "total": int(round(total_fee * 100)),
                        "currency": "CNY"
                    },
                    "payer": {"openid": open_id},  # the user's openid
                }
                # Serialize exactly once: the same string is signed and sent.
                return json.dumps(data)

            @staticmethod
            def get_res(response, time_stamps, random_str) -> dict:
                """
                Build the payment parameters to hand back to the front end.

                :param response: HTTP response of the order request
                :param time_stamps: timestamp to sign with
                :param random_str: nonce string to sign with
                :return: dict with ``message`` (package, timeStamp, nonceStr,
                         paySign, signType) and ``meta``; an empty dict when the
                         response carries no ``prepay_id``
                """
                # Parse the response body once instead of on every access.
                payload = response.json()
                if 'prepay_id' not in payload:
                    return dict()
                package = 'prepay_id=' + payload['prepay_id']
                # Pay-sign message: app id, timestamp, nonce and package, each
                # terminated by a newline.
                pay_sign_str = (
                    f"{WXPAY_DATA['APP_ID']}\n"
                    f"{time_stamps}\n{random_str}\n"
                    f"{package}\n"
                )
                res = {
                    "message": dict(
                        package=package,
                        timeStamp=time_stamps,
                        nonceStr=random_str,
                        paySign=WXPayUtil.get_sign(pay_sign_str),
                        signType='RSA',
                    ),
                    "meta": {
                        "msg": "",
                        "status": 200
                    }
                }
                return res

            @staticmethod
            def get_sign_str(time_stamps, random_str, data):
                """
                Build the message that is signed for the order request.

                :param time_stamps: request timestamp
                :param random_str: request nonce string
                :param data: request body exactly as it will be sent
                :return: method, URL path, timestamp, nonce and body, each
                         terminated by a newline
                """
                return f"POST\n{'/v3/pay/transactions/jsapi'}\n{time_stamps}\n{random_str}\n{data}\n"
        
      • 4.2.5 与FastAPI框架结合使用示例

        """
        @Author: GuHaoHao
        @coding: utf-8
        """
        import json
        import random
        import string
        import os
        import base64
        import time
        import uvicorn
        import requests
        from base64 import b64encode
        
        from Cryptodome.PublicKey import RSA
        from Cryptodome.Signature import pkcs1_15
        from Cryptodome.Hash import SHA256
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM
        
        from fastapi import FastAPI
        
        app = FastAPI()
        
        # WeChat Pay settings (placeholders - replace with your own values)
        WXPAY_DATA = {
            'APP_ID': 'wx*********',  # your application ID
            'API_KEY': '*****************',  # your merchant-platform API key
            'MCH_ID': '***********',  # your direct merchant number (MCH_ID)
            'PAY_KEY': '***********',  # your merchant-platform PAY_KEY (used as the AES-GCM key when decrypting callbacks)
            'REQUEST_URL': "https://api.mch.weixin.qq.com/v3/pay/transactions/jsapi",  # JSAPI order endpoint, fixed
            'SERIAL_NO': '******************',  # certificate serial number
        }

        # Certificate paths. The serial number, key, cert and p12 must all be applied for.
        # The wxpay_keys directory sits next to this file (see the file layout),
        # so the base directory is this file's own directory, not its parent.
        basedir = os.path.dirname(os.path.abspath(__file__))
        KEY_PATH = os.path.join(basedir, "wxpay_keys", "apiclient_key.pem")
        CERT_PATH = os.path.join(basedir, "wxpay_keys", "apiclient_cert.pem")
        P12_CERT_PATH = os.path.join(basedir, "wxpay_keys", "apiclient_cert.p12")
        
        
        class WXPayUtil:
            """
            Helper methods for the WeChat Pay API v3 JSAPI flow.

            All methods are static. They read the module-level ``WXPAY_DATA``
            settings and the merchant private key stored at ``KEY_PATH``.
            """

            @staticmethod
            def get_sign(sign_str: str) -> str:
                """
                Sign a string with the merchant private key.

                :param sign_str: the message to sign
                :return: Base64 text of the RSA PKCS#1 v1.5 signature over the
                         SHA-256 digest of ``sign_str``
                """
                # Use a context manager so the key file handle is always closed.
                with open(KEY_PATH) as key_file:
                    rsa_key = RSA.importKey(key_file.read())
                signer = pkcs1_15.new(rsa_key)
                digest = SHA256.new(sign_str.encode('utf8'))
                return b64encode(signer.sign(digest)).decode('utf-8')

            @staticmethod
            def decrypt(nonce: str, ciphertext: str, associated_data: str):
                """
                Decrypt an AES-GCM ciphertext with ``WXPAY_DATA['PAY_KEY']``.

                :param nonce: nonce used for the encryption
                :param ciphertext: Base64-encoded ciphertext
                :param associated_data: additional authenticated data
                :return: the decrypted plaintext
                """
                key_bytes = str.encode(WXPAY_DATA['PAY_KEY'])
                nonce_bytes = str.encode(nonce)
                ad_bytes = str.encode(associated_data)
                data = base64.b64decode(ciphertext)
                aes = AESGCM(key_bytes)
                return aes.decrypt(nonce_bytes, data, ad_bytes)

            @staticmethod
            def create_out_trade_no(user_id: int) -> str:
                """
                Build a merchant order number for a user.

                The result is ``wx`` + local time as ``yymmddHHMMSS`` + the user
                id left-padded with zeros to 7 digits, so it only differs between
                calls made for the same user in different seconds.

                :param user_id: numeric user id
                :return: the order number
                """
                local_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
                padded_user_id = '{:0>7d}'.format(user_id)
                return f'wx{local_time[2:]}{padded_user_id}'

            @staticmethod
            def get_random_str() -> str:
                """
                Generate a random 32-character string of uppercase letters and digits.

                :return: the random string
                """
                return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))

            @staticmethod
            def get_headers(random_str: str, sign: str, time_stamps: str) -> dict:
                """
                Build the HTTP headers for a signed request.

                :param random_str: nonce string used in the signature
                :param sign: Base64 signature of the request
                :param time_stamps: timestamp used in the signature
                :return: headers including the ``Authorization`` value
                """
                headers = {
                    'Content-Type': 'application/json; charset=UTF-8',
                    'Accept': 'application/json',
                    'User-Agent': '*/*',
                    'Authorization': 'WECHATPAY2-SHA256-RSA2048 ' +
                                     f'mchid="{WXPAY_DATA["MCH_ID"]}",nonce_str="{random_str}",'
                                     f'signature="{sign}",timestamp="{time_stamps}",'
                                     f'serial_no="{WXPAY_DATA["SERIAL_NO"]}"'
                }
                return headers

            @staticmethod
            def get_data(out_trade_no, notify_url, total_fee, open_id):
                """
                Build the JSON request body for the order.

                :param out_trade_no: merchant order number
                :param notify_url: callback URL for the payment result
                :param total_fee: amount in yuan
                :param open_id: the payer's openid
                :return: the request body serialized as a JSON string
                """
                data = {
                    'appid': WXPAY_DATA['APP_ID'],  # appid
                    'mchid': WXPAY_DATA['MCH_ID'],  # merchant number
                    'description': '商品描述',  # product description
                    'out_trade_no': out_trade_no,  # unique order number in our system
                    'notify_url': notify_url,  # payment result callback url
                    "amount": {
                        # The amount is expressed in fen (1 = one fen), hence * 100.
                        # Round to an int so float input (e.g. 19.99) neither
                        # serializes as a float nor loses a fen to binary rounding.
                        "total": int(round(total_fee * 100)),
                        "currency": "CNY"
                    },
                    "payer": {"openid": open_id},  # the user's openid
                }
                # Serialize exactly once: the same string is signed and sent.
                return json.dumps(data)

            @staticmethod
            def get_res(response, time_stamps, random_str) -> dict:
                """
                Build the payment parameters to hand back to the front end.

                :param response: HTTP response of the order request
                :param time_stamps: timestamp to sign with
                :param random_str: nonce string to sign with
                :return: dict with ``message`` (package, timeStamp, nonceStr,
                         paySign, signType) and ``meta``; an empty dict when the
                         response carries no ``prepay_id``
                """
                # Parse the response body once instead of on every access.
                payload = response.json()
                if 'prepay_id' not in payload:
                    return dict()
                package = 'prepay_id=' + payload['prepay_id']
                # Pay-sign message: app id, timestamp, nonce and package, each
                # terminated by a newline.
                pay_sign_str = (
                    f"{WXPAY_DATA['APP_ID']}\n"
                    f"{time_stamps}\n{random_str}\n"
                    f"{package}\n"
                )
                res = {
                    "message": dict(
                        package=package,
                        timeStamp=time_stamps,
                        nonceStr=random_str,
                        paySign=WXPayUtil.get_sign(pay_sign_str),
                        signType='RSA',
                    ),
                    "meta": {
                        "msg": "",
                        "status": 200
                    }
                }
                return res

            @staticmethod
            def get_sign_str(time_stamps, random_str, data):
                """
                Build the message that is signed for the order request.

                :param time_stamps: request timestamp
                :param random_str: request nonce string
                :param data: request body exactly as it will be sent
                :return: method, URL path, timestamp, nonce and body, each
                         terminated by a newline
                """
                return f"POST\n{'/v3/pay/transactions/jsapi'}\n{time_stamps}\n{random_str}\n{data}\n"
        
        
        @app.post("/wx_pay")
        async def wx_pay():
            """
            Create a WeChat Pay JSAPI order and report whether it succeeded.

            Builds the order body, signs it, posts it to
            ``WXPAY_DATA['REQUEST_URL']`` and derives the front-end payment
            parameters from the response.

            :return: ``{'message': ...}`` with a success or failure text
            """
            # Asynchronous notification URL; a successful notification means the
            # purchase succeeded.  todo: replace with your own callback address
            NOTIFY_URL = 'https://8.136.214.28:5000/wx_pay_result'

            # todo: the values below are fake; obtain the real ones from the
            # request parameters and the database
            total_fee = 1  # purchase amount
            user_id = 1   # user id
            open_id = '*************'   # user's openId

            # Our own unique WeChat order number
            out_trade_no = WXPayUtil.create_out_trade_no(user_id)

            data = WXPayUtil.get_data(out_trade_no=out_trade_no,
                                      notify_url=NOTIFY_URL,
                                      total_fee=total_fee,
                                      open_id=open_id)
            random_str = WXPayUtil.get_random_str()
            time_stamps = str(int(time.time()))

            sign_str = WXPayUtil.get_sign_str(
                time_stamps=time_stamps,
                random_str=random_str,
                data=data
            )
            sign = WXPayUtil.get_sign(sign_str)

            try:
                # NOTE(review): requests.post blocks the event loop inside this
                # async handler; consider a plain `def` or an async HTTP client.
                # A timeout is required, otherwise a stalled connection would
                # hang the request indefinitely.
                response = requests.post(WXPAY_DATA['REQUEST_URL'],
                                         data=data,
                                         headers=WXPayUtil.get_headers(
                                             random_str=random_str,
                                             sign=sign,
                                             time_stamps=time_stamps),
                                         timeout=10
                                         )

                print(response.text)  # expected: {"prepay_id":"wx*************************"}

                res = WXPayUtil.get_res(
                    response=response,
                    time_stamps=time_stamps,
                    random_str=random_str
                )
            except (requests.RequestException, ValueError):
                # Any network failure (not only a connect timeout) or a response
                # body that is not valid JSON counts as a failed call.
                return {'message': '操作接口失败'}

            if not res:
                return {'message': '操作接口失败'}

            # NOTE(review): `res` holds the parameters built for the front end
            # but is not returned here - confirm whether the caller needs it.
            # todo: top-up logic / database work, e.g. insert a top-up record
            return {'message': '操作接口成功'}
        
        
        @app.post("/wx_pay_result")
        async def wx_pay_result(**kwargs):
            """
            Handle the asynchronous payment-result notification.

            Checks that the notification fields are present, decrypts
            ``resource`` and answers with a SUCCESS or FAIL code.

            :param kwargs: notification fields (id, create_time, resource_type,
                           event_type, summary, resource)
            :return: ``{"code": "SUCCESS" | "FAIL", "message": ...}``
            """
            # NOTE(review): FastAPI derives request parameters from the function
            # signature; confirm that the notification's JSON body really
            # arrives through **kwargs - an explicit body parameter may be needed.
            # Imported locally so this handler does not depend on a file-level import.
            from cryptography.exceptions import InvalidTag

            wx_id = kwargs.get('id')
            create_time = kwargs.get('create_time')
            resource_type = kwargs.get('resource_type')
            event_type = kwargs.get('event_type')
            summary = kwargs.get('summary')
            resource = kwargs.get('resource')

            if not all([wx_id, create_time, resource_type, event_type, summary, resource]):
                return {"code": "FAIL", "message": "失败"}

            try:
                # Parsing happens inside the try so malformed JSON is answered
                # with FAIL instead of an unhandled error.
                if isinstance(resource, str):
                    resource = json.loads(resource)
                if not isinstance(resource, dict):
                    return {"code": "FAIL", "message": "失败"}

                nonce = resource.get('nonce')
                ciphertext = resource.get('ciphertext')
                # The associated data may be absent; decrypt with an empty value then.
                associated_data = resource.get('associated_data') or ''
                if not nonce or not ciphertext:
                    return {"code": "FAIL", "message": "失败"}

                data_dict = json.loads(WXPayUtil.decrypt(nonce, ciphertext, associated_data))
                print(data_dict)

                if not data_dict:
                    return {"code": "FAIL", "message": "失败"}

                # todo: look up the order by its number, mark it as paid and set
                # the order record status to "已到账"
                return {"code": "SUCCESS", "message": "成功"}
            except (ValueError, TypeError, InvalidTag):
                # ValueError: bad Base64 / JSON / key or nonce size;
                # TypeError: fields of the wrong type;
                # InvalidTag: AES-GCM authentication failed (tampered data or wrong key).
                return {"code": "FAIL", "message": "失败"}
        
        if __name__ == '__main__':
            # Pass the app object itself: a bare module name such as
            # "wx_pay_example" is not a valid uvicorn import string, which must
            # have the form "<module>:<attribute>". This also keeps the script
            # runnable whatever the file is named.
            uvicorn.run(
                app,
                host='127.0.0.1',
                port=5000,
                log_level='info'
            )
        
  • 相关阅读:
    C++引用之引用的使用
    C++引用之声明方法
    C++const与指针
    C++默认参数值函数
    LeanCloud 调研报告
    [译] 为何流处理中局部状态是必要的
    Z-Stack
    Think twice before starting the adventure
    Multi-pattern string match using Aho-Corasick
    C pointer again …
  • 原文地址:https://www.cnblogs.com/ghh520/p/15020522.html
Copyright © 2011-2022 走看看