zoukankan      html  css  js  c++  java
  • Elasticsearch+Mongo亿级别数据导入及查询实践

    数据方案:
    • 在Elasticsearch中通过code及time字段查询对应doc的mongo_id字段获得mongodb中的主键_id
    • 通过获得id再进入mongodb进行查询
     
    1,数据情况:
    • 全部为股票及指数的分钟K线数据(股票代码区分度较高)
    • Elasticsearch及mongodb都未分片且未优化参数配置,mongo表中只有主键_id索引
    • mongodb数据量:

        

    • Elasticsearch数据量:

        

    2,将数据从mongo源库导入Elasticsearch

    import time
    from pymongo import MongoClient
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    es = Elasticsearch()
    
    conn = MongoClient('127.0.0.1', 27017)
    db = conn.kline_db
    my_set = db.min_kline
    x = 1
    tmp = []
    
    #此处有个坑mongo查询时由于数据量比较大时间较长需要设置游标不过期:no_cursor_timeout=True
    for i in my_set.find(no_cursor_timeout=True):
        x+=1
        #每次插入100000条
        if x%100000 == 99999:
            #es批量插入
            success, _ = bulk(es, tmp, index='test_2', raise_on_error=True)
            print('Performed %d actions' % success)
            tmp = []
        if i['market'] == 'sz':
            market = 0
        else:
            market = 1
        #此处有个秒数时间类型及时区转换
        tmp.append({"_index":'test_2',"_type": 'kline','_source':{'code':i['code'],'market':market,
                    'time':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i['kline_time']/1000 - 8*60*60))
                    ,'mongo_id':str(i['_id'])}})
    
    #将最后剩余在tmp中的数据插入
    if len(tmp)>0:
        success, _ = bulk(es, tmp, index='test_2', raise_on_error=True)
        print('Performed %d actions' % success)

    3,Elasticsearch+mongo查询时间统计

    import time
    from pymongo import MongoClient
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan
    from bson.objectid import ObjectId
    
    #es连接
    es = Elasticsearch()
    
    #mongo连接
    conn = MongoClient('127.0.0.1', 27017)
    db = conn.kline_db  #连接kline_db数据库,没有则自动创建
    my_set = db.min_kline
    
    tmp = []
    
    #计算运行时间装饰器
    def cal_run_time(func):
        def wrapper(*args,**kwargs):
            start_time = time.time()
            res = func(*args,**kwargs)
            end_time = time.time()
            print(str(func) +'---run time--- %s' % str(end_time-start_time))
            return res
        return wrapper
    
    @cal_run_time
    def query_in_mongo(tmp_list):
        k_list = []
        kline_data = my_set.find({'_id':{'$in':tmp_list}})
        for k in kline_data:
            k_list.append(k)
        return k_list
    
    @cal_run_time
    def query_in_es():
        #bool多条件查询 must相当于and
        body = {
            "query": {
                "bool": {
                    "must": [{
                        "range": {#范围查询
                            "time": {
                                "gte": '2017-01-10 00:00:00',  # >=
                                "lte": '2017-04-12 00:00:00'  # <=
                            }
                        }
                    },
                        {"terms": {# == 或  in:terms 精确查询
                            "code": ['000002','000001']
                        }
                        }
                    ]
                }
    
            }
        }
    
        #根据body条件记性查询
        scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m")
    
        #解析结果字典并放入tmp列表中
        for resp in scanResp:
            tmp.append(ObjectId(resp['_source']['mongo_id']))
    
        print(len(tmp))
    
        #--------------此处有个坑,直接使用search方法查询到的结果集中最多只有10条记录----------------
        # zz = es.search(index="test_2", doc_type="kline", body=body)
        # print(zz['hits']['total'])
        # for resp in zz['hits']['hits']:
        #     tmp.append(ObjectId(resp['_source']['mongo_id']))
    
    query_in_es()
    
    query_in_mongo(tmp)

    运行结果如下:

    第一行:查询的doc个数:28320

    第二行:es查询所用时间:0.36s

    第三行:mongo使用_id查询所用时间 :0.34s

    从结果来看对于3亿多数据的查询Elasticsearch的速度还是相当不错的

    ※Elasticsearch主要的优势在于可以进行快速的分词模糊查询,所以股票K线这个场景并没有充分发挥其优势,至于查询效率,其实mysql,mongo等只要分库分表合理一样能够达到。

    ※Elasticsearch+Mongo这个架构主要针对场景:使用mongo存储海量数据,且这张表读写都很频繁。

  • 相关阅读:
    微信聊天框测试思路
    巧用&&和|| 让逻辑代码更简洁,逼格看起来更高一点(玩笑脸)
    获取URL中的参数
    解决移动端点击闪烁问题
    npm安装依赖包 --save-dev 和 --save; package.json的devDependencies和dependencies 的区别!
    vue-cli 3配置接口代理
    js小方法积累,将一个数组按照n个一份,分成若干数组
    web前端识别文字转语音
    html 锚点
    ES6 必须要用的数组Filter() 方法,不要再自己循环遍历了!!!
  • 原文地址:https://www.cnblogs.com/dxf813/p/8371214.html
Copyright © 2011-2022 走看看