前言
elasticsearch官方并没有提供合适的备份工具,然而生产场景中备份却是的确需要的。
本文介绍了使用自己写的php脚本以及第三方工具来进行索引的备份,恢复以及删除等操作。
全量备份
elasticdump --input http://127.0.0.1:9200/logstash-postback --output logstash-postback_2017.01.17.json --limit 1000
elasticdump安装方法在下面
恢复
命令例子
elasticdump --input logstash-postback_2017.01.14.json --output http://127.0.0.1:9200/logstash-postback --limit 1000
详解
elasticdump命令需要单独安装。
--input 是指定输入,可指定文件,或需要备份的es集群。
--output是指定输出,可指定文件,或需要备份的es集群。
logstash-postback_2017.01.14.json 是指定的已备份的文件。
http://127.0.0.1:9200/logstash-postback 是指定的要恢复数据的elastictic集群,IP加端口加索引名。注意,索引名是不带时间的。
--limit 1000 一次导入一千条数据,加快进度。
elasticdump命令安装
yum install npm
npm install elasticdump -g
命令安装完毕,可以测试。
可能会报出nodejs的版本之类的错误,你需要升级一下版本。
npm install -g n
n stable
至此可以使用。
删除索引
php delete.php --index http://127.0.0.1:9200/<index> --start 2017-01-01 --end 2017-01-02
--start 选择删除这个索引中指定日期的内容
--end 不传则默认删除一天,也就是start的那天
<index> 要删除的索引名
cat delete.php
<?php date_default_timezone_set('Asia/Shanghai'); $longopts = array( 'index:', 'query:', 'start:', 'end:', ); $options = getopt('a', $longopts); if(!isset($options['index'])) { fwrite(STDERR, "index必须设置索引地址 "); exit(1); } $components = parse_url($options['index']); if(!isset($components['path'])) { fwrite(STDERR, 'index不可为空'); exit(1); } $host = "{$components['scheme']}://{$components['host']}:{$components['port']}"; $index = basename($components['path']); $query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}'; if(isset($options['start'])) { $start_time = strtotime($options['start']); $start = date('Y-m-d', $start_time).'T00:00:00+0800'; if(isset($options['end'])) { $end_time = strtotime($options['end']); $end = date('Y-m-d', $end_time).'T00:00:00+0800'; } else { $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800'; } $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date'; $query = '{"size":1000,"_source":false,"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}'; } $scroll_id = null; $retry = 0; $num = 0; while(true) { if(is_null($scroll_id)) { $result = post("{$host}/{$index}/_search?scroll=2m", $query); } else { $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}"); } $json = json_decode($result, true); if(!isset($json['_scroll_id'])) { fwrite(STDERR, "查询失败:索引-{$index} 起始-{$start} 截止-{$end} "); sleep(5); $retry++; if($retry>10) { exit(4); } } $scroll_id = $json['_scroll_id']; $bulk = []; foreach($json['hits']['hits'] as $row) { unset($row['_score']); $bulk[] = json_encode(['delete'=>$row]); $num++; } if(count($json['hits']['hits'])==0) { break; } $check = post("{$host}/_bulk", implode(" ", $bulk)." "); fwrite(STDOUT, "{$num} "); usleep(100000); } echo "deleted:{$num} "; function get($url) { $handle = curl_init(); //curl_setopt($handle, CURLOPT_POST, 1); curl_setopt($handle, CURLOPT_HEADER, 0); curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1); curl_setopt($handle, CURLOPT_URL, $url); //curl_setopt($handle, CURLOPT_POSTFIELDS, $data); return curl_exec($handle); } function post($url, $data) { $handle = curl_init(); curl_setopt($handle, CURLOPT_POST, 1); curl_setopt($handle, CURLOPT_HEADER, 0); curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1); curl_setopt($handle, CURLOPT_URL, $url); curl_setopt($handle, CURLOPT_POSTFIELDS, $data); return curl_exec($handle); }
索引增量备份
elasticdump增量备份有BUG,无规律的丢数据。
于是我们自己写了脚本来进行增量备份。
备份脚本分为两个部分,PHP脚本沟通elasticsearch来进行数据读取与备份。SHELL脚本来写作传参与PHP脚本
SHELL脚本如下
#!/bin/bash #如果有参数,则第一个参数为备份日期,否则默认备份昨天数据 if [ -z "$1" ];then start=$(date +%Y-%m-%d --date '-1 day 00:00:00') end=$(date +%Y-%m-%d --date 'today 00:00:00') else start=$(date +%Y-%m-%d --date $1) end=$(date +%Y-%m-%d --date "$1 +1 day 00:00:00") fi #如果是每月1号则加一天,解决时区导致的数据跨索引问题 if [ $(date +%d --date $end) -eq "01" ] then end=$(date +%Y-%m-%d --date "$end +1 day 00:00:00") fi php esdump.php --input=http://127.0.0.1:9200/logstash-event-$(date +%Y.%m --date $start) --start $start --end $end 2>> backup_error_$start.log | gzip > event/logstash-event-$(date +%Y.%m_%d --date $start).json.gz 2>> backup_error_$start.log &
注:代码第18行的logstash-event替换为你要备份的索引名
PHP脚本如下(无需修改)
<?php date_default_timezone_set('Asia/Shanghai'); $longopts = array( 'input:', 'output:', 'query:', 'start:', 'end:', ); $options = getopt('a', $longopts); if(!isset($options['input'])) { fwrite(STDERR, "input必须设置索引地址 "); exit(1); } $check = get($options['input'].'/_count'); if($check===false) { fwrite(STDERR, "input索引地址无效:{$options['input']} "); exit(2); } $check = json_decode($check, true); if(!isset($check['count'])) { fwrite(STDERR, "input索引地址无效:{$options['input']} "); exit(3); } $components = parse_url($options['input']); $host = "{$components['scheme']}://{$components['host']}:{$components['port']}"; $index = basename($components['path']); $query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}'; if(isset($options['start'])) { $start_time = strtotime($options['start']); $start = date('Y-m-d', $start_time).'T00:00:00+0800'; if(isset($options['end'])) { $end_time = strtotime($options['end']); $end = date('Y-m-d', $end_time).'T00:00:00+0800'; } else { $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800'; } $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date'; $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}'; if(strpos($index, 'eventlogs')!==false) { $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"bool":{'. '"must":[{"range":{"date":{"gte":"'.$start.'","lte":"'.$end.'"}}}],'. '"must_not":[{"exists": {"field":"nsp3hq"}},{"exists": {"field":"q0i8u1"}},{"exists": {"field":"eyn916"}},{"exists": {"field":"20mqd8"}},'. '{"exists": {"field":"wwbkux"}},{"exists": {"field":"r5ua96"}},{"exists": {"field":"easiz"}},{"exists": {"field":"dexusu"}},{"exists": {"field":"earts"}},'. '{"exists": {"field":"ealu"}},{"exists": {"field":"ealf"}},{"exists": {"field":"eal"}},{"exists": {"field":"ears"}},{"exists": {"field":"ealuf"}},'. '{"exists": {"field":"ealus"}},{"exists": {"field":"eaatf"}},{"exists": {"field":"enail"}},{"exists": {"field":"enuail"}},{"exists": {"field":"test"}}]'. '}}}}}'; } } $scroll_id = null; $retry = 0; $num = 0; while(true) { if(is_null($scroll_id)) { $result = post("{$host}/{$index}/_search?scroll=2m", $query); } else { $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}"); } $json = json_decode($result, true); if(!isset($json['_scroll_id'])) { fwrite(STDERR, "查询失败:索引-{$index} 起始-{$start} 截止-{$end} "); sleep(5); $retry++; if($retry>10) { exit(4); } } $scroll_id = $json['_scroll_id']; foreach($json['hits']['hits'] as $row) { fwrite(STDOUT, json_encode($row)." "); $num++; } if(count($json['hits']['hits'])==0) { break; } usleep(100000); } //校验条数是否一致 $query = json_decode($query, true); unset($query['size'], $query['sort']); $result = post("{$host}/{$index}/_count", json_encode($query)); $json = json_decode($result, true); if(!isset($json['count']) or intval($json['count'])!==$num) { fwrite(STDERR, "校验失败:索引-{$index} 起始-{$start} 截止-{$end} 记录条数-{$json['count']} 导出条数-{$num} "); } function get($url) { $handle = curl_init(); //curl_setopt($handle, CURLOPT_POST, 1); curl_setopt($handle, CURLOPT_HEADER, 0); curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1); curl_setopt($handle, CURLOPT_URL, $url); //curl_setopt($handle, CURLOPT_POSTFIELDS, $data); return curl_exec($handle); } function post($url, $data) { $handle = curl_init(); curl_setopt($handle, CURLOPT_POST, 1); curl_setopt($handle, CURLOPT_HEADER, 0); curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1); curl_setopt($handle, CURLOPT_URL, $url); curl_setopt($handle, CURLOPT_POSTFIELDS, $data); return curl_exec($handle); }
备份时执行shell脚本即可备份昨天的增量数据
谢土豪
如果有帮到你的话,请赞赏我吧!
本文为kerwin原创,转载请注明出处。
http://www.cnblogs.com/kerwinC/p/6296675.html