zoukankan      html  css  js  c++  java
  • elasticsearch简介和elasticsearch_dsl

    elasticsearch

    es是基于lucene分片(shard)存储的近实时的分布式搜索引擎

    名词解释:
    Lucene:使用java语言编写的存储与查询框架,通过组织文档与文本关系信息进行倒排索引,内部形成多个segment段进行存储,是es的核心组件,但不具备分布式能力。

    segment:Lucene内部最小的存储单元,也是es的最小存储单元,多个小segment可合为一个较大的segment,并但不能拆分。

    shard:es为解决海量数据的处理能力,在Lucene之上设计了分片的概念,每个分片存储部分数据,分片可以设置多个副本,通过内部routing算法将数据路由到各个分片上,以支持分布式存储与查询。

    近实时:严格讲es并不是索引即可见的数据库,首先数据会被写入主分片所在机器的内存中,再触发flush操作,形成一个新的segment数据段,只有flush到磁盘的数据才会被异步拉取到其它副本节点,如果本次搜索命中副本节点且数据没有同步的话,那么是不会被检索到的;es默认flush间隔是1s,也可通过修改refresh_interval参数来调整间隔(为提升性能和体验,一版设置30s-60s)。

    分布式:es天生支持分布式,配置与使用上与单机版基本没什么区别,可快速扩张至上千台集群规模、支持PB级数据检索;通过内部路由算法将数据储存到不同节点的分片上;当用户发起一次查询时,首先会在各个分片上完成提前批处理(这个会在之后章节详细讲解),处理后的数据汇总到请求节点再做一次全局处理后返回。

    当然,也有人将es定义为开箱即用的NoSql文档数据库,这么说也没错,es借助其平滑扩展的能力实现了nosql数据库对海量数据增删改查的能力,目前市面上基于文档存储的nosql数据库有MongoDB、couchbase、es、orientdb等,几种数据库都有其各自的使用场景,其中es与MongoDB备受国内开发人员青睐,社区活跃度极高,尤其es

    ElasticSearch与数据库的对应关系(7.0之前)

    ESRDBS
    index database
    type table
    filed column

    通过Python库elasticsearch_dsl处理ElasticSearch

    二、Mapping&Setting

    众所周知完成一个索引库的创建需要配置mapping与setting两部分;

    mapping:

    常用数据类型:text、keyword、number、array、range、boolean、date、geo_point、ip、nested、object

     text:默认会进行分词,支持模糊查询(5.x之后版本string类型已废弃,请大家使用text)。
     keyword:不进行分词;keyword类型默认开启doc_values来加速聚合排序操作,占用了大量磁盘io 如非必须可以禁用doc_values。
     number:如果只有过滤场景 用不到range查询的话,使用keyword性能更佳,另外数字类型的doc_values比字符串更容易压缩。
     array:es不需要显示定义数组类型,只需要在插入数据时用'[]'表示即可,'[]'中的元素类型需保持一致。
     range:对数据的范围进行索引;目前支持 number range、date range 、ip range。
     boolean: 只接受true、false 也可以是字符串类型的“true”、“false”
     date:支持毫秒、根据指定的format解析对应的日期格式,内部以long类型存储。
     geo_point:存储经纬度数据对。
     ip:将ip数据存储在这种数据类型中,方便后期对ip字段的模糊与范围查询。
     nested:嵌套类型,一种特殊的object类型,存储object数组,可检索内部子项。
     object:嵌套类型,不支持数组。

    es7.0新增数据类型:alias、date_nanos、features、vector

    alias:并不实际存在,而是对已有字段的一种别名映射,搜索该字段与搜索实际字段返回的内容没有本质的区别。
    date_nanos:另一种时间类型,可精确到纳秒,用法类似date。
    features:用来存储特征向量,数据不能为0和负数,查询时只能使用rank_feature query,该字段主要为支持后续机器学习相关功能做准备。
    vector:存储特征数组,支持稀疏与稠密向量存储,该字段主要为支持后续机器学习相关功能做准备。

    doc_values:列式存储,为支持快速聚合与排序场景而设计,不在该类场景的可禁用

     "user_id": { 
              "type": "keyword",
              "doc_values": false
            } 

    index:控制字段索引方式

        analyzed:先分词再索引
        not_analyzed:不分词直接索引
        no:不被索引

    ignore_malformed:是否忽略脏数据

    ignore_malformed设置为true,如果遇到数据格式或类型错误数据将被忽略,其它字段会正常插入
    如果设置为false,一旦数据不符合要求整个文档将被拒绝。

    _source:不需要回显数据内容的可选择禁用该字段

      "_source": {
            "enabled": false
       } 
    需要注意的是禁用该项后将不能支持update和索引库的reindex操作,需谨慎。
    

    includes&excludes:_source字段黑白名单控制,可控制哪些字段在查询结果的source中出现

        "_source": {
            "includes": [
              "*.count",
              "meta.*"
            ],
            "excludes": [
              "meta.description",
              "meta.attributes.*"
            ]
          }

    dynamic:动态mapping,禁用后将不会自动创建field,但数据仍可以正常插入

    "dynamic":"false"

    _all:es6.x默认已禁用全文索引,es7.0彻底移除该配置

    "_all": {
           "enabled": false
         }

    norms:控制该字段是否参与相关度排名计算,如果该字段只做过滤用可禁用该项以提升搜索性能

    "categorys": {
          "type": "text",
          "norms": false
        }

    index_options:细粒度控制倒排索引方式

    docs:只索引文档id
    freqs:文档id和词频,词频可用于评分
    positions:增加位置信息,位置可用于模糊和短语查询
    offsets:增加偏移量,高亮时会用到偏移量信息

    setting:

        "index.max_result_window":20000   #控制查询返回的最大结果数
        "index.merge.scheduler.max_thread_count": 2   #segment段合并时可使用的最大线程数,为避免过度的io操作,该值一般不超过2
        "index.routing.allocation.total_shards_per_node":10    #分配到单个节点的最大分片数
        "index.refresh_interval":"-1"    #index刷新频率,频繁刷新会降低性能,一般设置为30s-60s;-1表示禁用刷新
        "index.translog.durability":"async"    #translog刷新方式,如果对数据安全性要求不算太高,可设置为async以提升性能
        "index.translog.flush_threshold_size":"1024mb"    #translog刷新字节条件,超过1g才会刷新
        "index.translog.sync_interval":"120s"    #translog刷新时间条件,超过120s才会刷新
        "index.unassigned.node_left.delayed_timeout":"1d"    #当有节点宕机后索引库多久触发副本balance操作
        "index.search.slowlog.threshold.query.info":"1s"    #超过1s的查询会被记录到慢查询日志里
        "index.store.type":"niofs"    #网络通信协议
        "index.number_of_replicas":0    #index分片的副本数
        "index.number_of_shards":8    #index分片数,需要注意的是es7.0默认索引分片数调整为1了
        "index.codec":"best_compression"    #index数据的压缩方式,best_compression压缩可节省4-8倍的存储空间
    下面列举一下elasticsearch的可配置项:
            1. 集群名称,默认为elasticsearch:
    cluster.name: elasticsearch
            2. 节点名称,es启动时会自动创建节点名称,但你也可进行配置:
    node.name: "Franz Kafka"
            3. 是否作为主节点,每个节点都可以被配置成为主节点,默认值为true:
    node.master: true
            4. 是否存储数据,即存储索引片段,默认值为true:
    node.data: true
            master和data同时配置会产生一些奇异的效果:
            1) 当master为false,而data为true时,会对该节点产生严重负荷;
            2) 当master为true,而data为false时,该节点作为一个协调者;
            3) 当master为false,data也为false时,该节点就变成了一个负载均衡器。
            你可以通过连接http://localhost:9200/_cluster/health或者http://localhost:9200/_cluster/nodes,或者使用插件http://github.com/lukas-vlcek/bigdesk或http://mobz.github.com/elasticsearch-head来查看集群状态。
            5. 每个节点都可以定义一些与之关联的通用属性,用于后期集群进行碎片分配时的过滤:
    node.rack: rack314
            6. 默认情况下,多个节点可以在同一个安装路径启动,如果你想让你的es只启动一个节点,可以进行如下设置:
    node.max_local_storage_nodes: 1
            7. 设置一个索引的碎片数量,默认值为5:
    index.number_of_shards: 5
            8. 设置一个索引可被复制的数量,默认值为1:
    index.number_of_replicas: 1
            当你想要禁用公布式时,你可以进行如下设置:
    index.number_of_shards: 1
    index.number_of_replicas: 0
            这两个属性的设置直接影响集群中索引和搜索操作的执行。假设你有足够的机器来持有碎片和复制品,那么可以按如下规则设置这两个值:
            1) 拥有更多的碎片可以提升索引执行能力,并允许通过机器分发一个大型的索引;
            2) 拥有更多的复制器能够提升搜索执行能力以及集群能力。
            对于一个索引来说,number_of_shards只能设置一次,而number_of_replicas可以使用索引更新设置API在任何时候被增加或者减少。
            ElasticSearch关注加载均衡、迁移、从节点聚集结果等等。可以尝试多种设计来完成这些功能。
            可以连接http://localhost:9200/A/_status来检测索引的状态。
            9. 配置文件所在的位置,即elasticsearch.yml和logging.yml所在的位置:
    path.conf: /path/to/conf
            10. 分配给当前节点的索引数据所在的位置:
    path.data: /path/to/data
            可以可选择的包含一个以上的位置,使得数据在文件级别跨越位置,这样在创建时就有更多的自由路径,如:
    path.data: /path/to/data1,/path/to/data2
            11. 临时文件位置:
    path.work: /path/to/work
            12. 日志文件所在位置:
    path.logs: /path/to/logs
            13. 插件安装位置:
    path.plugins: /path/to/plugins
            14. 插件托管位置,若列表中的某一个插件未安装,则节点无法启动:
    plugin.mandatory: mapper-attachments,lang-groovy
            15. JVM开始交换时,ElasticSearch表现并不好:你需要保障JVM不进行交换,可以将bootstrap.mlockall设置为true禁止交换:
    bootstrap.mlockall: true
            请确保ES_MIN_MEM和ES_MAX_MEM的值是一样的,并且能够为ElasticSearch分配足够的内在,并为系统操作保留足够的内存。
            16. 默认情况下,ElasticSearch使用0.0.0.0地址,并为http传输开启9200-9300端口,为节点到节点的通信开启9300-9400端口,也可以自行设置IP地址:
    network.bind_host: 192.168.0.1
            17. publish_host设置其他节点连接此节点的地址,如果不设置的话,则自动获取,publish_host的地址必须为真实地址:
    network.publish_host: 192.168.0.1
            18. bind_host和publish_host可以一起设置:
    network.host: 192.168.0.1
            19. 可以定制该节点与其他节点交互的端口:
    transport.tcp.port: 9300
            20. 节点间交互时,可以设置是否压缩,转为为不压缩:
    transport.tcp.compress: true
            21. 可以为Http传输监听定制端口:
    http.port: 9200
            22. 设置内容的最大长度:
    http.max_content_length: 100mb
            23. 禁止HTTP
    http.enabled: false
            24. 网关允许在所有集群重启后持有集群状态,集群状态的变更都会被保存下来,当第一次启用集群时,可以从网关中读取到状态,默认网关类型(也是推荐的)是local:
    gateway.type: local
            25. 允许在N个节点启动后恢复过程:
    gateway.recover_after_nodes: 1
            26. 设置初始化恢复过程的超时时间:
    gateway.recover_after_time: 5m
            27. 设置该集群中可存在的节点上限:
    gateway.expected_nodes: 2
            28. 设置一个节点的并发数量,有两种情况,一种是在初始复苏过程中:
    cluster.routing.allocation.node_initial_primaries_recoveries: 4
            另一种是在添加、删除节点及调整时:
    cluster.routing.allocation.node_concurrent_recoveries: 2
            29. 设置复苏时的吞吐量,默认情况下是无限的:
    indices.recovery.max_size_per_sec: 0
            30. 设置从对等节点恢复片段时打开的流的数量上限:
    indices.recovery.concurrent_streams: 5
            31. 设置一个集群中主节点的数量,当多于三个节点时,该值可在2-4之间:
    discovery.zen.minimum_master_nodes: 1
            32. 设置ping其他节点时的超时时间,网络比较慢时可将该值设大:
    discovery.zen.ping.timeout: 3s
    http://elasticsearch.org/guide/reference/modules/discovery/zen.html上有更多关于discovery的设置。
            33. 禁止当前节点发现多个集群节点,默认值为true:
    discovery.zen.ping.multicast.enabled: false
            34. 设置新节点被启动时能够发现的主节点列表:
    discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3[portX-portY]"]
    
     
    
    //--------------------------------------------------------------------------------------------
    
     
    
    gateway类型,表示持久化数据存放位置,默认local,推荐的方式,此外还有NFS、HDFS、S3
    gateway.type : local
    #集群名称,区分集群的唯一名称
    cluster.name : 'TEST'
     
    #索引文件存放目录
    #path.data : '/var/elasticsearch/data'
    #日志文件存放目录
    #path.logs : '/var/elasticsearch/logs'
     
    #网络配置
    #network.tcp.keep_alive : true
    #network.tcp.send_buffer_size : 8192
    #network.tcp.receive_buffer_size : 8192
    #gateway.recover_after_nodes : 1
    #gateway.recover_after_time : 10s
    #gateway.expected_nodes : 2
     
    #自动发现相关配置
    #discovery.zen.fd.connect_on_network_disconnect : true
    #discovery.zen.initial_ping_timeout : 10s
    #discovery.zen.fd.ping_interval : 2s
    #discovery.zen.fd.ping_retries  : 10
     
    #索引snapshot时间只对当gateway设置为NFS时有效
    #index.gateway.snapshot_interval : 1s
    #刷新时间间隔
    #index.engine.robin.refresh_interval : -1
     
    #默认索引碎片数
    index.number_of_shards : 3
    #默认索引副本数
    index.number_of_replicas : 1
     
    #默认索引合并因子
    #index.merge.policy.merge_factor : 100
    #index.merge.policy.min_merge_docs : 1000
    #index.merge.policy.use_compound_file : true
    #indices.memory.index_buffer_size : 5%
     
    #Gateway相关配置
    # Gateway Settings
    #gateway:
    #  recover_after_nodes: 1
    #  recover_after_time: 5m
    #  expected_nodes: 2
    #提示:当集群期望节点达不到的时候,集群就会处于block,无法正常索引和查询,说明集群中某个节点未能正常启动,这正是我们期望的效果,block住,避免照成数据的不一致
     
    #强制所有内存锁定,不要没事搞个swap什么的来影响性能
    # Force all memory to be locked, forcing JVM to never swap
    #  (make sure to set MIN and MAX mem to the same value)
    #bootstrap:
    #  mlockall: true
     
     
    #当禁用multcast广播的时候,可以手动设置集群的节点ip
    # Unicast Discovery (disable multicast)
    #discovery:
    #  zen:
    #    multicast.enabled: false
    #    unicast.hosts: ["host1", "host2"]
    --------------------------------------------------------------------------------
    默认配置为:节点每隔1s同master发送1次心跳,超时时间为30s,测试次数为3次,超过3次,则认为该节点同master已经脱离了。以上为elasticsearch的默认配置。在实际生产环境中,每隔1s,太频繁了,会产生太多网络流量。我们可以在elasticsearch.yml如下修改。 
    
    discovery.zen.fd.ping_timeout: 120s  
    discovery.zen.fd.ping_retries: 6  
    discovery.zen.fd.ping_interval: 30s  
    扩展配置

    三、elasticsearch7.0有哪些重大改进

    1、彻底废弃多type支持,包括api层面,之前版本可在一个索引库下创建多个type。

    2、彻底废弃_all字段支持,为提升性能默认不再支持全文检索,即7.0之后版本进行该项配置会报错。

    3、新增应用程序主动监测功能,搭配对应的kibana版本,用户可监测应用服务的健康状态,并在出现问题后及时发出通知。

    4、取消query结果中hits count的支持(聚合查询除外),使得查询性能大幅提升(3x-7x faster)。这意味着,每次查询后将不能得到精确的结果集数量。

    5、新增intervals query ,用户可设置多字符串在文档中出现的先后顺序进行检索。

    6、新增script_core ,通过此操作用户可以精确控制返回结果的score分值。

    7、优化集群协调子系统,缩减配置项提升稳定性。

    8、新增 alias、date_nanos、features、vector等数据类型。

    9、7.0自带java环境,所以我们在安装es时不再需要单独下载和配置java_home。

    10、7.0将不会再有OOM的情况,JVM引入了新的circuit breaker(熔断)机制,当查询或聚合的数据量超出单机处理的最大内存限制时会被截断,并抛出异常(有点类似clickhouse)。

    11、丰富多彩的kibana功能。

    四、python api elasticsearch_dsl的使用

    import datetime
    from elasticsearch_dsl import Document, Date, Nested, InnerDoc, Keyword, Integer, Long
    from elasticsearch_dsl import Search
    from elasticsearch import Elasticsearch
    from elasticsearch_dsl.connections import create_connection
    
    
    class AppInfo(InnerDoc):
    id = Keyword()
    type = Keyword()
    rank = Keyword()


    class TemplateModel(Document):
    keyword_id = Integer()
    country_id = Integer()
    hint = Integer()
    keyword = Keyword()
    search_count = Integer()
    appstoreList = Nested(AppInfo)


    class KeywordSearch(TemplateModel):
    class Index:
    # name = "index_name" # pass
    settings = {
    'number_of_shards': 5, # 分片
    'number_of_replicas': 1, # 副本备份
    'max_result_window': 20000, # 默认查询数量
    'refresh_interval': "30s",
    # "translog": {"sync_interval": "15s", "durability": "async"}
    }


    class KeywordToApp(Document):
    keyword_id = Keyword()
    country_id = Keyword()
    id = Keyword()
    keyword = Keyword()
    hint = Integer()
    search_count = Integer()
    app_type = Integer()
    rank = Integer()
    de = Integer()

    class Index:
    # name = "index_name" # pass
    settings = {
    'number_of_shards': 5, # 分片
    'number_of_replicas': 1, # 副本备份
    'max_result_window': 20000, # 默认查询数量
    'refresh_interval': "30s",
    # "translog": {"sync_interval": "15s", "durability": "async"}
    }

    添加连接

    client = Elasticsearch('127.0.0.1:9190')  # 连接
    print(client) # <Elasticsearch([{'host': '127.0.0.1', 'port': 9190}])>

    client = create_connection(alias="alias_test", hosts=["127.0.0.1:9190"]) # 使用别名
    print(KeywordSearch._get_connection(using="alias_test", )) # <Elasticsearch([{'host': '127.0.0.1', 'port': 9190}])>
    # elasticsearch_dsl是高级模块 elasticsearch是低级模块
    # elasticsearch_dsl 基于 elasticsearch


    client = Elasticsearch('127.0.0.1', http_auth=('root', 'password'), timeout=3600)  # 使用密码认证

    查看index详情

        indexs = client.indices.get('*')  # 获取所有的index详情
        print(indexs)
        print(indexs.keys())  # 所有index的名字
        index = list(indexs.keys())[0]
        print(index)  # 获取第一个index的名字

    创建和删除index

        KeywordSearch._index._name = f'1_test'  # 赋值model中的index的名字
        print(KeywordSearch._index._name)
        
        # 创建空index 方法1 mapping根据定义filed去生成
        # print(KeywordSearch.init(using="alias_test", ))  # ,使用model迁移创建index以及定义的mapping
        
        # 创建空index  方法2  无mapping
        # KeywordSearch._get_connection(using="alias_test", ).index(index=f'{i}_test')
        
        # 删除index
        # print(KeywordSearch._get_connection(using="alias_test", ).indices.delete(index= f'1_test')) # 删除index

    写入更新数据

    更新保存数据1
    KeywordSearch(_id=1,
    keyword_id=1,
    country_id=1,
    appstoreList=[{"id": 1, "rank": 1}],
    create_time=datetime.datetime.now().__format__("%Y-%m-%d %H:%M:%S")
    ).save(using="alias_test")

    # 更新保存数据2
    KeywordSearch._get_connection(using="alias_test", ).update(
    index=f'1_test',
    id=1,
    body={"doc": {"keyword_id": 1,
    "country_id": 1,
    "appstoreList": [{"id": 2, "rank": 2}],
    "create_time": datetime.datetime.now().__format__("%Y-%m-%d %H:%M:%S")},
    "doc_as_upsert": True # 不存在便插入
    }) # 数据更新 "_version": 2

    查询数据

    class ElasticSearchUtil:

    @classmethod
    def InsertDocument(cls, using, index, body, id=None):
    '''
    插入一条数据body到指定的index,可指定Id,若不指定,会自动生成
    '''
    return using.index(index=index, body=body, id=id)

    @classmethod
    def bulkUpdate(cls, using, index, body):
    '''
    批量插入更新指定index、id对应的数据
    '''
    action = [{"_op_type": "update",
    "_index": index,
    "_type": "_doc",
    "_id": str(i.get("keyword_id")) + str(i.get("de")),
    "doc": i,
    "doc_as_upsert": True} for i in body]
    return helpers.bulk(using, action, index=index)

    @classmethod
    def deleteDocByQuery(cls, using, index, query):
    '''
    删除idnex下符合条件query的所有数据
    :return:
    '''
    return using.delete_by_query(index=index, body=query, conflicts="proceed", request_timeout=100)

    @classmethod
    def deleteDocByDeCount(cls, using, index, keyword_id, de_count):
    '''
    删除idnex下符合条件query的所有数据
    :return:
    '''
    query = {
    "query": {
    "bool": {
    "filter": [{
    "term": {"keyword_id": keyword_id},
    },
    {"range": {
    "de": {"gte": de_count}
    }
    }
    ]
    }
    }
    }

    return using.delete_by_query(index=index, body=query, conflicts="proceed", request_timeout=100)

    @classmethod
    def searchDoc(cls, using, index=None, query=None):
    '''
    查找index下所有符合条件的数据
    '''
    return using.search(index=index, body=query, request_timeout=300)

    @classmethod
    def getDocById(cls, using, index, id):
    '''
    获取指定index、id对应的数据
    '''
    return using.get(index=index, id=id)

    @classmethod
    def updateDocById(cls, using, index, id, body=None):
    '''
    更新id所对应的数据
    '''
    return using.update(index=index, id=id, body=body)

    @classmethod
    def updateDocByQuery(cls, using, index, query):
    '''
    批量更新 符合该条件的批量更改hint字段
    query:
    '''
    return using.update_by_query(index=index, body=query, request_timeout=60)

    @classmethod
    def insertBulk(cls, using, index, body=None):
    '''
    批量插入doc
    '''
    return using.bulk(index=index, body=body, request_timeout=60)
    if __name__ == '__main__':
        # 批量删除上一次的数据
        query = {'query': {'match': {'keyword_id': item.get("keyword_id")}}}
        ElasticSearchUtil.deleteDocByQuery(KeywordToApp._get_connection(using="search_1"),
                                           KeywordToApp._index._name,
                                           query=query)
        # 批量更新新的数据
        if key_app:
            ElasticSearchUtil.insertBulk(KeywordToApp._get_connection(using="search_2"),
                                         KeywordToApp._index._name,
                                         key_app
                                         )

    五、遇到的问题

    1.默认node最大分片数是1000

    如果单个node想要创建更多的index和分片

    {"Content-Type":"application/json"}
    PUT
    39.105.220.74:9190/_cluster/settings
    {
      "transient": {
        "cluster": {
          "max_shards_per_node": 10000
        }
      }
    }

     2.解决丢失分片。又yellow变gree

    POST
    http://180.76.153.235:9190/_cluster/reroute?retry_failed=true
    {"Content-Type":"application/json"}

      

  • 相关阅读:
    App.js和App.css(用于移动应用的js和css)
    cookie和session使用
    html实现返回上一页的几种方法(javaScript:history.go(-1);)
    sublime找到成对标签(Ctrl+Shift+")
    Java NIO框架Netty课程(一) – Hello Netty
    信息增益的特征选择方法
    Java线程学习笔记(两) 线程异常处理
    薏米红豆粥的功效和实践演示
    文件翻译002片:Process Monitor帮助文档(Part 2)
    Spring MVC 3 深入总结
  • 原文地址:https://www.cnblogs.com/clbao/p/11989121.html
Copyright © 2011-2022 走看看