一.官网提供的Elasticsearch的Python接口包
1.github地址:https://github.com/elastic/elasticsearch-dsl-py
2.安装:pip install elasticsearch-dsl
3.有很多api,使用可参考github中的文档
二.定义写入es的Pipeline:
1.生成索引,type及映射:
有可能会报IllegalOperation异常,访问本地9200端口查看es版本,然后将python中的elasticsearch和elasticsearch-dsl改成相近版本即可
# _*_ encoding:utf-8 _*_ __author__ = 'LYQ' __date__ = '2018/10/29 11:02' #新版本把DocType改为Docment from datetime import datetime from elasticsearch_dsl import DocType,Date, Nested, Boolean, analyzer, Completion, Keyword, Text, Integer from elasticsearch_dsl.connections import connections # es连接到本地,可以连接到多台服务器 connections.create_connection(hosts=["localhost"]) class ArticleType(DocType): "定义es映射" # 以ik解析 title = Text(analyzer="ik_max_word") create_date = Date() # 不分析 url = Keyword() url_object_id = Keyword() front_image_url = Keyword() front_image_path = Keyword() praise_nums = Integer() fav_nums = Integer() comment_nums = Integer() tags = Text(analyzer="ik_max_word") content = Text(analyzer="ik_max_word") class Meta: #定义索引和type index = "jobbole" doc_type = "artitle" if __name__ == "__main__": #调用init()方法便能生成相应所应和映射 ArticleType.init()
2.创建相应item:
#导入定义的es映射 from models.es import ArticleType from w3lib.html import remove_tags class ElasticsearchPipeline(object): """ 数据写入elasticsearch,定义pipeline,记得配置进setting """ class ElasticsearchPipeline(object): """ 数据写入elasticsearch """ class ElasticsearchPipeline(object): """ 数据写入elasticsearch """ def process_item(self, item, spider): #将定义的elasticsearch映射实列化 articletype=ArticleType() articletype.title= item["title"] articletype.create_date = item["create_date"] articletype.url = item["url"] articletype.front_image_url = item["front_image_url"] if "front_image_path" in item: articletype.front_image_path = item["front_image_path"] articletype.praise_nums = item["praise_nums"] articletype.fav_nums = item["fav_nums"] articletype.comment_nums = item["comment_nums"] articletype.tags = item["tags"] articletype.content = remove_tags(item["content"]) articletype.meta.id = item["url_object_id"] articletype.save() return item
查看9100端口,数据插入成功
class JobboleArticleSpider(scrapy.Item): ...... def save_to_es(self): "在item中分别定义存入es,方便不同的字段的保存" articletype = ArticleType() articletype.title = self["title"] articletype.create_date = self["create_date"] articletype.url = self["url"] articletype.front_image_url = self["front_image_url"] if "front_image_path" in self: articletype.front_image_path = self["front_image_path"] articletype.praise_nums = self["praise_nums"] articletype.fav_nums = self["fav_nums"] articletype.comment_nums = self["comment_nums"] articletype.tags = self["tags"] articletype.content = remove_tags(self["content"]) articletype.meta.id = self["url_object_id"] articletype.save()
class ElasticsearchPipeline(object): """ 数据写入elasticsearch """ def process_item(self, item, spider): # 将定义的elasticsearch映射实列化 #调用item中的方法 item.save_to_es() return item
三.搜索建议:
实质调用anylyer接口如下:
GET _analyze { "analyzer": "ik_max_word", "text" : "Python网络基础学习" }
es文件中:
from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer esc=connections.create_connection(ArticleType._doc_type.using) class Customanalyzer(_CustomAnalyzer): """自定义analyser""" def get_analysis_definition(self): # 重写该函数返回空字典 return {} ik_analyser = Customanalyzer("ik_max_word", filter=["lowercase"]) class ArticleType(DocType): "定义es映射" suggest = Completion(analyzer=ik_analyser) ......
生成该字段的信息
2.item文件:
...... from models.es import esc def get_suggest(index, info_tuple): """根据字符串和权重生成搜索建议数组""" used_words = set() suggests = [] for text, weight in info_tuple: if text: # 调用es得analyer接口分析字符串 # 返回解析后得分词数据 words = esc.indices.analyze(index=index, analyer="ik_max_word", params={"filter": ["lowercase"]}, body=text) # 生成式过滤掉长度为1的 anylyzed_words = set([r["token"] for r in words if len(r) > 1]) # 去重 new_words = anylyzed_words - used_words else: new_words = set() if new_words: suggests.append({"input": list(new_words), "weight": weight})
return suggests class JobboleArticleSpider(scrapy.Item): ...... def save_to_es(self): articletype = ArticleType() .......# 生成搜索建议字段,以及字符串和权重 articletype.suggest = get_suggest(ArticleType._doc_type.index,((articletype.title,1),(articletype.tags,7)) ) articletype.save()