zoukankan      html  css  js  c++  java
  • Spring Boot 程序定时删除 Elasticsearch 6 过期索引

    package com.shixun.syslogsearch.service.impl;
    
    import com.alibaba.fastjson.JSONObject;
    import com.shixun.syslogsearch.dao.SyslogRepository;
    import com.shixun.syslogsearch.entity.ESLog;
    import com.shixun.syslogsearch.entity.PageResult;
    import com.shixun.syslogsearch.entity.SysLogLoadFile;
    import com.shixun.syslogsearch.entity.SyslogQueueType;
    import com.shixun.syslogsearch.param.SysLogQueryParam;
    import com.shixun.syslogsearch.service.SyslogService;
    import com.shixun.syslogsearch.utils.*;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.lucene.document.StringField;
    import org.elasticsearch.action.ActionFuture;
    import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
    import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.indices.GetIndexRequest;
    import org.elasticsearch.client.indices.GetIndexResponse;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.sort.SortBuilders;
    import org.elasticsearch.search.sort.SortOrder;
    import org.elasticsearch.snapshots.SnapshotShardsService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.data.domain.Page;
    import org.springframework.data.domain.PageRequest;
    import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
    import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
    import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
    import org.springframework.data.elasticsearch.core.query.*;
    import org.springframework.stereotype.Service;
    
    import java.io.*;
    import java.nio.charset.Charset;
    import java.text.SimpleDateFormat;
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    import java.util.regex.Pattern;
    
    /**
     * Elasticsearch-backed implementation of {@link SyslogService}.
     *
     * <p>Creates time-stamped syslog indices under a configurable prefix, writes log
     * documents into them, and removes indices whose creation date is older than the
     * configured retention period.
     */
    @Service
    public class SyslogServiceImpl implements SyslogService {
        private static final Logger logger = LoggerFactory.getLogger(SyslogServiceImpl.class);

        /** Alias attached to every index created by {@link #createIndex(String)}. */
        private static final String ALIAS_NAME = "syslog";

        /** Timestamp pattern appended to the index prefix when generating index names. */
        private static final String INDEX_TIMESTAMP_PATTERN = "yyyyMMddHHmmss";

        @Value("${collection.zipFilePath}")
        private String zipFilePath;
        @Value("${collection.unzipShellPath}")
        private String unzipShellPath;
        @Value("${collection.unzipTempPath}")
        private String unzipTempPath;
        /** Common name prefix of all indices managed by this service. */
        @Value("${elasticsearch.indexPrefix}")
        private String indexPrefix;
        /** Retention period in days, kept as the raw property string and parsed on use. */
        @Value("${elasticsearch.expireDays}")
        private String expireDays;

        @Autowired
        private RedisClient redisClient;
        @Autowired
        private ElasticsearchOperations elasticsearchTemplate;
        @Autowired
        private RestHighLevelClient elasticsearchClient;
        @Autowired
        private SyslogRepository syslogRepository;

        /**
         * Indexes a single log document into the given index.
         *
         * @param indexName target index name
         * @param esLog     document to store
         */
        @Override
        public void addSyslog(String indexName, ESLog esLog) {
            IndexQuery indexQuery = new IndexQueryBuilder()
                    .withIndexName(indexName)
                    .withObject(esLog)
                    .build();
            elasticsearchTemplate.index(indexQuery);
        }

        /**
         * Creates a freshly named index (if absent) and writes one sample document
         * with the current time and loopback addresses into it.
         */
        @Override
        public void addSyslogDemo() {
            String indexName = generateIndexName();
            createIndex(indexName);

            ESLog esLog = new ESLog();
            esLog.setTime(System.currentTimeMillis());
            esLog.setSource_ip("127.0.0.1");
            esLog.setHost_ip("127.0.0.1");
            // Reuse the regular write path instead of duplicating the IndexQuery construction.
            addSyslog(indexName, esLog);
        }

        /**
         * Updates {@code index.refresh_interval} of an index.
         *
         * <p>An I/O failure is logged and otherwise ignored, so the call stays best-effort.
         *
         * @param indexName index whose setting is changed
         * @param second    refresh interval in seconds; a negative value is sent as {@code "-1"}
         */
        @Override
        public void updateRefreshInterval(String indexName, int second) {
            String value = second < 0 ? "-1" : second + "s";
            UpdateSettingsRequest request = new UpdateSettingsRequest(indexName)
                    .settings(Settings.builder().put("index.refresh_interval", value));
            try {
                // Pass RequestOptions explicitly, matching the other client call in this class.
                elasticsearchClient.indices().putSettings(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                logger.error("更新索引 {} 的 refresh_interval 失败", indexName, e);
            }
        }

        /**
         * Builds an index name of the form {@code <indexPrefix><yyyyMMddHHmmss>}
         * from the current time.
         *
         * @return generated index name
         */
        @Override
        public String generateIndexName() {
            return indexPrefix + currentTimestamp();
        }

        /**
         * Builds an index name of the form {@code <indexPrefix><yyyyMMddHHmmss>-<fileName>},
         * with every {@code ".zip"} occurrence removed from the file name.
         *
         * @param fileName source file name
         * @return generated index name
         */
        @Override
        public String generateIndexName(String fileName) {
            return indexPrefix + currentTimestamp() + "-" + fileName.replace(".zip", "");
        }

        /**
         * Creates the index, applies the {@link ESLog} mapping under type {@code _doc}
         * and attaches the shared alias. Does nothing if the index already exists.
         *
         * @param indexName name of the index to create
         */
        @Override
        public void createIndex(String indexName) {
            if (elasticsearchTemplate.indexExists(indexName)) {
                return;
            }
            elasticsearchTemplate.createIndex(indexName);
            elasticsearchTemplate.putMapping(indexName, "_doc", ESLog.class);
            // Attach the shared alias so all generated indices can be queried under one name.
            AliasQuery aliasQuery = new AliasBuilder()
                    .withIndexName(indexName)
                    .withAliasName(ALIAS_NAME)
                    .build();
            elasticsearchTemplate.addAlias(aliasQuery);
        }

        /**
         * Deletes every index with the configured prefix whose creation date is older
         * than the configured number of days.
         *
         * @throws NumberFormatException if the configured retention period is not a valid long
         */
        @Override
        public void clearExpireIndex() {
            logger.debug("索引起始名称为{},保留周期为{}天", indexPrefix, expireDays);
            String[] allIndexes = getAllIndices();
            if (allIndexes == null || allIndexes.length == 0) {
                return;
            }
            logger.debug("当前索引总数为:{}", allIndexes.length);

            List<String> timeoutList =
                    getIndicesTimeout(allIndexes, indexPrefix, Long.parseLong(expireDays), TimeUnit.DAYS);
            if (timeoutList.isEmpty()) {
                return;
            }
            logger.debug("对于前缀为 {} 的索引 过期数目为:{}", indexPrefix, timeoutList.size());

            for (String indexName : timeoutList) {
                if (elasticsearchTemplate.deleteIndex(indexName)) {
                    logger.debug("成功删除 {} 索引", indexName);
                } else {
                    // A failed deletion leaves an expired index behind, so surface it above debug level.
                    logger.warn("删除 {} 索引 失败", indexName);
                }
            }
        }

        /**
         * Reads the {@code index.creation_date} setting of an index.
         *
         * @param indexName index name
         * @return the setting value converted with {@link String#valueOf(Object)}
         *         (the literal {@code "null"} when the setting is absent)
         */
        @Override
        public String getCreateTimeForIndex(String indexName) {
            return String.valueOf(elasticsearchTemplate.getSetting(indexName).get("index.creation_date"));
        }

        /**
         * Selects the indices that start with the given prefix and were created longer
         * ago than the given duration.
         *
         * <p>An index whose creation date cannot be parsed is logged and skipped rather
         * than aborting the scan.
         *
         * @param allIndices  candidate index names; {@code null} is treated as empty
         * @param indexPrefix prefix an index must start with to be considered
         * @param time        maximum allowed age
         * @param timeUnit    unit of {@code time}
         * @return names of the expired indices, possibly empty, never {@code null}
         */
        @Override
        public List<String> getIndicesTimeout(String[] allIndices, String indexPrefix, Long time, TimeUnit timeUnit) {
            List<String> expired = new ArrayList<>();
            if (allIndices == null) {
                return expired;
            }
            // Loop-invariant values, computed once.
            long maxAgeMillis = timeUnit.toMillis(time);
            long now = System.currentTimeMillis();

            for (String indexName : allIndices) {
                if (!indexName.startsWith(indexPrefix)) {
                    continue;
                }
                String createTime = getCreateTimeForIndex(indexName);
                long createdAt;
                try {
                    createdAt = Long.parseLong(createTime);
                } catch (NumberFormatException e) {
                    logger.warn("索引 {} 的创建时间无法解析:{}", indexName, createTime);
                    continue;
                }
                if (now - createdAt > maxAgeMillis) {
                    logger.debug("索引 {} 已经过期,创建时间为{}", indexName, createTime);
                    expired.add(indexName);
                }
            }
            return expired;
        }

        /**
         * Lists all indices whose name matches {@code <indexPrefix>*}.
         *
         * <p>Not annotated with {@code @Autowired}: that annotation marks injection
         * points for Spring and does not belong on a query method.
         * NOTE(review): if {@link SyslogService} declares this method, add {@code @Override}.
         *
         * @return matching index names; an empty array if the request fails with an I/O error
         */
        public String[] getAllIndices() {
            GetIndexRequest request = new GetIndexRequest(indexPrefix + "*");
            try {
                return elasticsearchClient.indices().get(request, RequestOptions.DEFAULT).getIndices();
            } catch (IOException e) {
                logger.error("获取前缀为 {} 的索引列表失败", indexPrefix, e);
                return new String[0];
            }
        }

        /**
         * Formats the current time with {@link #INDEX_TIMESTAMP_PATTERN}. A new formatter
         * is created per call because {@link SimpleDateFormat} is not safe to share
         * between threads.
         */
        private static String currentTimestamp() {
            return new SimpleDateFormat(INDEX_TIMESTAMP_PATTERN).format(new Date());
        }
    }

    参考文档:https://blog.csdn.net/u013084266/article/details/103341569

     

  • 相关阅读:
    3.数据库分类
    2.Oracle数据库安装教程
    1.Oracle数据库简介
    MySQL最全存储引擎、索引使用及SQL优化的实践
    大数据各个工具随笔
    初入Shell
    第4章 DDL数据定义
    第3章 Hive数据类型
    第2章 Hive安装
    MySQL 创建数据库
  • 原文地址:https://www.cnblogs.com/yinliang/p/13892575.html
Copyright © 2011-2022 走看看