zoukankan      html  css  js  c++  java
  • elasticsearch reIndex

    elasticsearch 基础 —— ReIndex
    Reindex会将一个索引的数据复制到另一个已存在的索引,但是并不会复制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。
    
    一、reindex的常用操作
    1、reindex基础实现
        _reindex会将一个索引的快照数据copy到另一个索引,默认情况下存在相同的_id会进行覆盖(一般不会发生,除非是将两个索引的数据copy到一个索引中),可以使用以下命令将索引快照进行copy:
    
    POST _reindex
    {
      "source": {
        "index": "my_index_name"
      },
      "dest": {
        "index": "my_index_name_new"
      }
    } 
    2、version_type(冲突的解决)
        version_type属性默认值为internal,即当发生冲突后会覆盖之前的document,而当设置为external则会新生成一个另外的document,设置方式如下:
    
    POST _reindex
    {
      "source": {
        "index": "my_index_name"
      },
      "dest": {
        "index": "my_index_name_new",
        "version_type": "external"
      }
    }
    3、op_type和conflicts
      将op_type设置为create时,只会对发生不同的document进行reindex,(若定时机制的reindex则可以使用该方式只对最新的不存在的document进行reindex)。并且可以将conflicts属性设置为proceed,将冲突进行类似于continue的操作,设置方式如下:
    
    POST _reindex
    {
      "conflicts": "proceed",
      "source": {
        "index": "my_index_name"
      },
      "dest": {
        "index": "my_index_name_new",
        "op_type": "create"
      }
    }
    4、query的reindex
      对满足query条件的数据进行reindex操作,查询方式如下:
    
    POST _reindex
    {
      "source": {
        "index": "my_index_name",
        "type": "my_type_name",
        "query": {  // query的条件
          "term": {
            "user": "user_value"
          }
        }
      },
      "dest": {
        "index": "my_index_name_new"
      }
    } 
    5、多Index、Type数据的reindex
      可以将多个索引或类型的数据reindex到一个新的索引中,当然还可以使用query查询条件只对其中满足条件的部分数据进行reindx,若不设置冲突则还是默认会进行覆盖,只是不能保证相同ID的数据那个索引的数据会被先索引而被覆盖,设置方式如下:
    
    POST _reindex
    {
        "source": {
            "index": [
                "index_name_1",
                "index_name_1"
            ],
            "type": [
                "type_name_1",
                "type_name_2"
            ],
            "query": {//query的条件
                "term": {
                    "user": "kimchy"
                }
            }
        },
        "dest": {
            "index": "all_together_index_name"
        }
    }
    6、size、sort(reindex的条数和排序控制)
    POST _reindex
    {
      "size": 10000,   // 值reindex按照sort排序后的size条数据
      "source": {
        "index": "my_index_name",
        "sort": { "date": "desc" } 
      },
      "dest": {
        "index": "my_index_name_new"
      }
    }
    7、source条件的reindex
      满足_source中包含数组field(字段)的数据才会被reindex,设置方式如下:
    
    POST _reindex
    {
      "source": {
        "index": "my_index_name",
        "_source": ["field_name_1", "field_name_2"]
      },
      "dest": {
        "index": "my_index_name_new"
      }
    }
    8、script类型的reindex
      与_update_by_query相同的是reindex也可以使用script,但是不同的是reindex可以修改源索引的数据信息,比如:
    
    POST _reindex
    {
      "source": {
        "index": "my_index_name"
      },
      "dest": {
        "index": "my_index_name_new",
        "version_type": "external"
      },
      "script": {
        "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
        "lang": "painless"
      }
    }
      修改字段名称
    
      以下会在copy后将新索引中的flag字段名称修改为tag:
    
    POST _reindex
    {
      "source": {
        "index": "test"
      },
      "dest": {
        "index": "test2"
      },
      "script": {
        "source": "ctx._source.tag = ctx._source.remove("flag")"
      }
    }
      ctx.op 
    
        ctx.op只能等于noop或delete,等于其他值将报错,并且设置ctx的其他字段也会报错。设置为noop后不会有任何操作发生,设置为delete后会从目标索引中删除满足条件的数据,并且都会在response body中返回总条数。
    
    谨慎操作字段
    
      _id
    
      _type
    
      _index
    
      _version
    
      _routing
    
      _parent
    
      这些字段都可以在reindex的操作中自行定义,但是需要谨慎操作。
    
      _version 字段可以设置为null或者在ctx的map中清除该字段,则reindex时效果都是没有copy其值,会引起数据的覆盖。
    
      而routing值可以设置为以下值:
    
            keep 默认值,会copy对应的路由值到新的index中。
    
            discard 将值设置为null
    
            =<some text> 将值设置为指定值,设置方式如下:
    
    POST _reindex
    {
        "source": {
            "index": "source",
            "query": {//设置查询条件
                "match": {
                    "company": "cat"
                },
                "size": 100//满足条件的100条
            }
        },
        "dest": {
            "index": "dest",
            "routing": "=cat"//reindex到新的索引中使用该路由值
        }
    }
    二、远程reindex
      可以将远程(其他集群)的数据reindex到当前的集群环境中,但是需要设置当前集群的elsticsearch.yml配置中设置远程白名单列表,配置reindex.remote.whitelist属性,如otherhost:9200, another:9200, 127.0.10.*:9200, localhost:* 。只要环境可访问,则可以在任何版本之间对数据进行reindex,那么这也是版本es升级的数据迁移不错的选择。为了使发送到旧版本的弹性搜索的查询,查询参数被直接发送到远程主机,而不需要进行验证或修改。
    
        但是manual 和 automatic slicing.不能使用远程reindex,设置方式如下:
    
    POST _reindex
    {
      "source": {
        "remote": {
          "host": "http://otherhost:9200", // 远程es的ip和port列表
          "socket_timeout": "1m",
          "connect_timeout": "10s"  // 超时时间设置
        },
        "index": "my_index_name", // 源索引名称
        "query": {         // 满足条件的数据
          "match": {
            "test": "data"
          }
        }
      },
      "dest": {
        "index": "dest_index_name"  // 目标索引名称
      }
    }
    三、URL Parameters(reindex参数设置)
        Url可选参数有pretty,refresh, wait_for_completion, wait_for_active_shards, timeout, requests_per_second.
    
      1、refresh
            Index API的refresh只会让接收新数据的碎片被刷新,而reindex的refresh则会刷新所有索引。
    
      2、wait_for_completion
       将参数设置为false则会执行一些预执行检查,启动请求,然后返回一个任务,该任务可以用于任务api来取消或获得任务的状态。Es会在.tasks/task/${taskId}中创建记录ID。
    
      3、wait_for_active_shards
            在Bulk API的情况下,requests_per_second可以设置在继续索引之前,控制多少个碎片的拷贝数必须是活跃的。而timeout 超时控制每个写请求等待不可用的碎片等待的时间。
    
      4、requests_per_second
       每秒的请求数据,显然是节流控制参数,运行设置一个正整数,设置为-1表示不进行控制。
    
     
    
    四、返回参数说明
    {
      "took" : 639,    // 执行全过程使用的毫秒数
      "updated": 0,    // 成功修改的条数
      "created": 123,  // 成功创建的条数
      "batches": 1,    // 批处理的个数
      "version_conflicts": 2, // 版本冲突个数
      "retries": {     // 重试机制
        "bulk": 0,     // 重试的批个数
        "search": 0    // 重试的查询个数
      }
      "throttled_millis": 0, // 由于设置requests_per_second参数而sleep的毫秒数
      "failures" : [ ]  // 失败的数据
    }
    五 、Task API 操作
      1、使用Task API查看reindex的情况
    GET _tasks?detailed=true&actions=*reindex
    状态如下:可知道当前的taskId = 9620804
    
    2、使用TaskId查看执行的状态 
    {
      "nodes": {
        "_b5PSdInTVWaji9TUrWANg": {
          "name": "node-2",
          "transport_address": "192.168.10.15:9300",
          "host": "192.168.10.15",
          "ip": "192.168.10.15:9300",
          "roles": [
            "master",
            "data",
            "ingest"
          ],
          "attributes": {
            "ml.max_open_jobs": "10",
            "ml.enabled": "true"
          },
          "tasks": {
            "_b5PSdInTVWaji9TUrWANg:9620804": {
              "node": "_b5PSdInTVWaji9TUrWANg",
              "id": 9620804,
              "type": "transport",
              "action": "indices:data/write/reindex",
              "status": {
                "total": 216361,
                "updated": 0,
                "created": 30000,
                "deleted": 0,
                "batches": 31,
                "version_conflicts": 0,
                "noops": 0,
                "retries": {
                  "bulk": 0,
                  "search": 0
                },
                "throttled_millis": 0,
                "requests_per_second": -1,
                "throttled_until_millis": 0
              },
              "description": "reindex from [geleevr] to [geleevr_new]",
              "start_time_in_millis": 1511316869170,
              "running_time_in_nanos": 12077416434,
              "cancellable": true
            }
          }
        }
      }
    }
    GET /_tasks/taskId:9620804
    可以查看total,updated,created,deleted等状态
    
    3、使用Cancel Task API取消正在执行的reindex操作
       取消操作可能需要几秒钟的时间,取消方式如下:
    
    POST _tasks/task_id:9620804/_cancel
    4、使用Task API 重置reindex的节流限制
    POST _reindex/task_id:9620804/_rethrottle?requests_per_second=-1
     六、并行化执行reindex操作
    1、手动并行化
    如下是两个slices的手动并行化reindex:
    
    POST _reindex
    {
      "source": {
        "index": "my_index_name",
        "slice": {   // 第一slice执行操作
          "id": 0,
          "max": 2
        }
      },
      "dest": {
        "index": "my_index_name_new"
      }
    }
    POST _reindex
    {
      "source": {
        "index": "my_index_name",
        "slice": {   // 第二slice执行操作
          "id": 1,
          "max": 2
        }
      },
      "dest": {
        "index": "my_index_name_new"
      }
    }
    可以通过以下命令查看执行的结果:
    
    GET _refresh
    POST my_index_name/_search?size=0&filter_path=hits.total
    结果如下:
    
    {
      "hits": {
        "total": 120
      }
    }
    2、自动并行化
    如下是自动划分的5个slices,只是将需要手动划分的过程自动化处理,将一个操作拆分为多个子操作并行化处理,其他查询方式等都一样,如下:
    
    POST _reindex?slices=5&refresh
    {
      "source": {
        "index": "my_index_name"
      },
      "dest": {
        "index": "my_index_name_new"
      }
    } 
    3、并行化处理的特性
            同样可以使用Task API查看每个slices的子请求(child)的task状态;
    
            获取每个slices请求的任务状态,只返回已完成的状态;
    
            这些子请求单独可寻址,比如取消操作和重新配置节流操作;
    
            对每个slices进行重新配置节流时,会将所有未完成的操作进行比例分配;
    
            对每个slices进行取消操作其他所有slices都会生效;
    
            每个请求只拥有全部数据的部分,并且每个文档的大小会不同,大文件基本分配均匀;
    
            并行化处理是使用requests_per_second 或size等,可能或导致分布不均匀;
    
            每个子请求可能获取到不同版本或快照的源索引数据。
    
    4、slices数量设置要求
            数量不能过大,比如500可能出现CPU问题;
    
            查询性能角度看,设置slices为源索引的分片的倍数是比较合适的,一倍是最有效的;
    
            索引性能角度看,应该随着可用资源的数量线性地扩展;
    
            然而索引或查询性能是否在此过程中占据主导,取决于许多因素,比如重新索引的文档和重新索引的集群。
    
     
    
    七、使用索引名称,reindex每天的数据
        如存在如下数据:
    
    PUT metricbeat-2016.05.30/beat/1?refresh
    {"system.cpu.idle.pct": 0.908}
     
    PUT metricbeat-2016.05.31/beat/1?refresh
    {"system.cpu.idle.pct": 0.105}
      可以执行如下reindex脚本:
    
    POST _reindex
    {
      "source": {
        "index": "metricbeat-*"
      },
      "dest": {
        "index": "metricbeat"
      },
      "script": {
        "lang": "painless",
        "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
      }
    }
        再使用以下命令进行查看:
    
    GET metricbeat-2016.05.30-1/_doc/1
    GET metricbeat-2016.05.31-1/_doc/1
    八、随机对源索引的一个子集合进行reindex
      下面是源索引的一个子集合进行索引的例子,说明:默认会按照_doc进行排序,而score不会起到任何的作用,除非如下提别对score排序进行指定,如下:
    
    POST _reindex
    {
      "size": 10,
      "source": {
        "index": "my_index_name",
        "query": {
          "function_score" : {
            "query" : { "match_all": {} },
            "random_score" : {}
          }
        },
        "sort": "_score"    
      },
      "dest": {
        "index": "random_my_index_name"
      }
    }
    九、reindex在项目中的使用
         以上是对reindex的基本概念和特性的学习,自己在项目中的使用场景:第一是在集群的es版本升级的情况下
    
    1、es集群版本升级的数据迁移 或 将现有生产数据copy的dev等集群环境
            该部分可以在kibana的dev Tools下面直接使用remote reindex的脚本即可,但是需要设置当前集群的elsticsearch.yml配置中设置远程白名单列表,配reindex.remote.whitelist属性,如otherhost:9200, another:9200, 127.0.10.*:9200, localhost:* 。但是千万注意remote reindex不能使用并行化处理,即不能使用slices参数,这一点官方文档上没有明确指出,但是在使用的时候会报错,去掉即可。
    
    POST _reindex?refresh
    {
      "source": {
        "remote": {
          "host": "http://192.168.10.20:9200", 
          "socket_timeout": "1m",
          "connect_timeout": "10s"  
        },
        "index": "source_index_name",
        "query": {         
          "match_all": {}
        }
      },
      "dest": {
        "index": "destination_index_name",
        "version_type": "external",
        "op_type": "create"
      }
    }
    
    ---------------------------------------------------------------------------------------------------------------------------
    干货 | Elasticsearch Reindex性能提升10倍+实战
    1、reindex的速率极慢,是否有办法改善?
    以下问题来自社区:https://elasticsearch.cn/question/3782
    
    问题1:reindex和snapshot的速率极慢,是否有办法改善?
    reindex和snapshot的速率比用filebeat或者kafka到es的写入速率慢好几个数量级(集群写入性能不存在瓶颈),reindex/snapshot的时候CPU还是IO使用率都很低,是不是集群受什么参数限制了reindex和snapshot的速率?
    reindex不管是跨集群还是同集群上都很慢,大约3~5M/s的索引速率,会是什么原因导致的?
    
    问题2:数据量几十个G的场景下,elasticsearch reindex速度太慢,从旧索引导数据到新索引,当前最佳方案是什么?
    2、Reindex简介
    5.X版本后新增Reindex。Reindex可以直接在Elasticsearch集群里面对数据进行重建,如果你的mapping因为修改而需要重建,又或者索引设置修改需要重建的时候,借助Reindex可以很方便的异步进行重建,并且支持跨集群间的数据迁移。比如按天创建的索引可以定期重建合并到以月为单位的索引里面去。当然索引里面要启用_source。
    
    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    
    3、原因分析
    reindex的核心做跨索引、跨集群的数据迁移。
    慢的原因及优化思路无非包括:
    
    1)批量大小值可能太小。
    需要结合堆内存、线程池调整大小;
    2)reindex的底层是scroll实现,借助scroll并行优化方式,提升效率;
    3)跨索引、跨集群的核心是写入数据,考虑写入优化角度提升效率。
    4、Reindex提升迁移效率的方案
    4.1 提升批量写入大小值
    默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。
    
    POST _reindex
    {
      "source": {
        "index": "source",
        "size": 5000
      },
      "dest": {
        "index": "dest",
        "routing": "=cat"
      }
    }
    
    批量大小设置的依据:
    
    (1)使用批量索引请求以获得最佳性能。
    批量大小取决于数据、分析和集群配置,但一个好的起点是每批处理5-15 MB。
    注意,这是物理大小。文档数量不是度量批量大小的好指标。例如,如果每批索引1000个文档,:
    1)每个1kb的1000个文档是1mb。
    2)每个100kb的1000个文档是100 MB。
    这些是完全不同的体积大小。
    (2)逐步递增文档容量大小的方式调优。
    1)从大约5-15 MB的大容量开始,慢慢增加,直到你看不到性能的提升。然后开始增加批量写入的并发性(多线程等等)。
    2)使用kibana、cerebro或iostat、top和ps等工具监视节点,以查看资源何时开始出现瓶颈。如果您开始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一个资源达到了容量。要么减少并发性,或者提供更多有限的资源(例如从机械硬盘切换到ssd固态硬盘),要么添加更多节点。
    4.2 借助scroll的sliced提升写入效率
    Reindex支持Sliced Scroll以并行化重建索引过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。
    
    sliced原理(from medcl)
    1)用过Scroll接口吧,很慢?如果你数据量很大,用Scroll遍历数据那确实是接受不了,现在Scroll接口可以并发来进行数据遍历了。
    2)每个Scroll请求,可以分成多个Slice请求,可以理解为切片,各Slice独立并行,利用Scroll重建或者遍历要快很多倍。
    
    slicing使用举例
    slicing的设定分为两种方式:手动设置分片、自动设置分片。
    手动设置分片参见官网。
    自动设置分片如下:
    
    POST _reindex?slices=5&refresh
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    
    slices大小设置注意事项:
    1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
    2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
    3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。
    
    4.3 ES副本数设置为0
    如果要进行大量批量导入,请考虑通过设置index.number_of_replicas来禁用副本:0。
    主要原因在于:复制文档时,将整个文档发送到副本节点,并逐字重复索引过程。 这意味着每个副本都将执行分析,索引和潜在合并过程。
    相反,如果您使用零副本进行索引,然后在提取完成时启用副本,则恢复过程本质上是逐字节的网络传输。 这比复制索引过程更有效。
    
    PUT /my_logs/_settings
    {
        "number_of_replicas": 1
    }
    
    4.4 增加refresh间隔
    如果你的搜索结果不需要接近实时的准确性,考虑先不要急于索引刷新refresh。可以将每个索引的refresh_interval到30s。
    如果正在进行大量数据导入,可以通过在导入期间将此值设置为-1来禁用刷新。完成后不要忘记重新启用它!
    设置方法:
    
    PUT /my_logs/_settings
    { "refresh_interval": -1 }
    
    5、小结
    实践证明,比默认设置reindex速度能提升10倍+。
    遇到类似问题,多从官网、原理甚至源码的角度思考,逐步拆解分析。
    只要思维不滑坡,办法总比问题多!
    
    参考:
    [1] Jest Reindex参考:http://t.cn/RDOyIc8
    [2] 官网性能优化:http://t.cn/RDOyJqr
    [3] 论坛讨论:http://t.cn/RDOya3a
  • 相关阅读:
    Leetcode题库——7.反转整数
    (tomcat)tomcat启动过慢
    (tomcat)查看tomcat安装路径
    (JDK)cmd中只能执行java不能执行javac命令
    (课)学习进度报告二
    (数据导入)csv文件数据导入数据库
    (编码转换)转换文件编码
    (python开发)用cmd下载Python的第三方库所遇问题及解决方法
    (课)学习进度报告一
    (课)淘宝网质量属性场景
  • 原文地址:https://www.cnblogs.com/buffercache/p/12703335.html
Copyright © 2011-2022 走看看