zoukankan      html  css  js  c++  java
  • 【ElasticSearch】线上索引重建

    项目背景:

      1.由于项目中存在旧索引设置不合理情况,需要进行索引重建

      2.线上的ElasticSearch由1台扩容到3台,原有的索引需要分片

      例如:

       旧索引 index_user 设置主分片为1,副分片为0,数据没有高可用

    GET index_user/_search
    {
      "took" : 121,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      }

    实现步骤:

      1.新建索引,index_user_v2设置我们所需要的主分片和副分片数量

    PUT index_user_v2
    {
      "settings": {
        "number_of_replicas": 1,
        "number_of_shards": 5
      }
    }

      2.设置索引数据结构,因为新索引和旧索引mapping结构一致,索引可以直接copy旧索引的数据结构;

    PUT index_user_v2/t_user/_mappings
    {
      "properties": {
        "age": {
          "type": "integer"
        },
        "ageScope": {
          "type": "keyword"
        },
        "birthday": {
          "type": "long"
        },
        "cityId": {
          "type": "integer"
        },
        "cityName": {
          "type": "keyword"
        },
        "countryCode": {
          "type": "integer"
        },
        "countyId": {
          "type": "integer"
        },
        "create_time": {
          "type": "long"
        },
        "dbId": {
          "type": "long"
        },
        "email": {
          "type": "keyword"
        },
        "gameIds": {
          "type": "text",
          "analyzer": "ik_max_word"
        },
        "isCreateServer": {
          "type": "integer"
        },
        "isDelete": {
          "type": "boolean"
        },
        "nickName": {
          "type": "text",
          "analyzer": "ik_smart"
        },
        "nickNamePingYin": {
          "type": "text",
          "analyzer": "pinyin"
        },
        "nnNumber": {
          "type": "long"
        },
        "provinceId": {
          "type": "integer"
        },
        "provinceName": {
          "type": "keyword"
        },
        "sex": {
          "type": "keyword"
        },
        "signature": {
          "type": "keyword"
        },
        "status": {
          "type": "keyword"
        },
        "telNum": {
          "type": "keyword"
        },
        "updae_time": {
          "type": "long"
        },
        "userId": {
          "type": "long"
        },
        "userType": {
          "type": "keyword"
        },
        "userUrl": {
          "type": "keyword"
        },
        "userUrlNn": {
          "type": "keyword"
        },
        "user_id": {
          "type": "long"
        }
      }
    }

      3. 执行完步骤1和步骤2之后,在Kibana->Monitoring->Node里面可以看到索引index_user_v2已经被自动分片到三个节点,如图

               

        这里,正式开始索引重建之前,可以将index_user_v2的副分片数量设置为0,减少副分片写入带来的时间损耗

      PUT index_user_v2/_settings
      {
        "settings": {
          "number_of_replicas": 0
        }
      }

       4.执行索引迁移,将index_user上的数据复制到index_user_v2, 同时设置 wait_for_completion=false 表示索引迁移的请求会在后台执行

    # 索引迁移
    POST /_reindex?wait_for_completion=false
    {
      "source": {
        "index": "index_user"
      },
      "dest": {
        "index":"index_user_v2"
      }
    }

      执行后,会生成一个taskId  :  例如:Mroifc1NSJq2s7mf38XxmA:1679363718,后续我们可以使用这个taskId去查询这个迁移任务的状态,耗时,以及执行的进度等等

    GET _tasks/Mroifc1NSJq2s7mf38XxmA:1679363718
    {
      "completed" : true,
      "task" : {
        "node" : "Mroifc1NSJq2s7mf38XxmA",
        "id" : 1679363718,
        "type" : "transport",
        "action" : "indices:data/write/reindex",
        "status" : {
          "total" : 15480531,
          "updated" : 0,
          "created" : 15480531,
          "deleted" : 0,
          "batches" : 15481,
          "version_conflicts" : 0,
          "noops" : 0,
          "retries" : {
            "bulk" : 0,
            "search" : 0
          },
          "throttled_millis" : 0,
          "requests_per_second" : -1.0,
          "throttled_until_millis" : 0
        },
        "description" : "reindex from [index_user] to [index_user_v2]",
        "start_time_in_millis" : 1623316057822,
        "running_time_in_nanos" : 594661905143,
        "cancellable" : true,
        "headers" : { }
      },
      "response" : {
        "took" : 594661,
        "timed_out" : false,
        "total" : 15480531,
        "updated" : 0,
        "created" : 15480531,
        "deleted" : 0,
        "batches" : 15481,
        "version_conflicts" : 0,
        "noops" : 0,
        "retries" : {
          "bulk" : 0,
          "search" : 0
        },
        "throttled" : "0s",
        "throttled_millis" : 0,
        "requests_per_second" : -1.0,
        "throttled_until" : "0s",
        "throttled_until_millis" : 0,
        "failures" : [ ]
      }
    }

        5.任务完成后  

        将旧索引index_user的别名index_user_latest 移除

        新索引index_user_v2添加别名index_user_latest

        至此完成全部的索引重建任务

    # 别名替换
    POST _aliases
    {
      "actions": [
        {
          "add": {
            "index": "index_user_v2",
            "alias": "index_user_latest"
          }
        },
        {
          "remove": {
            "index": "index_user",
            "alias": "index_user_latest"
          }
        }
      ]
    }

    事后思考:

      1.执行_reindex索引迁移时,会读取当前index_user旧索引的数量 15480602条数据,将这批数据复制到新索引index_user_v2中

      但是实际生产会持续写数据到旧索引index_user中,导致reindex复制的数据,会略小于实际的数据量

      处理方式:该索引的数据是StreamSet实时同步MySQL的数据到ElasticSearch中,这里可以将StreamSet停止,记录复制开始的时间,待复制完成后进行数据的增量同步;

      2.这里有个点可以优化,_reindex复制后, wait_for_completion=false 会生成任务,可以将任务Id写入定时任务中,轮训该任务的状态,任务结束后,可以及时通知;

      

       

  • 相关阅读:
    [Effective JavaScript 笔记]第54条:将undefined看做“没有值”
    [Effective JavaScript 笔记]第53条:保持一致的约定
    UDP打洞原理介绍
    Uboot启动分析之Start.S
    MMU
    linux_shell
    SSH2配置
    线程同步
    C#线程基础
    客户端服务器通讯常用的一种方法——Marshal类
  • 原文地址:https://www.cnblogs.com/july-sunny/p/14872754.html
Copyright © 2011-2022 走看看