zoukankan      html  css  js  c++  java
  • elasticsearch之python操作(非原生)

    elasticsearch 模块

    Elasticsearch低级客户端。提供从Python到ES REST端点的直接映射。

    连接集群节点

    • 指定连接
    es = Elasticsearch(
        ['172.16.153.129:9200'],
        # 认证信息
        # http_auth=('elastic', 'changeme')
    )
    • 动态连接
    es = Elasticsearch(
        ['esnode1:port', 'esnode2:port'],
        # 在做任何操作之前,先进行嗅探
        sniff_on_start=True,
        # 节点没有响应时,进行刷新,重新连接
        sniff_on_connection_fail=True,
        # 每 60 秒刷新一次
        sniffer_timeout=60
    )
    • 对不同的节点,赋予不同的参数
    es = Elasticsearch([
        {'host': 'localhost'},
        {'host': 'othernode', 'port': 443, 'url_prefix': 'es', 'use_ssl': True},
    ])
    • 假如使用了 ssl
    es = Elasticsearch(
        ['localhost:443', 'other_host:443'],
        #打开SSL 
        use_ssl=True,
        #确保我们验证了SSL证书(默认关闭)
        verify_certs=True,
        #提供CA证书的路径
        ca_certs='/path/to/CA_certs',
        #PEM格式的SSL客户端证书
        client_cert='/path/to/clientcert.pem',
        #PEM格式的SSL客户端密钥
        client_key='/path/to/clientkey.pem'
    )

    获取相关信息

    • 测试集群是否启动
    In [40]: es.ping()
    Out[40]: True
    • 获取集群基本信息
    In [39]: es.info()
    Out[39]:
    {'cluster_name': 'sharkyun',
     'cluster_uuid': 'rIt2U-unRuG0hJBt6BXxqw',
     'name': 'master',
     'tagline': 'You Know, for Search',
     'version': {'build_date': '2017-10-06T20:33:39.012Z',
      'build_hash': '1a2f265',
      'build_snapshot': False,
      'lucene_version': '6.6.1',
      'number': '5.6.3'}}
    • 获取集群的健康状态信息
    In [41]: es.cluster.health()
    Out[41]:
    {'active_primary_shards': 6,
     'active_shards': 6,
     'active_shards_percent_as_number': 50.0,
     'cluster_name': 'sharkyun',
     'delayed_unassigned_shards': 0,
     'initializing_shards': 0,
     'number_of_data_nodes': 1,
     'number_of_in_flight_fetch': 0,
     'number_of_nodes': 1,
     'number_of_pending_tasks': 0,
     'relocating_shards': 0,
     'status': 'yellow',
     'task_max_waiting_in_queue_millis': 0,
     'timed_out': False,
     'unassigned_shards': 6}
    • 获取当前连接的集群节点信息
    In [43]: es.cluster.client.info()
    • 获取集群目前所有的索引
    In [55]: print(es.cat.indices())
    yellow open logstash-2017.11.04 Zt2K7k0yRZaIwmEsZ9H3DA 5 1 301000 0 162.3mb 162.3mb
    yellow open .kibana             1Epb3nPFRimFJoRwKHtXIg 1 1      2 0  13.4kb  13.4kb
    • 获取集群的更多信息
    es.cluster.stats()

    利用实例的 cat 属性得到更简单易读的信息

    In [85]: es.cat.health()
    Out[85]: '1510431262 04:14:22 sharkyun yellow 1 1 6 6 0 0 6 0 - 50.0%
    '
    
    In [86]: es.cat.master()
    Out[86]: 'VXgFbKAaTtGO5a1QAfdcLw 172.16.153.129 172.16.153.129 master
    '
    
    In [87]: es.cat.nodes()
    Out[87]: '172.16.153.129 27 49 0 0.02 0.01 0.00 mdi * master
    '
    
    In [88]: es.cat.indices()
    Out[88]: 'yellow open logstash-2017.11.04 Zt2K7k0yRZaIwmEsZ9H3DA 5 1 301000 0 162.3mb 162.3mb
    yellow open .kibana             1Epb3nPFRimFJoRwKHtXIg 1 1      2 0  13.4kb  13.4kb
    '
    
    In [89]: es.cat.count()
    Out[89]: '1510431323 04:15:23 301002
    '
    
    In [90]: es.cat.plugins()
    Out[90]: ''
    
    In [91]: es.cat.templates()
    Out[91]: 'logstash logstash-* 0 50001
    filebeat filebeat-* 0 
    '
    • 任务
    es.tasks.get()
    
    es.tasks.list()

    查询

    • 发送查询请求
    es = Elasticsearch(
            ['172.16.153.129:9200']
        )
        
    response = es.search(
        index="logstash-2017.11.14", # 索引名
        body={             # 请求体
          "query": {       # 关键字,把查询语句给 query
              "bool": {    # 关键字,表示使用 filter 查询,没有匹配度
                    "must": [      # 表示里面的条件必须匹配,多个匹配元素可以放在列表里
                        {
                            "match": {  # 关键字,表示需要匹配的元素
                                "TransId": '06100021650016153'   # TransId 是字段名, 06100021650016153 是此字段需要匹配到的值
                            }
                        },
                        {
                            "match": {
                                "Ds": '2017-05-06'
                            }
                        },
                        {
                            "match": {
                                "Gy": '2012020235'
                            }
                        }, ],
                     "must_not": {   # 关键字,表示查询的结果里必须不匹配里面的元素
                            "match": {  # 关键字
                                "message": "M("    # message 字段名,这个字段的值一般是查询到的结果内容体。这里的意思是,返回的结果里不能包含特殊字符 'M('
                            }
                     }
                }
            },
            
            # 下面是对返回的结果继续排序
            "sort": [{"@timestamp": {"order": "desc"}}],
            "from": start,  # 从匹配到的结果中的第几条数据开始返回,值是匹配到的数据的下标,从 0 开始
            "size": size    # 返回多少条数据
          }
    )
    • 得到返回结果的总条数
    total = res['hits']['total']
    • 循环返回的结果,得到想要的内容
    res_dict={}
    for hit in res['hits']['hits']:
        log_time = "%s|%s" % (hit['_source']['Ds'], hit['_source']['Us'])
        res_dict[log_time] = "%s|%s|%s|%s" % (hit['_source']['beat']['hostname'],hit['_source']['FileName'], hit['_source']['FileNum'],hit['_source']['Messager'])
    • 实例查询7天之内的流水号为:06100021650016153 的日志信息
    query_body={
        'bool': {
            'must_not': {'match': {'message': 'M('}}, 
            'must': [
                {'match': {'TransId': '06100021650016153'}}, 
                {'range': {'@timestamp': {'gte': u'now-7d', 'lte': 'now'}}}
            ]
        }
    }
    res = es.search(
        index='logstash-2017.11.14',
        body={
            "query": query_body,
            "sort":[{"@timestamp": {"order": "desc"}}]})
        }
    )
     

    更高级的 elasticsearch_dsl 模块

    官网

    小试牛刀

    • 单一字段查询
    es = Elasticsearch(
            ['172.16.153.129:9200']
        )
    s = Search(using=es,
        index="logstash-2017.11.14").filter("match",Gy='20160521491').query("match", TransId='06100021650016153').exclude("match", message="M(")
        
    response = s.execute()    
    using  
        指明用那个已经连接的对象
    query  
        接收的是查询体语句
    exclude
        接收的是不匹配的字段 就像 must_not
        
    filter
        接收的是过滤语句 ,过滤的条件意思是在返回结果中有这些条件的信息       
    
    • 统计结果总数
    s.count()
    
    response = sexecute()
    response.hits.total
    • 获取结果
    res_dict={}
    for hit in s:
    
        log_time = "%s|%s" % (hit.Ds, hit.Us')
        res_dict[log_time] = "%s|%s|%s|%s" % (hit.beat['hostname'],hit.FileName, hit.FileNum,hit.Messager)

    设置连接

    有几种方法来配置库的连接。

    最简单的选择,也是最有用的,就是定义一个默认连接,每次调用API时都会使用这个连接,而不需要显式传递其他连接。

    除非要从应用程序访问多个群集,否则强烈建议您使用该 create_connection 方法创建一个默认连接,所有操作都将自动使用该连接。

    显式传递一个连接

    如果你不想提供全局配置(也就是默认连接),你可以传入你自己的连接(实例elasticsearch.Elasticsearch)作为参数, 使用 using 接受它:

    s = Search(using=Elasticsearch('localhost'))

    甚至你可以下面的方式来覆盖一个对象已经关联的任何连接

    s = s.using(Elasticsearch('otherhost:9200'))

    默认链接

    默认连接
    要定义全局使用的默认连接,请使用 connections模块和create_connection方法:

    from elasticsearch_dsl.connections import connections
    
    client = connections.create_connection(hosts=['172.16.153.129:9200'], 
        http_auth=('elastic', 'changeme'), timeout=20)
    • 执行搜索就不必再传连接对象了
    s = Search(index="logstash-2017.11.14").filter("match",Gy='20160521491').query("match", TransId='06100021650016153').exclude("match", message="M(")

    多集群环境的连接

    多个集群
    您可以使用以下配置方法同时定义到多个群集的多个连接:

    from elasticsearch_dsl.connections import connections
    
    clients = connections.configure(
        default={'hosts': 'localhost'},
        dev={
            'hosts': ['esdev1.example.com:9200'],
            'sniff_on_start': True
        }
    )

    上面的情况是适用于第一次连接时的情况

    • 运行中设置连接
    # if you have configuration to be passed to Elasticsearch.__init__
    # 直接传递一个配置信息给 Elasticsearch
    connections.create_connection('qa', hosts=['esqa1.example.com'], sniff_on_start=True)
    
    # if you already have an Elasticsearch instance ready
    # 追加一个已经准备好的连接对象
    connections.add_connection('qa', my_client)

    使用连接对象的别名

    当使用多个连接时,您可以使用您在下面注册的字符串别名来引用它们:

    s = Search(using='qa')
    • 如果在该别名下没有注册的连接,KeyError 异常将会被抛出。
    KeyError: "There is no connection with alias 'qa'."
    

    Search DSL

    该Search对象
    该Search对象代表整个搜索请求:

    • 查询(queries)
    • 过滤器(filters)
    • 聚合(aggregations)
    • 排序(sort)
    • 分页(pagination)
    • 附加的参数(additional parameters)
    • 关联客户端(associated client)

    API 被设计为可链接的。除了聚合功能以外,这意味着Search对象是不可变的(对对象的所有更改都将导致创建(拷贝)一个包含更改的副本)。这意味着您可以安全地将Search对象传递给外部代码,而不必担心这个对象会被修改。

    实例化对象时,您可以传递低级别的elasticsearch客户端实例Search:

    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Search
    
    client = Elasticsearch()
    
    s = Search(using=client)
    

    ==所有的方法都会返回一个对象的副本,从而安全地传递给外部代码。==

    • API 是可连接的,允许你在一个语句中调用多个方法:
    s = Search().using(client).query("match", title="python")
    
    • 要将请求发送到Elasticsearch:
    s.execute()
    
    • 如果您只是想遍历搜索返回的匹配,则可以遍历该Search对象:
    for hit in s:
    
        print(log_time = "%s|%s" % (hit.Ds, hit.Us'))
        print(hit.beat['hostname'],hit.FileName, hit.FileNum,hit.Messager)
    
    

    DS、US、beat、FileName等都是映射好的字段名,就是用 elasticsearch 模块的 search 方法得到的结果里的 hit['_source'] 里面的内容。

    搜索结果将被缓存。随后调用execute或试图遍历已经执行的Search对象将不会触发额外的请求发送到Elasticsearch。强制请求时指定 ignore_cache=True调用execute。

    • 这个 Search 的对象也可以转换为之前的 Query DSL 格式
    s.to_dict()
    

    查询

    Elasticsearch_dsl 的 query 类为所有Elasticsearch查询类型提供类。 传递所有参数作为关键字参数。 这些类接受任何关键字参数,然后dsl将传递给构造函数的所有参数作为结果字典中的顶级关键字序列化(因此生成的json被发送到elasticsearch)。 这意味着在DSL中原始查询和其等价物之间存在明确的一对一映射:

    from elasticsearch_dsl.query import MultiMatch, Match
    
    # {"multi_match": {"query": "python django", "fields": ["title", "body"]}}
    MultiMatch(query='python django', fields=['title', 'body'])
    
    # {"match": {"title": {"query": "web framework", "type": "phrase"}}}
    Match(title={"query": "web framework", "type": "phrase"})
    

    ==在某些情况下,由于python对标识符的限制,这种方法不支持字段中含有特殊字符的情况,比如:@timestamp。==

    在这种情况下,你必须史使用原来的字典形式:Range(** {'@timestamp': {'lt': 'now'}})

    强大的 Q

    • 您可以使用Q快捷方式可以把带参数的名称或原始数据的dict构建成 Search 对应类的实例:
    from elasticsearch_dsl import Q
    Q("multi_match", query='python django', fields=['title', 'body'])
    Q({"multi_match": {"query": "python django", "fields": ["title", "body"]}})
    # 这两种方式最后转换的结果是一致的
    MultiMatch(fields=['title', 'body'], query='python django')
    
    • 要将查询添加到Search对象,请使用以下.query()方法:
    q = Q("multi_match", query='python django', fields=['title', 'body'])
    s = s.query(q)
    
    • 当然,Q 接收的参数,,query 方法都支持
    s = s.query("multi_match", query='python django', fields=['title', 'body'])
    
    • 用 Q 实现组合查询

      Q 对象可以使用逻辑运算符进行组合:

    Q("match", title='python') | Q("match", title='django')
    # {"bool": {"should": [...]}}
    # 匹配到任意条件即可
    
    Q("match", title='python') & Q("match", title='django')
    # {"bool": {"must": [...]}}
    # 列表里的条件必须同时匹配
    
    ~Q("match", title="python")
    # {"bool": {"must_not": [...]}}
    # 非
    
    • 实现组合查询的另一种方法

    query 方法可以被连续调用

    In [193]: sa = Search().query().query('match',title='python').query('match',body='django')
    
    In [194]: sa.to_dict()
    Out[194]:
    {
        "query": {
            "bool": {
                "must": [
                    {"match": {
                        "title": "python"}
                    },
                    {"match": {
                        "body": "django"}
                    }
                ]
            }
        }
    }
    
    • 假如你希望对查询的条件进行精确的控制,请使用 Q 构造组合查询:
    q = Q('bool',
        must=[Q('match', title='python')],
        should=[Q(...), Q(...)],
        minimum_should_match=1
    )
    s = Search().query(q)
    

    过滤器

    过滤请使用 filter 方法

    s = Search()
    s = s.filter('terms', tags=['search', 'python'])
    # {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
    

    在幕后,这将产生一个Bool查询并将指定的 terms查询放入其filter分支,使其等价于:

    s = Search()
    s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])
    # {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
    

    ==下面没搞懂==

    如果您想使用post_filter元素进行分面导航,请使用该 .post_filter()方法。

    你也可以用 exclude() 排除查询项目:

    s = Search()
    s = s.exclude('terms', tags=['search', 'python'])
    

    官网上说下面是简写,简直不敢相信

    s = s.query('bool', filter=[~Q('terms', tags=['search', 'python'])])
    

    聚合

    定义一个聚合,请使用 A

    A('terms', field='tags')
    # {"terms": {"field": "tags"}}
    
    • 嵌套聚合,可以使用.bucket(),.metric()和 .pipeline()方法:
    a = A('terms', field='category')
    # {'terms': {'field': 'category'}}
    
    a.metric('clicks_per_category', 'sum', field='clicks')
        .bucket('tags_per_category', 'terms', field='tags')
    # {
    #   'terms': {'field': 'category'},
    #   'aggs': {
    #     'clicks_per_category': {'sum': {'field': 'clicks'}},
    #     'tags_per_category': {'terms': {'field': 'tags'}}
    #   }
    # }
    
    • 要将聚合添加到Search对象,请使用.aggs充当顶级聚合的属性
    s = Search()
    a = A('terms', field='category')
    s.aggs.bucket('category_terms', a)
    # {
    #   'aggs': {
    #     'category_terms': {
    #       'terms': {
    #         'field': 'category'
    #       }
    #     }
    #   }
    # }
    

    或者下面这样的,有点儿变态

    s = Search()
    s.aggs.bucket('articles_per_day', 'date_histogram', field='publish_date', interval='day')
        .metric('clicks_per_day', 'sum', field='clicks')
        .pipeline('moving_click_average', 'moving_avg', buckets_path='clicks_per_day')
        .bucket('tags_per_day', 'terms', field='tags')
    
    s.to_dict()
    # {
    #   "aggs": {
    #     "articles_per_day": {
    #       "date_histogram": { "interval": "day", "field": "publish_date" },
    #       "aggs": {
    #         "clicks_per_day": { "sum": { "field": "clicks" } },
    #         "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } },
    #         "tags_per_day": { "terms": { "field": "tags" } }
    #       }
    #     }
    #   }
    # }
    
    • 您可以通过名称访问现有存储桶
    s = Search()
    
    s.aggs.bucket('per_category', 'terms', field='category')
    s.aggs['per_category'].metric('clicks_per_category', 'sum', field='clicks')
    s.aggs['per_category'].bucket('tags_per_category', 'terms', field='tags')
    

    ==当链接多个聚合时,什么.bucket()和.metric()方法返回之间是有区别的==
    ==.bucket()返回新定义的存储区,同时.metric()返回其父容器以允许进一步链接。==
    ==与Search对象上的其他方法相反,定义聚合是在原地完成的(不返回副本)。==

    排序

    要指定排序顺序,请使用 .sort() 方法:

    s = Search().sort(
        'category',
        '-title',
        {"lines" : {"order" : "asc", "mode" : "avg"}}
    )
    

    ==它接受可以是字符串或字典的位置参数。字符串值是一个字段名称,可以用-符号前缀来指定降序。==

    • 恢复排序使用无参的 sort() 方法
    s = s.sort()
    

    分页

    要指定from / size参数,请使用Python切片AP

    s = s[10:20]
    # {"from": 10, "size": 10}
    
    • 如果要访问与查询匹配的所有文档,可以使用 scan使用扫描/滚动弹性搜索API的方法
    for hit in s.scan():
        print(hit.title)
    

    ==请注意,在这种情况下,结果将不会被排序。==

    突出高亮

    • 要设置突出显示的常用属性,请使用以下highlight_options方法:
    s = s.highlight_options(order='score')
    
    • 为各个字段启用突出显示使用以下highlight方法完成:
    s = s.highlight('title')
    # or, including parameters:
    s = s.highlight('title', fragment_size=50)
    
    • 返回的结果将在被赋值到一个对象(变量)上可用,.meta.highlight.FIELD 将包含结果的列表:
    response = s.execute()
    for hit in response:
        for fragment in hit.meta.highlight.title:
            print(fragment)
    

    Suggestions 建议

    ==此部分不懂,有待研究==

    要在Search对象上指定建议请求,请使用以下suggest方法:

    s = s.suggest('my_suggestion', 'pyhton', term={'field': 'title'})
    

    第一个参数是建议名称(它将返回的名称),第二个是你希望建议者处理的实际文本,关键字参数将被添加到建议的json中,这意味着它应该成为其中一个term,phrase或者completion指出应该使用哪种类型的建议者。

    如果您只希望运行搜索的建议部分(通过_suggest 端点),您可以通过execute_suggest以下方式进行:

    s = s.suggest('my_suggestion', 'pyhton', term={'field': 'title'})
    suggestions = s.execute_suggest()
    
    print(suggestions.my_suggestion)
    

    额外的属性和参数

    • 要设置搜索请求的额外属性,请使用该.extra()方法。这可以被用于定义在不能经由像特定的API方法来限定所述主体的键explain或search_after:
    s = s.extra(explain=True)
    
    • 要设置查询参数,请使用以下.params()方法:
    s = s.params(search_type="count")
    
    • 如果您需要限制elasticsearch返回的字段,请使用以下 source()方法:
    #只返回选定的字段
    s = s.source(['title', 'body'])
    
    #不返回任何字段,只是元数据
    s = s.source(False)
    
    #明确包含/排除字段
    s = s.source(include=["title"], exclude=["user.*"])
    
    #重置字段选择
    s = s.source(None)
    

    序列化和反序列化

    Search 的对象可以使用 .to_dict() 方法序列化为一个字典

    您也可以使用 Search 类的 .from_dict() 方法创建一个Search对象。
    这将创建一个新的对象,并使用字典中的数据填充这个新的对象

    s = Search.from_dict({"query": {"match": {"title": "python"}}})
    

    如果要修改现有的Search对象,并重写它的属性,可以使用这个实例的 update_from_dict() 方法,改变是实时生效的:

    In [2]: s = Search()
    
    In [4]: s.to_dict()
    Out[4]: {'query': {'match_all': {}}}
    
    In [5]: s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42}
       ...: )
    Out[5]: <elasticsearch_dsl.search.Search at 0x10a7c8550>
    
    In [6]: s.to_dict()
    Out[6]: {'query': {'match': {'title': 'python'}}, 'size': 42}
    

    返回结果

    您可以通过调用 Search 对象的 .execute() 方法来执行搜索,之后将返回一个 Response 对象。
    你可以将这个返回的对象(结果)赋值给一个对象。
    该 Response 对象允许您通过属性访问的方式(也就是 . )来访问 Response 对象字典中的任何键。

    它还提供了一些方便的帮手:

    response = s.execute()
    
    print(response.success())
    # 是否成功
    # True
    
    print(response.took)
    # 命中数
    # 12
    
    print(response.hits.total)
    
    print(response.suggest.my_suggestions)
    

    如果要检查response对象的内容,只需使用其 to_dict方法访问原始数据即可打印。

    Hits

    要访问搜索返回的匹配对象,请访问该hits属性或只是遍历该Response对象:

    response = s.execute()
    print('Total %d hits found.' % response.hits.total)
    for h in response:
        print(h.title, h.body)
    

    结果

    单独的命中包装在一个便利的类,允许属性访问返回的字典中的键。结果的所有元数据都可以通过meta(不带下划线开头 _ )访问:

    response = s.execute()
    h = response.hits[0]
    print('/%s/%s/%s returned with score %f' % (
        h.meta.index, h.meta.doc_type, h.meta.id, h.meta.score))
    

    响应 = s 。执行()
    h = 响应。命中[ 0 ]
    打印('/ %S / %S / %S 与得分返回%F ' % (
    ħ 。元。指数, ħ 。元。DOC_TYPE , ħ 。元。ID , ħ 。元。得分))
    注意

    ==如果刚好文档中有一个字段叫 meta ,可以使用字典的键方法来访问它:hit['meta']。==

    聚合

    聚合可通过aggregations属性获得:

    for tag in response.aggregations.per_tag.buckets:
        print(tag.key, tag.max_lines.value)
    

    MultiSearch

    如果您需要同时执行多个搜索,则可以使用 MultiSearch 类,这将使用该类的 _msearch API :

    from elasticsearch_dsl import MultiSearch, Search
    
    ms = MultiSearch(index='blogs')
    
    ms = ms.add(Search().filter('term', tags='python'))
    ms = ms.add(Search().filter('term', tags='elasticsearch'))
    
    responses = ms.execute()
    
    for response in responses:
        print("Results for query %r." % response.search.query)
        for hit in response:
            print(hit.title)
    

    持久化

    映射

    • 您可以使用dsl库为应用程序定义映射和基本的持久层。

    映射定义遵循与查询dsl类似的模式:

    from elasticsearch_dsl import Keyword, Mapping, Nested, Text
    
    # name your type
    m = Mapping('my-type')
    
    # add fields
    m.field('title', 'text')
    
    # you can use multi-fields easily
    m.field('category', 'text', fields={'raw': Keyword()})
    
    # you can also create a field manually
    comment = Nested()
    comment.field('author', Text())
    comment.field('created_at', Date())
    
    # and attach it to the mapping
    m.field('comments', comment)
    
    # you can also define mappings for the meta fields
    m.meta('_all', enabled=False)
    
    # save the mapping into index 'my-index'
    m.save('my-index')
    

    ==注意==

    默认情况下,所有的字段(除了Nested)都会有单个值。
    您可以在创建/定义字段期间,通过向构造函数传入 multi=True(m.field('tags', Keyword(multi=True)))来始终覆盖此期望值。

    那么,即使字段没有被设置,字段的值也将是一个空的列表,使您能够写入。
    doc.tags.append('search')

    • 特别是如果您使用动态映射,则可以根据Elasticsearch中的现有类型更新映射,或直接从现有类型创建映射:
    # get the mapping from our production cluster
    m = Mapping.from_es('my-index', 'my-type', using='prod')
    
    # update based on data in QA cluster
    m.update_from_es('my-index', using='qa')
    
    # update the mapping on production
    # 在生产上更新映射
    m.save('my-index', using='prod')
    
    • 常用字段选项:

    multi
    如果设置True为该字段的值将被设置为[]第一次访问。
    required
    指示字段是否需要文档的有效值。

    Analysis

    要指定字段的analyzer值,Text您可以使用分析仪的名称(作为字符串),并依靠定义的分析仪(如内置分析仪)或手动定义分析仪。

    或者,您可以创建自己的分析器并让持久层处理其创建:

    from elasticsearch_dsl import analyzer, tokenizer
    
    my_analyzer = analyzer('my_analyzer',
        tokenizer=tokenizer('trigram', 'nGram', min_gram=3, max_gram=3),
        filter=['lowercase']
    )
    
    

    每个分析对象需要有一个名字(my_analyzer和trigram在我们的例子)和断词,令牌过滤器和过滤器炭还需要指定类型(nGram在我们的例子)。

    ==在创建依赖于自定义分析器的映射时,索引必须不存在或被关闭。要创建多个DocType定义的映射,您可以使用Index对象==

    DocType

    如果你想在你的文档中创建一个类似于模型的包装,请使用 DocType类:

    from datetime import datetime
    from elasticsearch_dsl import DocType, Date, Nested, Boolean, 
        analyzer, InnerObjectWrapper, Completion, Keyword, Text
    
    html_strip = analyzer('html_strip',
        tokenizer="standard",
        filter=["standard", "lowercase", "stop", "snowball"],
        char_filter=["html_strip"]
    )
    
    class Comment(InnerObjectWrapper):
        def age(self):
            return datetime.now() - self.created_at
    
    class Post(DocType):
        title = Text()
        title_suggest = Completion()
        created_at = Date()
        published = Boolean()
        category = Text(
            analyzer=html_strip,
            fields={'raw': Keyword()}
        )
    
        comments = Nested(
            doc_class=Comment,
            properties={
                'author': Text(fields={'raw': Keyword()}),
                'content': Text(analyzer='snowball'),
                'created_at': Date()
            }
        )
    
        class Meta:
            index = 'blog'
    
        def add_comment(self, author, content):
            self.comments.append(
              {'author': author, 'content': content})
    
        def save(self, ** kwargs):
            self.created_at = datetime.now()
            return super().save(** kwargs)
    

    文档生命周期 (Document life cycle)

    在首次使用Post文档类型之前,您需要在Elasticsearch中创建映射。为此,您可以使用Index对象或通过调用init类方法直接创建映射:

    # create the mappings in Elasticsearch
    Post.init()
    

    要创建一个新Post文档只需实例化这个类并传入你想要设置的任何字段,就可以使用标准属性设置来改变/添加更多的字段。请注意,您不限于显式定义的字段:

    # instantiate the document
    first = Post(title='My First Blog Post, yay!', published=True)
    # assign some field values, can be values or lists of values
    first.category = ['everything', 'nothing']
    # every document has an id in meta
    first.meta.id = 47
    
    
    # save the document into the cluster
    first.save()
    
    

    所有的元数据字段(id,parent,routing,index等等)可以被访问(和设置)。
    通过meta属性或直接用下划线变体访问它们:

    post = Post(meta={'id': 42})
    
    # prints 42, same as post._id
    print(post.meta.id)
    
    # override default index, same as post._index
    post.meta.index = 'my-blog'
    
    • To retrieve(检索,得到) an existing document use the get class method:
    # retrieve the document
    first = Post.get(id=42)
    # now we can call methods, change fields, ...
    first.add_comment('me', 'This is nice!')
    # and save the changes into the cluster again
    first.save()
    
    # you can also(也) update just individual fields which will call the update API
    # and also(并且) update the document in place(首先)
    first.update(published=True, published_by='me')
    
    • If the document is not found in elasticsearch an exception (elasticsearch.NotFoundError) will be raised. If you wish to return None instead just pass in ignore=404 to suppress the exception:
    p = Post.get(id='not-in-es', ignore=404)
    p is None
    
    • 当您想要同时检索多个文档时,id 您可以使用以下mget方法:
    posts = Post.mget([42, 47, 256])
    

    mgetNotFoundError如果有任何文件没有找到,并且RequestError文件中有任何内容导致错误,将会默认提出。您可以通过设置参数来控制此行为:

    raise_on_error
        如果True(默认),那么任何错误都会引发异常。否则,包含错误的所有文档将被视为丢失。
    missing
        可以有三个可能的值:('none'默认)'raise'和 'skip'。如果文档丢失或出错,将被替换为None,将引发异常或文档将完全跳过。
    

    所有有关的信息DocType,包括它的信息Mapping都可以通过_doc_type类的属性来访问:

    # name of the type and index in elasticsearch
    Post._doc_type.name
    Post._doc_type.index
    
    # the raw Mapping object
    # 原始映射对象
    Post._doc_type.mapping
    
    # the optional name of the parent type (if defined)
    # 父类型的可选名称(如果已定义)
    Post._doc_type.parent
    
    • The _doc_type attribute is also home to the refresh method which will update the mapping on the DocType from elasticsearch.

    _doc_type 属性也是可以通过 refresh 方法来更新elasticsearch的DocType上的映射。

    假如你使用的是动态映射,并希望类知道这些字段,(比如,你希望日期字段能被正确的序列化)你这样将会是很有用的:

    Post._doc_type.refresh()
    
    • To delete a document just call its delete method:
    first = Post.get(id=42)
    first.delete()
    

    Search

    To search for this document type, use the search class method:

    # by calling .search we get back a standard Search object
    # 通过调用 .search(), 我们得到一个标准的搜索对象
    s = Post.search()
    # the search is already limited to the index and doc_type of our document
    s = s.filter('term', published=True).query('match', title='first')
    
    
    results = s.execute()
    
    # when you execute the search the results are wrapped in your document class (Post)
    for post in results:
        print(post.meta.score, post.title)
    
    • 或者,您可以只取一个Search对象,并限制它返回我们的文档类型,用正确的类包装:
    s = Search()
    s = s.doc_type(Post)
    
    • 您也可以将文档类与标准文档类型(只是字符串)结合起来,这将像以前一样处理。您也可以传入多个DocType 子类,响应中的每个文档将被包装在它的类中。

    If you want to run suggestions, just use the suggest method on the Search object:

    s = Post.search()
    s = s.suggest('title_suggestions', 'pyth', completion={'field': 'title_suggest'})
    
    # you can even execute just the suggestions via the _suggest API
    suggestions = s.execute_suggest()
    
    for result in suggestions.title_suggestions:
        print('Suggestions for %s:' % result.text)
        for option in result.options:
            print('  %s (%r)' % (option.text, option.payload))
    

    class Meta 选项

    在Meta文档定义的类中,您可以为文档定义各种元数据:

    doc_type
    elasticsearch中的doc_type的名称。默认情况下,它将从类名(MyDocument - > my_document)
    index
    文档的默认索引,默认情况下它是空的,并且每个操作(比如get或save需要一个明确的index参数)
    using
    默认使用的连接别名,默认为 'default'
    mapping
    Mapping类的可选实例,用作从文档类本身上的字段创建的映射的基础。
    在任何属性Meta是的实例类MetaField将被用于控制元字段(的映射_all,_parent等等)。只需将参数(不带前导下划线)命名为要映射的字段并将任何参数传递给MetaField类:

    class Post(DocType):
        title = Text()
    
        class Meta:
            all = MetaField(enabled=False)
            parent = MetaField(type='blog')
            dynamic = MetaField('strict')
    

    索引(Index)

    Index是一个类,负责在elasticsearch映射和设置中保存与索引有关的所有元数据。
    在定义 index 时最为有用,因为它允许同时轻松创建多个 index。在迁移中设置弹性搜索对象时,这非常有用:

    from elasticsearch_dsl import Index, DocType, Text, analyzer
    
    blogs = Index('blogs')
    
    # define custom settings
    blogs.settings(
        number_of_shards=1,
        number_of_replicas=0
    )
    
    # define aliases
    blogs.aliases(
        old_blogs={}
    )
    
    # register a doc_type with the index
    blogs.doc_type(Post)
    
    # can also be used as class decorator when defining the DocType
    @blogs.doc_type
    class Post(DocType):
        title = Text()
    
    # You can attach custom analyzers to the index
    
    html_strip = analyzer('html_strip',
        tokenizer="standard",
        filter=["standard", "lowercase", "stop", "snowball"],
        char_filter=["html_strip"]
    )
    
    blogs.analyzer(html_strip)
    
    # delete the index, ignore if it doesn't exist
    blogs.delete(ignore=404)
    
    # create the index in elasticsearch
    blogs.create()
    
    • 您还可以为您的索引设置模板,并使用该clone方法创建特定的副本:
    blogs = Index('blogs', using='production')
    blogs.settings(number_of_shards=2)
    blogs.doc_type(Post)
    
    # create a copy of the index with different name
    company_blogs = blogs.clone('company-blogs')
    
    # create a different copy on different cluster
    dev_blogs = blogs.clone('blogs', using='dev')
    # and change its settings
    dev_blogs.setting(number_of_shards=1)




  • 相关阅读:
    Go语言踩过的坑---记录GOPATH在GOLAND中的坑
    反射小例子
    制作pip包
    mac常用软件安装链接
    YCSB压测elasticsearch
    SSO和Auth2.0
    JAVA内部类的四大作用
    修改fastadmin,添加模糊查询
    AH00558: apache2: Could not reliably determine the server's fully qualified domain name, using 172.18.0.2. Set the 'ServerName' directive globally to suppress this message
    taro3.x: 封装实现chat emit on
  • 原文地址:https://www.cnblogs.com/xingxia/p/elasticsearch_python3.html
Copyright © 2011-2022 走看看