Reference: http://bigg.top/2015/11/29/elasticsearch%E7%9A%84python%E5%A2%9E%E5%88%A0%E6%9F%A5%E6%94%B9%E5%AE%9E%E4%BE%8B%E5%88%86%E6%9E%90/
ES的部署请查看相关文档,我这里就不在赘叙。提醒,官方建议ES的在60G
以上内存的环境下运行,如果你的服务器的内存是16G
,建议至少需要4台机器。
- ES连接到服务器比较容易,如下:
import elasticsearch
class ES(object): @classmethod def connect_host(cls): hosts=[{"host": "xx.xxx.x.xx"}, {"host": "xx.xxx.x.xx"}, {"host": "xx.xxx.x.xx"}, {"host": "xx.xxx.x.xx"},] es = elasticsearch.Elasticsearch( hosts, sniff_on_start=True, sniff_on_connection_fail=True, sniffer_timeout=600 ) return es
|
查询操作
- 通过对RESTAPI的改造,可以很容易实现查询功能。如下,实现了对一个domain相关doc的查询,筛选条件包括起止时间,数据排列顺序和限制查询数据的个数。
def es_query(domain="", start=None, end=None, reverse=False, limit_cnt=20, category=0): es = ES.connect_host() now = datetime.datetime.now() if reverse: order = "desc" else: order = "asc" if not start: start = now - datetime.timedelta(weeks=2000) if not end: end = now range_body = { "range": { "time": { "gte": start, "lte": end } } } and_list = [range_body] domain_body = { "term": { "domain": domain } } category_body = { "term": { "category": category } } if domain: and_list.append(domain_body) if category: and_list.append(category_body) q_body = { "size": limit_cnt, "sort": [ { "time": { "order": order } } ], "query": { "filtered": { "query": {"matchAll": {}}, "filter": { "and": and_list } } } } res = es.search(body=q_body) ret = [] for hit in res["hits"]["hits"]: value = {} src = hit["_source"] if src: try: the_time = src["time"] if len(the_time) < 20: value["time"] = datetime.datetime.strptime(the_time, "%Y-%m-%dT%H:%M:%S") else: value["time"] = datetime.datetime.strptime(the_time, "%Y-%m-%dT%H:%M:%S.%f") ret.append(value) except Exception as e: print str(e) ret = [] print "Query xxxxx data failed!" return ret
|
- 其中,
reverse
表示数据排列的顺序,linit_cnt
表示限制数量。其中涉及range
,sort
,size
,filter
,and
等来执行es.search
操作。最后一个for
循环是一个取数据的过程。
- 在实际应用过程中,对于一个复杂的查询,第一次操作失败率很高,如果查询结果有几千个,第一次的
query
查询到的success
个数通常只有1/3
左右。当然,当你用该查询条件再次查询时,可以瞬间得到完全成功的结果,所以在你对查询成功个数要求比较高的情况下,建议多次发起请求,这样可以得到比较完整的结果。
删除操作
- ES的查询分为按
index
删除和按doc
删除。按index
查询相对比较容易理解,即删除该索引下的所有数据,删除之后该索引就不存在了。但是有时我们会碰到一些按照doc
的情况,即按照一定的query
条件查询到相关的doc,然后删除相关的所有记录。ES官方不推荐进行这种操作,而且还有一定的失败率。如果一定需要这方面的功能,证明你的数据不适合用ES进行存储。
- 由于我当时对ES的认识不够,把大量的数据存储在了ES,因此对
doc
的删除操作需求比较大,写了一个删除操作功能(仅供参考,不建议使用,如果需要删除,建议存储数据之前设计好数据结构,方便以index
为单位删除)
def es_delete(domain, m_type="xxxx"): m_data = { "query": { "query_string": { "query": "domain: %s AND type: %s" % (domain, m_type) } } } data = json.dumps(m_data) request = urllib2.Request(QUERY_URI, data) request.get_method = lambda: "DELETE" urllib2.urlopen(request) print "Deleted the data!"
|
更新操作
- ES不适合对大量的数据(doc)进行修改,与删除一样,这是官方极度不推荐的。当然,按照一定的查询条件更新某些
doc
也是可以实现的。如果你和我一样,遇到了比较极端的情形或是一个强迫症患者。请组合以上两个操作,写一个比较复杂的query
执行删除操作,然后把新的数据(doc)插入到对应的索引和类型中。
插入操作
- 插入操作是ES的最基本操作,ES提供了最基本的插入功能,ES入库时需要批量的插入操作。举个简单的插入操作例子:
es = ES.connect_host() es.index(index=data_index, doc_type="xxxx", body=data, request_timeout=10000)
|
- 其中,
index
表示索引,doc_type
表示数据类型,body
表示具体的doc
数据,最后一个参数表示超时时间。如果是日志文件或其它记录内容,建议index
设置为时间或时间的组合体,如log_2015_11_29
。数据类型即当前索引下数据的分类名称,可以把当前的数据按照不同的类型分类,同时也方便了查询,查询时可以很方便的过滤需要的类型。
相关参考
之前时间比较闲,翻译了部分与Python相关的ElasticSearch文档,如有疑问,欢迎回复评论,相互讨论学习。