zoukankan      html  css  js  c++  java
  • python elasticsearch_dsl模块

    在整理elasticsearch_dsl模块过程中,着实让我头大。
    个人感觉就是资料太少而且很乱,不成体系,接口很多,没有规范。
    此文凑合着看,以后有时间后加以修改。
    还有一些部分官方文档都没有说清楚,以后弄懂了再补充。

    简介

    elasticsearch-dsl是基于elasticsearch-py封装实现的,提供了更简便的操作elasticsearch的方法。

    安装

    pip install elasticsearch_dsl
    

    连接elasticsearch

    使用 connections.create_connection()进行连接,根据源码:

    # alias是设置的别名
    conn = self._conns[alias] = Elasticsearch(**kwargs)
    return conn
    

    我们知道,create_connection的参数应该与elasticsearch.Elasticsearch是相一致的。详见此文档:Python Elasticsearch Client

    下面做一下简单的连接工作:

    from elasticsearch_dsl import connections, Search
    
    
    es = connections.create_connection(hosts=["localhost:9200"], timeout=20)
    

    除此之外,还可以通过alias给连接设置别名,后续可以通过别名来引用该连接,默认别名为default

    from elasticsearch_dsl import connections
    
    connections.create_connection(alias='my_new_connection', hosts=["localhost:9200"], timeout=60)
    

    使用别名后这样用:

    from elasticsearch_dsl import connections, Search, Q
    
    
    es = connections.create_connection(alias="eaaa", hosts=["localhost:9200"])
    
    e = Search(using="eaaa").index("account").query()
    
    for i in e:
        print(i.to_dict())
    

    elasticsearch_dsl.Search

    search对象代表整个搜索请求,包括:queries、filters、aggregations、sort、pagination、additional parameters、associated client。
    API被设置为可链接的即和用.连续操作。search对象是不可变的,除了聚合,对对象的所有更改都将导致创建包含该更改的浅表副本。

    当初始化Search对象时,你可以传递elasticsearch客户端作为using的参数。比如:

    from elasticsearch_dsl import connections, Search
    
    
    es = connections.create_connection(hosts=["localhost:9200"], timeout=20)
    s = Search(using=es)
    
    

    当然,不传也没关系,后面可以在指定。
    一般使用:

    from elasticsearch_dsl import connections, Search
    
    
    es = connections.create_connection(hosts=["localhost:9200"], timeout=20)
    
    s = Search(using=es).query("match", firstname="Hattie")
    # 这样写也行
    # s = Search(using=es).query({"match": {"firstname": "Hattie"}})
    
    for i in s:
        print(i.to_dict())
    	
    
    """
    结果:
    {'account_number': 6, 'balance': 5686, 'firstname': 'Hattie', 'lastname': 'Bond', 'age': 36, 'gender': 'M', 'address': '671 Bristol Street', 'employer': 'Netagy', 'email': 'hattiebond@netagy.com', 'city': 'Dante', 'state': 'TN'}
    
    """
    

    执行execute方法将请求发送给elasticsearch:
    response = s.execute()

    上面的例子没有调用execute()是因为我们调用了循环,在search对象是一个可迭代对象,它的源码:

    def __iter__(self):
    	"""
    	Iterate over the hits.
    	"""
    	return iter(self.execute())
    
    

    所以我们不需要执行execute()方法,
    迭代后可以通过to_dict()方法将Search对象序列化为一个dict对象,这样可以方便调试。

    • query方法
      查询,参数可以是Q对象,也可以是query模块中的一些类,还可以是自已写上如何查询。
    • filter方法
      如果你想要在过滤上下文中添加查询,可以使用filter()函数来使之变的简单。
      
      s = Search()
      s = s.filter('terms', tags=['search', 'python'])
      
      在背后,这会产生一个bool查询,并将指定的条件查询放入其filter分支,等价与下面的操作:
      s = Search()
      s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])
      
      s = Search()
      s = s.filter('terms', tags=['search', 'python'])
      
      
      # 过滤,在此为范围过滤,range是方法,timestamp是所要查询的field的名字,gte意为大于等于,lt意为小于,根据需要设定即可(似乎过滤只能接受数字形式的内容,如果是文本就会返回空)
      # 关于term和match的区别,term是精确匹配,match会模糊化,会进行分词,返回匹配度分数,(term查询字符串之接受小写字母,如果有大写会返回空即没有命中,match则是不区分大小写都可以进行查询,返回结果也一样)
      
      # 例1:范围查询
      s = s.filter("range", timestamp={"gte": 0, "lt": time.time()}).query("match", country="in")
      # 例2:普通过滤
      res_3 = s.filter("terms", balance_num=["39225", "5686"]).execute()
      
    • index方法
      指定索引
    • usring方法
      指定哪个elasticsearch

    注意:
    当调用.query()方法多次时,内部会使用&操作符:

    s = s.query().query()
    print(s.to_dict())
    # {"query": {"bool": {...}}}
    

    elasticsearch_dsl.query

    该库为所有的Elasticsearch查询类型都提供了类。以关键字参数传递所有的参数,最终会把参数序列化后传递给Elasticsearch,这意味着在原始查询和它对应的dsl之间有这一个清理的一对一的映射。

    
    from elasticsearch_dsl import connections, Search
    from elasticsearch_dsl.query import MultiMatch, Match, MatchAll, Filtered, Term, Terms, Prefix
    
    es = connections.create_connection(hosts=["localhost:9200"], timeout=20)
    
    
    
    # 相对与{"multi_match": {"query": "ha", "fields": ["firstname", "lastname"]}}
    m1 = MultiMatch(query='Ha', fields=['firstname', 'lastname'])
    
    # 相当于{"match": {"firstname": {"query": "Hughes"}}}
    m2 = Match(firstname={"query": "Hughes"})
    
    
    s = s.query(m2)
    
    for i in s:
        print(i.to_dict())
    

    这个库里面还有很多类,对应着dsl的查询方式,可以自己点击去看看。

    elasticsearch_dsl.Q

    使用快捷方式Q通过命名参数或者原始dict类型数据来构建一个查询实例。

    from elasticsearch_dsl import connections, Search, Q
    
    
    es = connections.create_connection(hosts=["localhost:9200"], timeout=20)
    # q = Q("match", city="Summerfield")
    q = Q("multi_match", query='Summerfield', fields=['city', 'firstname'])
    
    
    s = Search(using=es).index("account")
    s = s.query(q)
    
    for i in s:
        print(i.to_dict())
    
    

    可以看到,Q的格式一般是Q("查询类型", 字段="xxx")Q("查询类型", query="xxx", fields=['字段1', '字段2'])

    查询对象可以通过逻辑运算符组合起来:

    Q("match", title='python') | Q("match", title='django')
    # {"bool": {"should": [...]}}
    
    Q("match", title='python') & Q("match", title='django')
    # {"bool": {"must": [...]}}
    
    ~Q("match", title="python")
    # {"bool": {"must_not": [...]}}
    

    比如:

    from elasticsearch_dsl import Q, Search
    # q = Q("multi_match", query="123.244.101.255", fields=["clientip", 'timestamp'])
    q = Q("match", clientip="123.244.101.255") | Q("match", clientip="42.248.168.61")
    
    s = Search().query(q)
    
    for item in s:
        print(item.clientip)
    

    嵌套类型

    有时候你想要引用一个在其他字段中的字段,例如多字段(title.keyword)或者在一个json文档中的address.city。为了方便,Q允许你使用双下划线‘__’代替关键词参数中的‘.’

    s = Search()
    s = s.filter('term', category__keyword='Python')
    s = s.query('match', address__city='prague')
    

    查询

    简单的例子:

    from elasticsearch_dsl import Search
    from elasticsearch import Elasticsearch
    
    es_object = Elasticsearch(['url:9200'], sniffer_timeout=60, timeout=30)
    
    # 获取es中所有的索引
    # 返回类型为字典,只返回索引名
    es_object.cat.indices(format='json', h='index')
    
    # 查询多个索引
    es_s_object = Search(using=es_object, index=['log-2018-07-31', 'log-2018-07-30', 'log-2018-07-29'])
    
    # 查询一个索引
    es_s_object = Search(using=es_object, index='log-2018-07-31')
    
    # 条件查询1
    es_r_object = es_s_object.filter("range", timestamp={"gte": 1532879975, "lt": 1532880095})
    
    # 条件查询2
    es_r_object = es_r_object.filter("term", title='12345')
    
    # 结果转换为字典
    es_r_dict = es_r_object.execute().to_dict()
    
    

    数据准备

    1. 下载示例数据
      点击这里查看官方的示例数据,下载" accounts.zip"。

    2. 创建索引

      
      
      from elasticsearch import Elasticsearch
      
      es = Elasticsearch(host="localhost", port=9200)
      
      body = {
      	"mappings": {
      		"properties": {
      			"account_number": {
      				"type": "integer"
      			},
      			"balance": {
      				"type": "integer"
      			},
      			"firstname": {
      				"type": "text"
      			},
      			"lastname": {
      				"type": "text"
      			},
      			"age": {
      				"type": "integer"
      			},
      			"gender": {
      				"type": "keyword"
      			},
      			"address": {
      				"type": "text"
      			},
      			"employer": {
      				"type": "text"
      			},
      			"email": {
      				"type": "text"
      			},
      			"city": {
      				"type": "text"
      			},
      			"state": {"type": "text"}
      
      		}
      	}
      }
      # 创建 index
      es.indices.create(index="account", body=body)
      
    3. 把数据写入文档中

    accounts.zip解压目录下,执行命令:

    curl -H 'Content-Type: application/json' -XPOST 'localhost:9200/account/_doc/_bulk?pretty' --data-binary @accounts.json
    

    Windows平台:注意是否有curl命令,git bash自带,PowerShell好像也有

    开始查询

    我随便出的题,尽量让一些常用的查询方式用到,主要是用于加深印象的。

    1. 余额在15000到20000的顾客
    2. 余额在20000到30000的男性顾客
    3. state为WY,28岁或38岁的女性顾客
    4. 地址中有Street,年龄不在20-30的女顾客
    5. 按照年龄聚合,并且计算每个年龄的平均余额
    6. 按照年龄聚合,求20到30岁不同性别的平均余额

    如何使用聚合,可以在文章的"其它方法"中的“聚合”中查看

    # 1. 余额在15000到2000的顾客
    from elasticsearch_dsl import connections, Search, Q
    
    es = connections.create_connection(hosts=["localhost:9200"])
    s = Search(using=es, index="account")
    
    q = Q("range", balance={"gte": 15000, "lte": 20000})
    s1 = s.query(q)
    
    print("共查到%d条数据" % s1.count())
    
    for i in s1[90:]:
        print(i.to_dict())
    
    
    
    # 2. 余额在20000到30000的男性顾客
    from elasticsearch_dsl import connections, Search, Q
    
    es = connections.create_connection(hosts=["localhost:9200"])
    s = Search(using=es, index="account")
    
    # 确定多少钱
    q1 = Q("range", balance={"gte": 20000, "lte": 30000})
    # 确定为男性
    q2 = Q("term", gender="M")
    # and
    q = q1 & q2
    s1 = s.query(q)
    
    print("共查到%d条数据" % s1.count())
    
    for i in s1:
        print(i.to_dict())
    
    
    
    # 3. state为WY,28岁或38岁的女性顾客
    from elasticsearch_dsl import connections, Search, Q
    
    es = connections.create_connection(hosts=["localhost:9200"])
    s = Search(using=es, index="account")
    
    #  state为WY
    q1 = Q("match", state="WY")
    
    # 28或38的女性
    q2 = Q("bool", must=[Q("terms", age=[28, 38]), Q("term", gender="F")])
    
    # and
    q = q1 & q2
    s1 = s.query(q)
    
    print("共查到%d条数据" % s1.count())
    
    for i in s1:
    
        print(i.to_dict())
    
    
    # ############ 第二种写法 #############################
    
    from elasticsearch_dsl import connections, Search, Q
    
    es = connections.create_connection(hosts=["localhost:9200"])
    s = Search(using=es, index="account")
    
    #  state为WY
    q1 = Q("match", state="WY")
    
    q2 = Q("term", age=28) | Q("term", age=38)
    
    # 多次query就是& ==> and 操作
    s1 = s.query(q1).query(q2)
    
    print("共查到%d条数据" % s1.count())
    
    for i in s1:
        print(i.to_dict())
    
    
    # 4. 地址中有Street,年龄不在20-30的女顾客
    from elasticsearch_dsl import connections, Search, Q
    
    es = connections.create_connection(hosts=["localhost:9200"])
    s = Search(using=es, index="account")
    
    # 地址中有Street且为女性
    q1 = Q("match", address="Street") & Q("match", gender="F")
    
    # 年龄在20-30
    q2 = Q("range", age={"gte": 20, "lte": 30})
    
    # 使用filter过滤
    # query和filter的前后关系都行
    s1 = s.query(q1).filter(q2)
    
    print("共查到%d条数据" % s1.count())
    
    # scan()列出全部数据
    for i in s1.scan():
        print(i.to_dict())
    
    
    # 5. 按照年龄聚合,并且计算每个年龄的平均余额
    from elasticsearch_dsl import connections, Search, A, Q
    
    es = connections.create_connection(hosts=["localhost:9200"])
    s = Search(using=es, index="account")
    
    # 先用年龄聚合,然后拿到返平均数
    # size指定最大返回多少条数据,默认10条
    # 实质上account的数据中,age分组没有100个这么多
    
    a = A("terms", field="age", size=100).metric("age_per_balance", "avg", field="balance")
    
    s.aggs.bucket("res", a)
    
    
    # 执行并拿到返回值
    response = s.execute()
    
    
    # res是bucket指定的名字
    # response.aggregations.to_dict是一个
    # {'doc_count_error_upper_bound': 0, 'sum_other_doc_count': 463, 'buckets': [{'...
    # 的数据,和用restful查询的一样
    
    for i in response.aggregations.res:
        print(i.to_dict())
    
    
    # 6. 按照年龄聚合,求20到30岁不同性别的平均余额
    
    from elasticsearch_dsl import connections, Search, A, Q
    
    es = connections.create_connection(hosts=["localhost:9200"])
    s = Search(using=es, index="account")
    
    # 这次就用这种方法
    # range 要注意指定ranges参数和from to
    a1 = A("range", field="age", ranges={"from": 20, "to": 21})
    a2 = A("terms", field="gender")
    a3 = A("avg", field="balance")
    
    
    s.aggs.bucket("res", a1).bucket("gender_group", a2).metric("balance_avg", a3)
    
    # 执行并拿到返回值
    response = s.execute()
    
    # res是bucket指定的名字
    for i in response.aggregations.res:
        print(i.to_dict())
    s = s.query("match", age=41)
    
    """
    {'key': '20.0-21.0', 'from': 20.0, 'to': 21.0, 'doc_count': 44, 'gender_group': 
    {'doc_count_error_upper_bound': 0, 'sum_other_doc_count': 0, 'buckets': 
    [{'key': 'M', 'doc_count': 27, 'balance_avg': {'value': 29047.444444444445}},
     {'key': 'F', 'doc_count': 17, 'balance_avg': {'value': 25666.647058823528}}]}}
    """
    
    
    

    写一下总结
    由于account的数据是随机生成的,很多都没有共同的数据,本来想用一下multi_match,结果发现没有条件。
    列子中,用到了match, term, filter, range, bool,虽然有很多查询方式没用上,但是应该都差不多这样用。
    在使用过程中,我发现使用Q这个类构造查询方式实质上就和直接使用elasticsearchd-dsl一样(指的是格式上)。
    假如是数组,如:bool的must、terms,那么就要字段=[ ]
    假如是字典,如:range,那么就要字段={xxx: yyy, .... }
    假如是单值,如:term、match,那么就要字段=值
    假如查的是多个字段,如:multi_mathc,那么就要加上query="要查的值", fields=["字段1", "字段2", ...]
    然后各个条件的逻辑关系,可以通过多次query和filter或直接用Q("bool", must=[Q...], should=[Q...])再加上& | ~表示

    假如忘记了,该查询方式是否有数组、字典或直接就等于一个值的话
    可以看一看这篇文章中对应查询方式的记录:DSL高级检索(Query)

    其它方法

    排序

    要指定排序顺序,可以使用.order()方法。

    s = Search().sort(
        'category',
        '-title',
        {"lines" : {"order" : "asc", "mode" : "avg"}}
    )
    

    分页

    要指定from、size,使用slicing API:

    s = s[10:20]
    # {"from": 10, "size": 10}
    

    要访问匹配的所有文档,可以使用scan()函数,scan()函数使用scan、scroll elasticsearch API:

    for hit in s.scan():
        print(hit.title)
    

    需要注意的是这种情况下结果是不会被排序的。

    高亮

    要指定高亮的通用属性,可以使用highlight_options()方法:

    s = s.highlight_options(order='score')
    可以通过highlight()方法来为了每个单独的字段设置高亮:
    
    s = s.highlight('title')
    # or, including parameters:
    s = s.highlight('title', fragment_size=50)
    然后,响应中的分段将在每个结果对象上以.meta.highlight.FIELD形式提供,其中将包含分段列表:
    
    response = s.execute()
    for hit in response:
        for fragment in hit.meta.highlight.title:
            print(fragment)
    

    聚合

    你可以是使用A快捷方式来定义一个聚合。

    
    from elasticsearch_dsl import A
    
    A('terms', field='tags')
    # {"terms": {"field": "tags"}}
    

    为了实现聚合嵌套,你可以使用.bucket()、.metirc()以及.pipeline()方法。
    bucket ['bʌkɪt] 即为分组,其中第一个参数是分组的名字,自己指定即可,第二个参数是方法,第三个是指定的field。
    metric ['metrɪk]也是同样,metric的方法有sum、avg、max、min等等,但是需要指出的是有两个方法可以一次性返回这些值,stats和extended_stats,后者还可以返回方差等值。

    a = A('terms', field='category')
    # {'terms': {'field': 'category'}}
    
    a.metric('clicks_per_category', 'sum', field='clicks').bucket('tags_per_category', 'terms', field='tags')
    # {
    #   'terms': {'field': 'category'},
    #   'aggs': {
    #     'clicks_per_category': {'sum': {'field': 'clicks'}},
    #     'tags_per_category': {'terms': {'field': 'tags'}}
    #   }
    # }
    
    
    # 例1
    s.aggs.bucket("per_country", "terms", field="timestamp").metric("sum_click", "stats", field="click").metric("sum_request", "stats", field="request")
    
    # 例2
    s.aggs.bucket("per_age", "terms", field="click.keyword").metric("sum_click", "stats", field="click")
    
    # 例3
    s.aggs.metric("sum_age", "extended_stats", field="age")
    
    # 例4
    s.aggs.bucket("per_age", "terms", field="country.keyword")
    # 最后依然是要execute,此处注意s.aggs的操作不能用变量接收(如res=s.aggs的操作就是错误的),聚合的结果会在res中显示
    
    # 例5
    a = A("range", field="account_number", ranges=[{"to": 10}, {"from": 11, "to": 21}])
    
    res = s.execute()
    

    要向Search对象添加聚合,请使用.aggs属性作为顶级聚合:

    s = Search()
    a = A('terms', field='category')
    s.aggs.bucket('category_terms', a)
    # {
    #   'aggs': {
    #     'category_terms': {
    #       'terms': {
    #         'field': 'category'
    #       }
    #     }
    #   }
    # }
    

    要么

    s = Search()
    s.aggs.bucket('articles_per_day', 'date_histogram', field='publish_date', interval='day')
        .metric('clicks_per_day', 'sum', field='clicks')
        .pipeline('moving_click_average', 'moving_avg', buckets_path='clicks_per_day')
        .bucket('tags_per_day', 'terms', field='tags')
    
    s.to_dict()
    # {
    #   "aggs": {
    #     "articles_per_day": {
    #       "date_histogram": { "interval": "day", "field": "publish_date" },
    #       "aggs": {
    #         "clicks_per_day": { "sum": { "field": "clicks" } },
    #         "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } },
    #         "tags_per_day": { "terms": { "field": "tags" } }
    #       }
    #     }
    #   }
    # }
    

    您可以按名称访问现有的key(类似字典的方式):

    s = Search()
    s.aggs.bucket('per_category', 'terms', field='category')
    s.aggs['per_category'].metric('clicks_per_category', 'sum', field='clicks')
    s.aggs['per_category'].bucket('tags_per_category', 'terms', field='tags')
    

    可以通过aggregations属性来访问聚合结果

    
    for tag in response.aggregations.per_tag.buckets:
        print(tag.key, tag.max_lines.value)
    	
    

    删除

    可以通过调用Search对象上的delete方法而不是execute来实现删除匹配查询的文档,如下:

    s = Search(index='i').query("match", title="python")
    response = s.delete()
    

    参考:
    elasticsearch-dsl配置、使用、查询
    查询操作实例
    包含用类的形式使用elasticsearch-dsl
    Elasticserch与Elasticsearch_dsl用法
    【Python】Elasticsearch和elasticsearch_dsl
    python3中的elasticsearch_dsl(例子)

    本文来自博客园,作者:忞翛,转载请注明原文链接:https://www.cnblogs.com/lczmx/p/14985538.html

  • 相关阅读:
    简练软考知识点整理-项目定义活动过程
    简练软考知识点整理-规划进度管理
    简练软考知识点整理-控制范围
    软考考前注意事项
    简练软考知识点整理-确认范围管理
    数据库之表关系
    数据库引擎
    数据库概念
    IO模型
    异步回调,线程队列,协程
  • 原文地址:https://www.cnblogs.com/lczmx/p/14985538.html
Copyright © 2011-2022 走看看