zoukankan      html  css  js  c++  java
  • es数据迁移脚本(python)

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    # 文件名:indiceCreate.py
    import sys
    import base64
    import time
    import httplib
    import json
    ## Old (source) cluster host, as ip:port
    oldClusterHost = "192.168.1.85:9200"
    ## Old cluster user name; may be left empty
    oldClusterUserName = "elastic"
    ## Old cluster password; may be left empty
    oldClusterPassword = "elastic"
    ## New (target) cluster host, as ip:port
    newClusterHost = "192.168.1.118:9200"
    ## New cluster user name; may be left empty
    newClusterUser = ""
    ## New cluster password; may be left empty
    newClusterPassword = ""
    ## Replica count written into the settings of every index created on the new cluster
    DEFAULT_REPLICAS = 0
    def httpRequest(method, host, endpoint, params="", username="", password=""):
        """Send a single HTTP request and return the raw response body.

        The HTTP status code is not inspected; whatever body the server sends
        back is returned as-is.

        Args:
            method: HTTP verb, e.g. "GET", "POST" or "PUT".
            host: Target node as "ip:port".
            endpoint: Request path, e.g. "/_cat/indices".
            params: Request body; it is only sent for non-GET methods.
            username: Basic-auth user name. When empty, no Authorization
                header is added.
            password: Basic-auth password, used only together with username.

        Returns:
            The response body as read from the connection.
        """
        headers = {}
        if username != "":
            credentials = "{username}:{password}".format(username=username, password=password)
            # b64encode never inserts line breaks, so the result is always a
            # valid single-line header value.
            headers["Authorization"] = "Basic %s" % base64.b64encode(credentials)

        conn = httplib.HTTPConnection(host)
        try:
            if "GET" == method:
                headers["Content-Type"] = "application/x-www-form-urlencoded"
                conn.request(method=method, url=endpoint, headers=headers)
            else:
                headers["Content-Type"] = "application/json"
                conn.request(method=method, url=endpoint, body=params, headers=headers)
            return conn.getresponse().read()
        finally:
            # Release the socket even when the request or the read fails.
            conn.close()
    def httpGet(host, endpoint, username="", password=""):
        """Issue a GET request through httpRequest and return the response body."""
        emptyBody = ""
        return httpRequest("GET", host, endpoint, params=emptyBody,
                           username=username, password=password)
    def httpPost(host, endpoint, params, username="", password=""):
        """Issue a POST request with body `params` through httpRequest and return the response body."""
        return httpRequest("POST", host, endpoint, params=params,
                           username=username, password=password)
    def httpPut(host, endpoint, params, username="", password=""):
        """Issue a PUT request with body `params` through httpRequest and return the response body."""
        return httpRequest("PUT", host, endpoint, params=params,
                           username=username, password=password)
    def getIndices(host, username="", password=""):
        """Return the names of the open indices reported by `/_cat/indices`.

        Args:
            host: Cluster node to query, as "ip:port".
            username: Basic-auth user name; may be empty.
            password: Basic-auth password; may be empty.

        Returns:
            A list of index names, in the order the cluster reported them.
        """
        endpoint = "/_cat/indices"
        # Query the cluster the caller asked for rather than the module-level
        # old-cluster settings.
        indicesResult = httpGet(host, endpoint, username, password)
        indexList = []
        for line in indicesResult.splitlines():
            columns = line.split()
            # Assumes the default column layout "health status index ...":
            # compare the status column exactly so that an index whose *name*
            # merely contains "open" is not picked up by mistake.
            if len(columns) >= 3 and columns[1] == "open":
                indexList.append(columns[2])
        return indexList
    def getSettings(index, host, username="", password=""):
        """Build the `"settings"` JSON fragment for re-creating `index`.

        Fetches `/<index>/_settings` from `host`, prints the raw response, and
        returns a fragment (without enclosing braces) that keeps the original
        shard count and uses DEFAULT_REPLICAS as the replica count.
        """
        rawSettings = httpGet(host, "/" + index + "/_settings", username, password)
        print(index + " 原始settings如下:\n" + rawSettings)
        parsed = json.loads(rawSettings)
        # Shard count is copied from the source index; replicas come from the
        # module-level default.
        shardCount = parsed[index]["settings"]["index"]["number_of_shards"]
        template = '"settings": {"number_of_shards": %s, "number_of_replicas": %s}'
        return template % (shardCount, DEFAULT_REPLICAS)
    def getMapping(index, host, username="", password=""):
        """Build the `"mappings"` JSON fragment for re-creating `index`.

        Fetches `/<index>/_mapping` from `host`, prints the raw response, and
        returns a fragment (without enclosing braces) holding the re-serialized
        mappings of that index.
        """
        rawMapping = httpGet(host, "/" + index + "/_mapping", username, password)
        print(index + " 原始mapping如下:\n" + rawMapping)
        parsed = json.loads(rawMapping)
        serialized = json.dumps(parsed[index]["mappings"])
        return '"mappings" : ' + serialized
    def createIndexStatement(oldIndexName):
        """Assemble the JSON body used to create a copy of `oldIndexName`.

        Combines the settings and mapping fragments read from the old cluster
        into one brace-enclosed JSON document.
        """
        fragments = [
            str(getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)),
            str(getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)),
        ]
        return "{\n" + ",\n".join(fragments) + "\n}"
    def createIndex(oldIndexName, newIndexName=""):
        """Create an index on the new cluster from an old-cluster index definition.

        Args:
            oldIndexName: Index on the old cluster whose settings and mappings are copied.
            newIndexName: Name to create on the new cluster; an empty string
                means "use oldIndexName".

        The generated create statement and the PUT response are printed.
        """
        targetName = oldIndexName if newIndexName == "" else newIndexName
        body = createIndexStatement(oldIndexName)
        print("新索引 " + targetName + " 的setting和mapping如下:\n" + body)
        response = httpPut(newClusterHost, "/" + targetName, body, newClusterUser, newClusterPassword)
        print("新索引 " + targetName + " 创建结果:" + response)
    ## main
    indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
    # Indices whose name starts with "." are collected and only reported at
    # the end; every other index is re-created under the same name.
    systemIndex = []
    for index in indexList:
        if not index.startswith("."):
            createIndex(index, index)
            continue
        systemIndex.append(index)
    for index in systemIndex:
        print(index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")
    以上脚本创建索引时将副本数设置为0,目的是加快后续数据同步的速度;数据同步完成后需要重新设置副本数,例如:
    curl -H "Content-Type: application/json" -XPUT 'http://192.168.1.118:9200/db_customer/_settings' -d '{
       "number_of_replicas" : 1
    }'
     
  • 相关阅读:
    【Spring源码深度解析系列 】Spring整体架构
    【Spring Boot && Spring Cloud系列】在spring-data-Redis中如何使用切换库
    自定义异常
    Java IO
    多线程和虚拟机的宏观理解
    mybatis添加记录时返回主键id
    从JVM角度看i++ 与++i
    JVM指令集介绍
    jvm 指令集代码
    javap -c命令详解
  • 原文地址:https://www.cnblogs.com/hxlasky/p/10138659.html
Copyright © 2011-2022 走看看