  • Elasticsearch backup and restore

    Preface

    Elasticsearch does not ship with a suitable official backup tool, yet backups are a genuine requirement in production.

    This article covers backing up, restoring, and deleting index data using a home-grown PHP script plus a third-party tool.

    Full backup

    elasticdump --input  http://127.0.0.1:9200/logstash-postback --output  logstash-postback_2017.01.17.json --limit 1000
     
    How to install elasticdump is described below.

    Restore

    Example command
    elasticdump --input logstash-postback_2017.01.14.json --output http://127.0.0.1:9200/logstash-postback --limit 1000
    Details
    The elasticdump command must be installed separately.
    --input specifies the input: either a file or the Elasticsearch cluster to back up.
    --output specifies the output: either a file or the Elasticsearch cluster to write into.
    logstash-postback_2017.01.14.json is the previously created backup file.
    http://127.0.0.1:9200/logstash-postback is the Elasticsearch cluster to restore into: IP, port, and index name. Note that the index name carries no date.
    --limit 1000 imports 1000 documents per batch, which speeds things up.
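The date lives only in the backup file name, while the target index name carries none, so a restore command can be assembled from two variables. A minimal sketch (the file and index names are just the examples from above; `echo` is used so nothing is actually restored):

```shell
# Assemble the restore command from its two moving parts:
# the dated backup file and the undated target index.
day="2017.01.14"
index="logstash-postback"
cmd="elasticdump --input ${index}_${day}.json --output http://127.0.0.1:9200/${index} --limit 1000"
echo "$cmd"
```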
     
    Installing elasticdump
    yum install npm
    npm install elasticdump -g
    Once installed, give it a try.
    It may complain about your Node.js version; if so, upgrade it:
    npm install -g n
    n stable
    After that it is ready to use.
     

    Deleting index data

     php delete.php --index http://127.0.0.1:9200/<index> --start 2017-01-01 --end 2017-01-02  
    --start deletes documents of the given date from the index
    --end if omitted, exactly one day is deleted, i.e. the day given by --start
    <index> the name of the index to delete from
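When --end is omitted, delete.php targets exactly one day: it anchors --start at local midnight (+0800) and adds a day. The range filter it builds can be reproduced in isolation (field name `date`, as the script uses for non-analysis indices):

```shell
# Reproduce the one-day range filter delete.php builds when --end is omitted
start_day="2017-01-01"
start="${start_day}T00:00:00+0800"
end="$(date +%Y-%m-%d --date "$start_day +1 day")T00:00:00+0800"
printf '{"query":{"filtered":{"filter":{"range":{"date":{"gte":"%s","lt":"%s"}}}}}}\n' "$start" "$end"
```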
     
    cat delete.php
     
    <?php
    date_default_timezone_set('Asia/Shanghai');
    
    $longopts  = array(
        'index:',
        'query:',
        'start:',
        'end:',
    );
    $options = getopt('a', $longopts);
    
    if(!isset($options['index'])) {
            fwrite(STDERR, "index: the index URL must be set\n");
            exit(1);
    }
    
    $components = parse_url($options['index']);
    if(!isset($components['path'])) {
            fwrite(STDERR, "index: the index path must not be empty\n");
            exit(1);
    }
    
    $host  = "{$components['scheme']}://{$components['host']}:{$components['port']}";
    $index = basename($components['path']);
    
    $query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}';
    if(isset($options['start'])) {
            $start_time = strtotime($options['start']);
            $start = date('Y-m-d', $start_time).'T00:00:00+0800';
            if(isset($options['end'])) {
                    $end_time = strtotime($options['end']);
                    $end = date('Y-m-d', $end_time).'T00:00:00+0800';
            } else {
                    $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800';
            }
    
            $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date';
            $query = '{"size":1000,"_source":false,"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}';
    }
    
    $scroll_id = null;
    $retry     = 0;
    $num       = 0;
    while(true) {
            if(is_null($scroll_id)) {
                    $result = post("{$host}/{$index}/_search?scroll=2m", $query);
            } else {
                    $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}");
            }
    
            $json = json_decode($result, true);
            if(!isset($json['_scroll_id'])) {
                fwrite(STDERR, "query failed: index-{$index} start-{$start} end-{$end}\n");
                sleep(5);
                $retry++;
                if($retry>10) {
                        exit(4);
                }
                continue; // retry instead of reading the missing _scroll_id
        }
    
            $scroll_id = $json['_scroll_id'];
            $bulk = [];
            foreach($json['hits']['hits'] as $row) {
                    unset($row['_score']);
                    $bulk[] = json_encode(['delete'=>$row]);
                    $num++;
            }
    
            if(count($json['hits']['hits'])==0)
            {
                    break;
            }
    
        $check = post("{$host}/_bulk", implode("\n", $bulk)."\n");
        fwrite(STDOUT, "{$num}\n");
            usleep(100000);
    }
    
    echo "deleted:{$num}\n";
    
    function get($url)
    {
            $handle = curl_init();
            //curl_setopt($handle, CURLOPT_POST, 1);
            curl_setopt($handle, CURLOPT_HEADER, 0);
            curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
            curl_setopt($handle, CURLOPT_URL, $url);
            //curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
            return curl_exec($handle);
    }
    
    function post($url, $data)
    {
            $handle = curl_init();
            curl_setopt($handle, CURLOPT_POST, 1);
            curl_setopt($handle, CURLOPT_HEADER, 0);
            curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
            curl_setopt($handle, CURLOPT_URL, $url);
            curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
            return curl_exec($handle);
    }
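The script deletes through the _bulk API: one `delete` action line per document, newline-delimited, with a trailing newline, exactly as `implode` produces above. The payload shape looks like this (the `_id` values here are made up for illustration):

```shell
# Shape of the _bulk delete payload: newline-delimited action lines
# with a trailing newline (the _id values are hypothetical).
printf '%s\n' \
  '{"delete":{"_index":"logstash-postback","_type":"logs","_id":"AVmn1"}}' \
  '{"delete":{"_index":"logstash-postback","_type":"logs","_id":"AVmn2"}}' > bulk_body.ndjson
wc -l < bulk_body.ndjson
```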
     
     

    Incremental index backup

    elasticdump's incremental backup has a bug: it drops data at random.
    So we wrote our own scripts for incremental backups.
    The backup consists of two parts: a PHP script that talks to Elasticsearch to read and dump the data, and a shell script that computes the parameters and invokes the PHP script.
     
    The shell script:
     
    #!/bin/bash
    
    #If an argument is given, it is the date to back up; otherwise back up yesterday's data
    if [ -z "$1" ];then
            start=$(date +%Y-%m-%d --date '-1 day 00:00:00')
            end=$(date +%Y-%m-%d --date 'today 00:00:00')
    else
            start=$(date +%Y-%m-%d --date $1)
            end=$(date +%Y-%m-%d --date "$1 +1 day 00:00:00")
    fi
    
    #On the 1st of each month add one day, to handle data that crosses indices because of the timezone offset
    if [ $(date +%d --date $end) -eq "01" ]
    then
            end=$(date +%Y-%m-%d --date "$end +1 day 00:00:00")
    fi
    
    php esdump.php --input=http://127.0.0.1:9200/logstash-event-$(date +%Y.%m --date $start) --start $start --end $end  2>> backup_error_$start.log | gzip > event/logstash-event-$(date +%Y.%m_%d --date $start).json.gz  2>> backup_error_$start.log &
     
    Note: replace logstash-event in the php command above with the name of the index you want to back up.
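The month-boundary branch of the shell script can be checked in isolation. For a backup date of 2017-01-31 the raw end date lands on the 1st, so the script pushes it one more day:

```shell
# Month-boundary check from the backup script, run for 2017-01-31
start=$(date +%Y-%m-%d --date "2017-01-31")
end=$(date +%Y-%m-%d --date "2017-01-31 +1 day")
# end falls on the 1st of the next month, so push it one more day
if [ "$(date +%d --date "$end")" = "01" ]; then
    end=$(date +%Y-%m-%d --date "$end +1 day")
fi
echo "$start $end"
```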
     
    The PHP script (no modification needed):
    <?php
    date_default_timezone_set('Asia/Shanghai');
    
    $longopts  = array(
        'input:',
        'output:',
        'query:',
        'start:',
        'end:',
    );
    $options = getopt('a', $longopts);
    
    if(!isset($options['input'])) {
        fwrite(STDERR, "input: the index URL must be set\n");
        exit(1);
    }
    
    $check = get($options['input'].'/_count');
    if($check===false) {
        fwrite(STDERR, "invalid input index URL: {$options['input']}\n");
        exit(2);
    }
    
    $check = json_decode($check, true);
    if(!isset($check['count'])) {
        fwrite(STDERR, "invalid input index URL: {$options['input']}\n");
        exit(3);
    }
    
    $components = parse_url($options['input']);
    $host  = "{$components['scheme']}://{$components['host']}:{$components['port']}";
    $index = basename($components['path']);
    
    $query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}';
    if(isset($options['start'])) {
        $start_time = strtotime($options['start']);
        $start = date('Y-m-d', $start_time).'T00:00:00+0800';
        if(isset($options['end'])) {
            $end_time = strtotime($options['end']);
            $end = date('Y-m-d', $end_time).'T00:00:00+0800';
        } else {
            $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800';
        }
    
        $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date';
        $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}';
    
        if(strpos($index, 'eventlogs')!==false) {
            $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"bool":{'.
                '"must":[{"range":{"date":{"gte":"'.$start.'","lte":"'.$end.'"}}}],'.
                            '"must_not":[{"exists": {"field":"nsp3hq"}},{"exists": {"field":"q0i8u1"}},{"exists": {"field":"eyn916"}},{"exists": {"field":"20mqd8"}},'.
                            '{"exists": {"field":"wwbkux"}},{"exists": {"field":"r5ua96"}},{"exists": {"field":"easiz"}},{"exists": {"field":"dexusu"}},{"exists": {"field":"earts"}},'.
                            '{"exists": {"field":"ealu"}},{"exists": {"field":"ealf"}},{"exists": {"field":"eal"}},{"exists": {"field":"ears"}},{"exists": {"field":"ealuf"}},'.
                            '{"exists": {"field":"ealus"}},{"exists": {"field":"eaatf"}},{"exists": {"field":"enail"}},{"exists": {"field":"enuail"}},{"exists": {"field":"test"}}]'.
                            '}}}}}';
             }
    }
    
    $scroll_id = null;
    $retry     = 0;
    $num       = 0;
    while(true) {
        if(is_null($scroll_id)) {
            $result = post("{$host}/{$index}/_search?scroll=2m", $query);
        } else {
            $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}");
        }
    
        $json = json_decode($result, true);
        if(!isset($json['_scroll_id'])) {
            fwrite(STDERR, "query failed: index-{$index} start-{$start} end-{$end}\n");
            sleep(5);
            $retry++;
            if($retry>10) {
                exit(4);
            }
            continue; // retry instead of reading the missing _scroll_id
        }
    
        $scroll_id = $json['_scroll_id'];
    
        foreach($json['hits']['hits'] as $row) {
            fwrite(STDOUT, json_encode($row)."\n");
            $num++;
        }
    
        if(count($json['hits']['hits'])==0)
        {
            break;
        }
    
        usleep(100000);
    }
    
    //verify the exported count matches the index count
    $query = json_decode($query, true);
    unset($query['size'], $query['sort']);
    $result = post("{$host}/{$index}/_count", json_encode($query));
    $json = json_decode($result, true);
    if(!isset($json['count']) or intval($json['count'])!==$num) {
        fwrite(STDERR, "verification failed: index-{$index} start-{$start} end-{$end} count-{$json['count']} exported-{$num}\n");
    }
    
    function get($url)
    {
        $handle = curl_init();
        //curl_setopt($handle, CURLOPT_POST, 1);
        curl_setopt($handle, CURLOPT_HEADER, 0);
        curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handle, CURLOPT_URL, $url);
        //curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
        return curl_exec($handle);
    }
    
    function post($url, $data)
    {
        $handle = curl_init();
        curl_setopt($handle, CURLOPT_POST, 1);
        curl_setopt($handle, CURLOPT_HEADER, 0);
        curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handle, CURLOPT_URL, $url);
        curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
        return curl_exec($handle);
    }

    To back up yesterday's incremental data, simply run the shell script.
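The resulting dump is newline-delimited JSON piped through gzip, so a quick integrity check is to decompress the file and count lines against the total the PHP script reported. A sketch with two fabricated records:

```shell
# The backup format: one JSON document per line, gzip-compressed.
# Verify a file by decompressing and counting lines (sample data only).
printf '%s\n' '{"_id":"1","_source":{"a":1}}' '{"_id":"2","_source":{"a":2}}' \
  | gzip > sample.json.gz
gunzip -c sample.json.gz | wc -l
```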

    Thanks, generous readers!

    If this article helped you, consider leaving a tip!

    This article was written by kerwin; please credit the source when reposting.

     http://www.cnblogs.com/kerwinC/p/6296675.html

     
