zoukankan      html  css  js  c++  java
  • mytest3.py-api接入平台获取数据

    mytest3.py-api接入平台获取数据

    import base64
    import datetime
    import hashlib
    import hmac
    import os
    import urllib
    import urllib.parse

    import requests
    
    # Huobi REST endpoint and account credentials (API address and keys taken
    # from the original sample code).
    # SECURITY: credentials should not live in source control. The environment
    # variables HUOBI_ACCESS_KEY / HUOBI_SECRET_KEY take precedence when set;
    # the literals below are kept only as a backward-compatible fallback.
    TRADE_URL = "https://api.huobi.pro"
    ACCESS_KEY = os.environ.get("HUOBI_ACCESS_KEY", "5acf7fa3-2b53abe3-5d0e6b2e-885f5")
    SECRET_KEY = os.environ.get("HUOBI_SECRET_KEY", "13439487-047b4da1-87e463f0-efa3c")
    
    
    # 返回api产生的数据
    def api_key_get(params, request_path):
        """Send a signed GET request to the exchange API and return its result.

        The authentication fields (AccessKeyId, SignatureMethod,
        SignatureVersion, Timestamp, Signature) are added to a *copy* of
        ``params``, so the caller's dict is left untouched and can be reused
        for another request without carrying a stale signature.

        Args:
            params: Mapping of request-specific query parameters; ``None`` is
                treated as an empty mapping.
            request_path: URL path appended to ``TRADE_URL``; it is also part
                of the signed payload.

        Returns:
            Whatever ``http_get_request`` returns for the signed request.
        """
        method = 'GET'
        # Aware UTC "now"; formatted exactly as before (no offset suffix).
        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')

        signed_params = dict(params or {})
        signed_params.update({'AccessKeyId': ACCESS_KEY,
                              'SignatureMethod': 'HmacSHA256',
                              'SignatureVersion': '2',
                              'Timestamp': timestamp})

        # The signature covers the lower-cased host name, not the full URL.
        host_name = urllib.parse.urlparse(TRADE_URL).hostname.lower()
        signed_params['Signature'] = createSign(
            signed_params, method, host_name, request_path, SECRET_KEY)

        return http_get_request(TRADE_URL + request_path, signed_params)
    
    
    # 创建签名字符串
    def createSign(pParams, method, host_url, request_path, secret_key):
        """Build the base64-encoded HMAC-SHA256 signature for a request.

        The signed payload is four fields joined by a newline character:
        the HTTP method, the host, the request path, and the URL-encoded
        parameters sorted by key.

        Args:
            pParams: Mapping of query parameters to sign.
            method: HTTP method string placed on the first payload line.
            host_url: Host placed on the second payload line.
            request_path: Path placed on the third payload line.
            secret_key: Secret used as the HMAC key (text, encoded as UTF-8).

        Returns:
            The signature as a ``str`` (base64 text of the SHA-256 digest).
        """
        # Sorting by key makes the signature independent of dict order.
        sorted_params = sorted(pParams.items(), key=lambda item: item[0])
        encode_params = urllib.parse.urlencode(sorted_params)

        # The separator must be the escape sequence '\n'; a literal line break
        # inside the quotes is an unterminated string.
        payload = '\n'.join([method, host_url, request_path, encode_params])

        digest = hmac.new(secret_key.encode(encoding='UTF8'),
                          payload.encode(encoding='UTF8'),
                          digestmod=hashlib.sha256).digest()
        return base64.b64encode(digest).decode()
    
    
    # 返回get请求获取数据
    def http_get_request(url, params, add_to_headers=None):
        """Perform a GET request and return the decoded JSON body.

        Args:
            url: Full URL to request.
            params: Query parameters; URL-encoded and sent as the query string.
            add_to_headers: Optional mapping of extra headers merged over the
                defaults.

        Returns:
            The parsed JSON body when the status code is 200; ``None`` when the
            status code is anything else or the body is not valid JSON.

        Raises:
            Errors raised by ``requests.get`` itself (connection failure,
            timeout, ...) are not caught here and propagate to the caller.
        """
        headers = {
            "Content-type": "application/x-www-form-urlencoded",
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36',
        }
        if add_to_headers:
            headers.update(add_to_headers)

        postdata = urllib.parse.urlencode(params)
        # Second positional argument of requests.get is the query string.
        response = requests.get(url, postdata, headers=headers, timeout=5)

        if response.status_code != 200:
            return None

        try:
            return response.json()
        except ValueError as e:
            # Only a JSON decoding failure is handled; catching BaseException
            # here would also swallow KeyboardInterrupt / SystemExit.
            print("httpGet failed, detail is:%s,%s" % (response.text, e))
            return None
    
    
    if __name__ == '__main__':
        # Demo: fetch the balance of one account and show the result and its type.
        account_id = '4656265'
        params = {'account-id': account_id}
        request_path = '/v1/account/accounts/%s/balance' % account_id

        res = api_key_get(params, request_path)
        for shown in (res, type(res)):
            print(shown)
    

      

  • 相关阅读:
    java ssh 框架下 利用junit4 spring-test进行单元测试
    在写junit test 的时候出现的java.lang.UnsupportedClassVersionError问题
    IBatis 2.x 和 MyBatis 3.0.x 的区别(从 iBatis 到 MyBatis)
    mysql GET DIAGNOSTICS 语法
    如何优化用SQL语句INSERT INTO … SELECT插入数据时锁全表的问题
    mysql中binlog_format模式与配置详解
    MySql 里的IFNULL、NULLIF和ISNULL用法区别
    mysql order by 优化 |order by 索引的应用
    几款开源ESB总线的比较
    ETL简介
  • 原文地址:https://www.cnblogs.com/andy9468/p/9617176.html
Copyright © 2011-2022 走看看