zoukankan      html  css  js  c++  java
  • python异步加协程获取比特币市场信息

    目标

      选取几个比特币交易量大的几个交易平台,查看对应的API,获取该市场下货币对的ticker和depth信息。我们从网站上选取4个交易平台:bitfinex、okex、binance、gdax。对应的交易对是BTC/USD,BTC/USDT,BTC/USDT,BTC/USD。

    、ccxt

      开始想着直接请求市场的API,然后再解析获取下来的数据,但到github上发现一个比较好得python库,里面封装好了获取比特币市场的相关函数,这样一来就省掉分析API的时间了。因此我只要传入市场以及对应的货币对,利用库里面的函数 fetch_ticker 和 fetch_order_book 就可以获取到市场的ticker和depth信息(具体的使用方法可以查看ccxt手册)。接下来以市场okex为例,利用ccxt库获取okex的ticker和depth信息。

    # 引入库
    import ccxt
    
    # 实例化市场
    exchange = ccxt.okex()
    # 交易对
    symbol = 'BTC/USDT'
    
    # 获取ticker信息
    ticker = exchange.fetch_ticker(symbol)
    # 获取depth信息
    depth = exchange.fetch_order_book(symbol)
    
    print('ticker:%s, depth:%s' % (ticker, depth))
    

       运行后会得到结果如下图,从此可以看出已经获取到了ticker和depth信息。

     

     二、获取四个市场的信息(for循环)

       接下来我们获取四个市场的信息,深度里面有asks和bids,数据量稍微有点儿多,这里depth信息我只去前面五个,对于ticker我也只提取里面的info信息(具体代表什么含义就要参考一下对应市场的API啦)。将其简单的封装后,最开始我想的是for循环。想到啥就开始吧:

    # 引入库
    import ccxt
    import time
    
    now = lambda: time.time()
    start = now()
    
    def getData(exchange, symbol):
        data = {}  # 用于存储ticker和depth信息
        # 获取ticker信息
        tickerInfo = exchange.fetch_ticker(symbol)
        # 获取depth信息
        depth = {}
        # 获取深度信息
        exchange_depth = exchange.fetch_order_book(symbol)
        # 获取asks,bids 最低5个,最高5个信息
        asks = exchange_depth.get('asks')[:5]
        bids = exchange_depth.get('bids')[:5]
        depth['asks'] = asks
        depth['bids'] = bids
    
        data['ticker'] = tickerInfo
        data['depth'] = depth
    
        return data
    
    def main():
        # 实例化市场
        exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
        # 交易对
        symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']
    
        for i in range(len(exchanges)):
            exchange = exchanges[i]
            symbol = symbols[i]
            data = getData(exchange, symbol)
            print('exchange: %s data is %s' % (exchange.id, data))
    
    if __name__ == '__main__':
        main()
        print('Run Time: %s' % (now() - start))
    

       运行后会发现虽然每个市场的信息都获取到了,执行完差不多花掉5.7秒,因为这是同步的,也就是按顺序执行的,要是要想每隔一定时间同时获取四个市场的信息,很显然这种结果不符合我们的要求。

     

    三、异步加协程(coroutine)

      前面讲的循环虽然可以输出结果,但耗时长而且达不到想要的效果,接下来采用异步加协程(参考知乎上的一篇文章),要用到异步首先得引入asyncio库,这个库是3.4以后才有的,它提供了一种机制,使得你可以用协程(coroutines)、IO复用(multiplexing I/O)在单线程环境中编写并发模型。这里python文档有个小例子。

    import asyncio
    
    async def compute(x, y):
        print("Compute %s + %s ..." % (x, y))
        await asyncio.sleep(1.0)
        return x + y
    
    async def print_sum(x, y):
        result = await compute(x, y)
        print("%s + %s = %s" % (x, y, result))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1, 2))
    loop.close()
    

      

      当事件循环开始运行时,它会在Task中寻找coroutine来执行调度,因为事件循环注册了print_sum(),因此print_sum()被调用,执行result = await compute(x, y)这条语句(等同于result = yield from compute(x, y)),因为compute()自身就是一个coroutine,因此print_sum()这个协程就会暂时被挂起,compute()被加入到事件循环中,程序流执行compute()中的print语句,打印”Compute %s + %s …”,然后执行了await asyncio.sleep(1.0),因为asyncio.sleep()也是一个coroutine,接着compute()就会被挂起,等待计时器读秒,在这1秒的过程中,事件循环会在队列中查询可以被调度的coroutine,而因为此前print_sum()compute()都被挂起了,因此事件循环会停下来等待协程的调度,当计时器读秒结束后,程序流便会返回到compute()中执行return语句,结果会返回到print_sum()中的result中,最后打印result,事件队列中没有可以调度的任务了,此时loop.close()把事件队列关闭,程序结束。

      接下来我们采用异步和协程(ps:ccxt库也有对应的异步),运行后发现时间只用了1.9秒,比之前快了好多倍。  

    Run Time: 1.9661316871643066 

    相关代码:

    # 引入库
    import ccxt.async as ccxt
    import asyncio
    import time
    
    now = lambda: time.time()
    start = now()
    
    async def getData(exchange, symbol):
        data = {}  # 用于存储ticker和depth信息
        # 获取ticker信息
        tickerInfo = await exchange.fetch_ticker(symbol)
        # 获取depth信息
        depth = {}
        # 获取深度信息
        exchange_depth = await exchange.fetch_order_book(symbol)
        # 获取asks,bids 最低5个,最高5个信息
        asks = exchange_depth.get('asks')[:5]
        bids = exchange_depth.get('bids')[:5]
        depth['asks'] = asks
        depth['bids'] = bids
    
        data['ticker'] = tickerInfo
        data['depth'] = depth
    
        return data
    
    def main():
        # 实例化市场
        exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
        # 交易对
        symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']
    
        tasks = []
        for i in range(len(exchanges)):
            task = getData(exchanges[i], symbols[i])
            tasks.append(asyncio.ensure_future(task))
    
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
    
    if __name__ == '__main__':
        main()
        print('Run Time: %s' % (now() - start))
    

      

     三、定时爬取并用mongodb保存数据

      在前面的基础上,添加一个定时任务,实现每隔一段时间爬取一次数据,并将数据保存到mongodb数据库。只需再前面的代码上稍微改改就可以啦,代码和运行结果如下:

    import asyncio
    import ccxt.async as ccxt
    import time
    import pymongo
    
    # 获取ticker和depth信息
    async def get_exchange_tickerDepth(exchange, symbol):  # 其中exchange为实例化后的市场
        # print('start get_ticker')
        while True:
            print('%s is run %s' % (exchange.id, time.ctime()))
    
            # 获取ticher信息
            tickerInfo = await exchange.fetch_ticker(symbol)
            ticker = tickerInfo.get('info')
    
            if type(ticker) == type({}):
                ticker['timestamp'] = tickerInfo.get('timestamp')
                ticker['high'] = tickerInfo.get('high')
                ticker['low'] = tickerInfo.get('low')
                ticker['last'] = tickerInfo.get('last')
            else:
                ticker = tickerInfo
            # print(ticker)
    
            # 获取深度信息
            depth = {}
            exchange_depth = await exchange.fetch_order_book(symbol)
            # 获取asks,bids 最低5个,最高5个信息
            asks = exchange_depth.get('asks')[:5]
            bids = exchange_depth.get('bids')[:5]
            depth['asks'] = asks
            depth['bids'] = bids
            # print('depth:{}'.format(depth))
            data = {
                'exchange': exchange.id,
                'countries': exchange.countries,
                'symbol': symbol,
                'ticker': ticker,
                'depth': depth
            }
    
            # 保存数据
            save_exchangeDate(exchange.id, data)
            print('********* %s is finished, time %s *********' % (exchange.id, time.ctime()))
    
            # 等待时间
            await asyncio.sleep(2)
    
    
    # 存库
    def save_exchangeDate(exchangeName, data):
        # 链接MongoDB
        connect = pymongo.MongoClient(host='localhost', port=27017)
        # 创建数据库
        exchangeData = connect['exchangeDataAsyncio']
        # 创建表
        exchangeInformation = exchangeData[exchangeName]
        # print(table_name)
        # 数据去重后保存
        count = exchangeInformation.count()
        if not count > 0:
            exchangeInformation.insert_one(data)
        else:
            for item in exchangeInformation.find().skip(count - 1):
                lastdata = item
            if lastdata['ticker']['timestamp'] != data['ticker']['timestamp']:
                exchangeInformation.insert_one(data)
    
    def main():
        exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(),
                      ccxt.gdax()]
        symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']
        tasks = []
        for i in range(len(exchanges)):
            task = get_exchange_tickerDepth(exchanges[i], symbols[i])
            tasks.append(asyncio.ensure_future(task))
    
        loop = asyncio.get_event_loop()
    
        try:
            # print(asyncio.Task.all_tasks(loop))
            loop.run_forever()
    
        except Exception as e:
            print(e)
            loop.stop()
            loop.run_forever()
        finally:
            loop.close()
    
    if __name__ == '__main__':
        main()
    

    五、小结

      使用协程可以实现高效的并发任务。Python在3.4中引入了协程的概念,可是这个还是以生成器对象为基础,3.5则确定了协程的语法。这里只简单的使用了asyncio。当然实现协程的不仅仅是asyncio,tornado和gevent都实现了类似的功能。这里我有一个问题,就是运行一段时间后,里面的市场可能有请求超时等情况导致协程停止运行,我要怎样才能获取到错误然后重启对应的协程。如果有大神知道的话请指点指点。

     

    六、参考链接

    1. Python黑魔法 --- 异步IO( asyncio) 协程  http://python.jobbole.com/87310/

    2. Python并发编程之协程/异步IO  https://www.ziwenxie.site/2016/12/19/python-asyncio/

    3. 从0到1,Python异步编程的演进之路  https://zhuanlan.zhihu.com/p/25228075

    4. Tasks and coroutines  https://docs.python.org/3/library/asyncio-task.html

     

  • 相关阅读:
    [记录] 原生JS 的常用方法封装
    [记录] JavaScript 中的DOM操作
    [记录] JavaScript 中的正则表达式(案例分析)
    图片查看器插件(带缩略图) viewer.js
    (二) 美化滚动条 JScrollPane.js
    关于数据库存储过程管理的一点建议
    根据属性名称来改变XML节点值
    hdu 3911 Black And White
    poj 3667 Hotel
    hdu 4217 Data Structure?
  • 原文地址:https://www.cnblogs.com/xiaxuexiaoab/p/8410682.html
Copyright © 2011-2022 走看看