zoukankan      html  css  js  c++  java
  • ElasticSearch改造研报查询实践

    背景:

      1,系统简介:通过人工解读研报然后获取并录入研报分类及摘要等信息,系统通过摘要等信息来获得该研报的URI

      2,现有实现:老系统使用MSSQL存储摘要等信息,并将不同的关键字分解为不同字段来提供搜索查询

      3,存在问题:

        -查询操作繁琐,死板:例如要查某个机构,标题含有周报的研报,现有系统需要勾选相应字段再输入条件

        -查询速度缓慢,近千万级别数据响应时间4-5s

      4,改进:使用es优化,添加多个关键字模糊查询(非长文本数据,因此未使用_socre进行评分查询)

        -例如:输入“国泰君安 周报”就可查询到所有相关的国泰君安的周报

     

    1,新建Index

    curl -X PUT 'localhost:9200/src_test_1' -H 'Content-Type: application/json' -d '
    {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
      "mappings": {
        "doc_test": {
          "properties": {
            "title": {#研报综合标题
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            },
            "author": {#作者
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            },
            "institution": {#机构
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_max_word"
            },
              "industry": {#行业
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
              },
              "grade": {#评级
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
              },
              "doc_type": {#研报分类
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
              },
             "time": {#发布时间
              "type": "date" ,
              "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
             },
              "doc_uri": {#地址
               "type": "text",
                "index":false
             },
              "doc_size": {#文件大小
               "type": "integer",
                "index":false
             },
              "market": {#市场
              "type": "byte"
             }
          }
        }
      }
    }'

     ※特别提示对于需要模糊查询的中文文本字段最好都设置text属性(keyword无法被分词:用于精确查找),并使用ik_max_word分词器。

    ※使用ik_max_word原因:针对该场景,例如我想使用“国泰”关键词进行匹配,如果使用默认ik会将“国”,“泰”分开进行查询,而不是需求的“国泰”这个词

    2,数据导入(CSV分批)

    import pandas as pd
    import numpy as np
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    es = Elasticsearch()
    
    data_will_insert = []
    x = 1
    
    # #使用pandas读取csv数据;如果出现乱码加:encoding = "ISO-8859-1"
    src_data = pd.read_csv('ResearchReportEx.csv')
    
    for index,i in src_data.iterrows():
        x+=1
        #每次插入100000条
        if x%100000 == 99999:
            #es批量插入
            success, _ = bulk(es, data_will_insert, index='src_test_1', raise_on_error=True)
            print('Performed %d actions' % success)
            data_will_insert = []
    
        #判断市场
        if i['ExchangeType'] == 'CN':
            market = 0
        elif i['ExchangeType'] == 'HK':
            market = 1
        elif i['ExchangeType'] == 'World':
            market = 2
        else:
            market = 99
    
        data_will_insert.append({"_index":'src_test_1',"_type": 'doc_test','_source':
                    {
                    'title':i['Title'],
                    'author':i['AuthorName'],
                    'time':i['CreateTime']+':00',
                    'institution':i['InstituteNameCN'],
                    'doc_type':i['KindName'] if i['Kind2Name'] is np.NaN else i['KindName']+'|%s' % i['Kind2Name'],
                    'industry':'' if i['IndustryName'] is np.NaN else i['IndustryName'],
                    'grade':'' if i['GradeName'] is np.NaN else i['GradeName'],
                    'doc_uri':i['FileURL'],
                    'doc_size':i['Size'],
                    'market':market
                    }
                    })
    
    #将最后剩余在list中的数据插入
    if len(data_will_insert)>0:
        success, _ = bulk(es, data_will_insert, index='src_test_1', raise_on_error=True)
        print('Performed %d actions' % success)

    3,查询

    import time
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan
    
    # es连接
    es = Elasticsearch()
    
    
    # 计算运行时间装饰器
    def cal_run_time(func):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            res = func(*args, **kwargs)
            end_time = time.time()
            print(str(func) + '---run time--- %s' % str(end_time - start_time))
            return res
    
        return wrapper
    
    
    @cal_run_time
    def query_in_es():
        body = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "multi_match": {
                                "query": "国泰 报告",
                                "type": "cross_fields",#跨字段匹配
                                "fields": ["title", "institution","grade"
                                           "doc_type","author","industry"],#在这6个字段中进行查找
                                "operator": "and" 
                            }#此查询条件等于:query中的关键词都在fields中所有字段拼接成的字符中
                        },
                        {
                            "range": {
                                "time": {
                                    "gte": '2018-02-01'#默认查询限制时间
                                }
                            }
                        }
                    ],
                }
            }
        }
    
        # 根据body条件查询
        scanResp = scan(es, body, scroll="10m", index="src_test_1", doc_type="doc_test", timeout="10m")
        row_num = 0
    
        for resp in scanResp:
            print(resp['_source'])
            row_num += 1
    
        print(row_num)
    
    
    query_in_es()

     ※测试结果速度相当快:多关键字查询只需零点几秒

  • 相关阅读:
    Rafy 领域实体框架演示(2)
    Rafy 领域实体框架示例(1)
    Corrupt JPEG data: 1 extraneous bytes before marker 0xd9 JPEG datastream contains no image
    Raw image encoder error: Empty JPEG image (DNL not supported)) in throwOnEror
    QByteArray数据进行CRC32校验时产生的随机结果
    socket 发送字符串0x00时被截断
    invalid argument (errno: 22)
    libpng error: IHDR: CRC error
    Cannot find Makefile. Check your build settings.
    QT中常用工具总结
  • 原文地址:https://www.cnblogs.com/dxf813/p/8447196.html
Copyright © 2011-2022 走看看