应用背景:
1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。
2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,
所以这种情况下也可以考虑尝试使用Reindex。
Reindex:
ES提供了_reindex这个API。相对于我们重新导入数据肯定会快不少,实测速度大概是bulk导入数据的5-10倍。
数据迁移步骤:
1、创建新的索引(可以通过java程序也可以直接在head插件上创建)
注意:在创建索引的时候要把表结构也要创建好(也就是mapping)
2、复制数据
最简单、基本的方式:
1)代码请求:
POST _reindex
{
"source": {
"index": "old_index"
},
"dest": {
"index": "new_index"
}
}
2)利用命令:
curl _XPOST 'ES数据库请求地址:9200/_reindex' -d{"source":{"index":"old_index"},"dest":{"index":"new_index"}}
但如果新的index中有数据,并且可能发生冲突,那么可以设置version_type"version_type": "internal"
或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容:
POST _reindex
{
"source": {
"index": "old_index"
},
"dest": {
"index": "new_index",
"version_type": "internal"
}
}
设置op_type为create将导致_reindex仅在目标索引中创建缺少的文档。所有存在的文档将导致版本POST _reindex
{
“source”: {
“index”: “twitter”
},
“dest”: {
“index”: “new_twitter”,
“op_type”: “create”
}
}
版本冲突将中止_reindex进程,但您可以通过请求体设置”conflict”:”proceed”来在冲突时进行计数:
POST _reindex
{
"conflicts": "proceed",
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"op_type": "create"
}
}
还可以通过向source添加type或添加query来限制文档
POST _reindex
{
"conflicts": "proceed",
"source": {
"index": "old_ijndex"
},
"dest": {
"index": "new_index",
"version_type": "internal",
"op_type": "create"
}
}
增加查询
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
"source": {
"remote": {
"host": "'${oldClusterHost}'",
"username": "'${oldClusterUser}'",
"password": "'${oldClusterPass}'"
},
"index": "'${indexName}'",
"query": {
"match_all": {}
}
},
"dest": {
"index": "'${indexName}'"
}
}'
数据量大、无删除操作、有更新时间。时间范围内
curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
"source": {
"remote": {
"host": "'${oldClusterHost}'",
"username": "'${oldClusterUser}'",
"password": "'${oldClusterPass}'"
},
"index": "'${indexName}'",
"query": {
"range" : {
"'${timeField}'" : {
"gte" : '${lastTimestamp}',
"lt" : '${curTimestamp}'
}
}
}
},
"dest": {
"index": "'${indexName}'"
}
}'
es reindex script改字段内容
reindex的时候,改变字段的内容
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "external"
},
"script": {
"source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
"lang": "painless"
}
}
数据迁移效率
问题发现:
常规的如果我们只是进行少量的数据迁移利用普通的reindex就可以很好的达到要求,但是当我们发现我们需要迁移的数据量过大时,我们会发现reindex的速度会变得很慢
数据量几十个G的场景下,elasticsearch reindex速度太慢,从旧索引导数据到新索引,当前最佳方案是什么?
原因分析:
reindex的核心做跨索引、跨集群的数据迁移。
慢的原因及优化思路无非包括:
1)批量大小值可能太小。需要结合堆内存、线程池调整大小;
2)reindex的底层是scroll实现,借助scroll并行优化方式,提升效率;
3)跨索引、跨集群的核心是写入数据,考虑写入优化角度提升效率。
可行方案:
1)提升批量写入大小值
默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。
POST _reindex
{
"source": {
"index": "source",
"size": 5000
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
批量大小设置的依据:
1、使用批量索引请求以获得最佳性能。
批量大小取决于数据、分析和集群配置,但一个好的起点是每批处理5-15 MB。
注意,这是物理大小。文档数量不是度量批量大小的好指标。例如,如果每批索引1000个文档:
1)每个1kb的1000个文档是1mb。
2)每个100kb的1000个文档是100 MB。
这些是完全不同的体积大小。
2、逐步递增文档容量大小的方式调优。
1)从大约5-15 MB的大容量开始,慢慢增加,直到你看不到性能的提升。然后开始增加批量写入的并发性(多线程等等)。
2)使用kibana、cerebro或iostat、top和ps等工具监视节点,以查看资源何时开始出现瓶颈。如果您开始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一个资源达到了容量。
要么减少并发性,或者提供更多有限的资源(例如从机械硬盘切换到ssd固态硬盘),要么添加更多节点。
2)借助scroll的sliced提升写入效率
Reindex支持Sliced Scroll以并行化重建索引过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。
sliced原理(from medcl)
1)用过Scroll接口吧,很慢?如果你数据量很大,用Scroll遍历数据那确实是接受不了,现在Scroll接口可以并发来进行数据遍历了。
2)每个Scroll请求,可以分成多个Slice请求,可以理解为切片,各Slice独立并行,利用Scroll重建或者遍历要快很多倍。
slicing使用举例
slicing的设定分为两种方式:手动设置分片、自动设置分片。
手动设置分片参见官网。
自动设置分片如下:
POST _reindex?slices=5&refresh
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
slices大小设置注意事项:
1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。
实践证明,比默认设置reindex速度能提升10倍+。
Index Alias
当针对某个指定的index进行操作时候,ElasticSearch中的API接受一个index的名字。当然在多个index的情况下也可以指定多个index。
index aliases的API允许我们为一个index指定别名。一个别名能够被映射到多个index中,当指定了index对应已经有其它index对应的别名之后,别名可以自动扩展到多个index。一个别名能够关联上一个过滤器,当搜索和路由的时候能够自动使用这个过滤器。
add别名
这个一个关联别名alias1到index test1的例子
curl -XPOST 'http://localhost:9200/_aliases' -d '
{
"actions" : [
{ "add" : { "index" : "test1", "alias" : "alias1" } }
]
}'
成功则返回
{"acknowledged":true}
remove别名
也可以删除别名
curl -XPOST 'http://localhost:9200/_aliases' -d '
{
"actions" : [
{ "remove" : { "index" : "test1", "alias" : "alias1" } }
]
}'
成功则返回
{"acknowledged":true}
重命名
重命名一个别名很简单,通过相同API的简单remove和add操作就可以了。这个操作是原子的,不用担心在很短的一段时间内别名并不指向任何一个index
curl -XPOST 'http://localhost:9200/_aliases' -d '
{
"actions" : [
{ "remove" : { "index" : "test1", "alias" : "alias1" } },
{ "add" : { "index" : "test1", "alias" : "alias2" } }
]
}'
成功则返回
{"acknowledged":true}
别名切换
当然,也可以通过同一个API实现alias到不同index的切换。在实际使用中,如果我们通过别名来访问ElasticSearch,那么可以通过别名指向index的切换来实现不同版本数据的快速切换。
curl -XPOST 'http://localhost:9200/_aliases' -d '
{
"actions" : [
{ "remove" : { "index" : "test1", "alias" : "alias2" } },
{ "add" : { "index" : "test", "alias" : "alias2" } }
]
}'
成功则返回
{"acknowledged":true}
指向多个index的别名
可以将一个别名指向多余一个index,通过简单的多个add操作
curl -XPOST 'http://localhost:9200/_aliases' -d '
{
"actions" : [
{ "add" : { "index" : "test1", "alias" : "alias1" } },
{ "add" : { "index" : "test2", "alias" : "alias1" } }
]
}'
成功则返回
{"acknowledged":true}
参考:
https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-reindex.html
https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-aliases.html