zoukankan      html  css  js  c++  java
  • elasticsearch的操作类

    import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.index.IndexNotFoundException;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.query.RangeQueryBuilder;
    import org.elasticsearch.search.SearchHit;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Calendar;
    import java.util.Collections;
    import java.util.Date;
    import java.util.Locale;
    
    /**
     * Created by guanxiangqing on 2016/9/24.
     */
    public class ESStorage {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ESStorage.class);
    
        /**
         * Configuration key under which the Elasticsearch cluster name is stored.
         */
        private static final String ES_CLUSTER_NAME = "es.cluster.name";
    
        /**
         * Configuration key under which the Elasticsearch cluster address list is stored.
         */
        private static final String ES_CLUSTER_ADDR = "es.cluster.addr";
    
        /**
         * Shared transport client, created on first use by {@code buildClientByTransport()}.
         * Declared {@code volatile} because it is read outside the lock.
         */
        private static volatile TransportClient client;
    
        /**
         * Monitor guarding the creation of {@link #client}; final so the monitor can never be swapped.
         */
        private static final Object lock = new Object();
    
        /**
         * Returns the shared Elasticsearch transport client, creating it on first use.
         *
         * <p>The cluster name and the node address list are read through
         * {@code INSTANCE.getStringValue(...)} using the keys held in {@code ES_CLUSTER_NAME}
         * and {@code ES_CLUSTER_ADDR}. The address list is a comma-separated list of
         * {@code host:port} entries, for example
         * {@code cloudwave3:9300,cloudwave2:9300,cloudwave1:9300}. Every entry in the list is
         * registered with the client (blank entries are skipped), and
         * {@code client.transport.sniff} is enabled so further nodes are discovered automatically.
         *
         * <p>Creation happens entirely under the class lock and the null check is made inside
         * it, so at most one client is ever built. If an address cannot be resolved or parsed,
         * the partially configured client is closed and the shared field stays unset.
         *
         * @return the shared client
         * @throws UnknownHostException if a configured host name cannot be resolved
         * @throws IllegalStateException if no address is configured
         * @throws IllegalArgumentException if an entry is not of the form {@code host:port}
         *         (a non-numeric port surfaces as {@link NumberFormatException})
         */
        public static Client buildClientByTransport() throws UnknownHostException {
            synchronized (lock) {
                if (client != null) {
                    return client;
                }

                String esClusterName = INSTANCE.getStringValue(ES_CLUSTER_NAME);
                String esClusterAddr = INSTANCE.getStringValue(ES_CLUSTER_ADDR);
                if (esClusterAddr == null) {
                    throw new IllegalStateException("Missing configuration value for " + ES_CLUSTER_ADDR);
                }

                Settings settings = Settings.settingsBuilder()
                        .put("cluster.name", esClusterName)
                        .put("client.transport.sniff", true)   // discover the remaining nodes automatically
                        .build();

                TransportClient candidate = TransportClient.builder().settings(settings).build();
                boolean configured = false;
                try {
                    int registered = 0;
                    for (String rawEntry : esClusterAddr.split(",")) {
                        String entry = rawEntry.trim();
                        if (entry.isEmpty()) {
                            continue;
                        }
                        String[] hostAndPort = entry.split(":");
                        if (hostAndPort.length < 2) {
                            throw new IllegalArgumentException(
                                    "Expected host:port in " + ES_CLUSTER_ADDR + " but found '" + entry + "'");
                        }
                        String esHostName = hostAndPort[0].trim();
                        int esPort = Integer.parseInt(hostAndPort[1].trim());
                        candidate.addTransportAddress(
                                new InetSocketTransportAddress(InetAddress.getByName(esHostName), esPort));
                        registered++;
                    }
                    if (registered == 0) {
                        throw new IllegalStateException("No address configured under " + ES_CLUSTER_ADDR);
                    }
                    configured = true;
                } finally {
                    if (!configured) {
                        // Do not leak the client's resources when configuration fails half-way.
                        candidate.close();
                    }
                }

                client = candidate;
                return client;
            }
        }
    
        /**
         * Deletes a single document, addressed by index, type and id, and logs the version
         * reported by the delete response.
         *
         * <p>The client is obtained from this class's own {@code buildClientByTransport()} and
         * its return value is used directly, so the call never dereferences an unset client field.
         * The log line is written whatever the response says about the document's prior existence.
         *
         * @param index name of the index holding the document
         * @param type  mapping type of the document
         * @param id    id of the document to delete
         * @throws Exception if the client cannot be created or the delete request fails
         */
        public static void deleteEsIndexByIndexAndTypeAndId(String index, String type, String id) throws Exception{
            Client esClient = ESStorage.buildClientByTransport();
            DeleteResponse deleteResponse = esClient.prepareDelete(index, type, id)   // e.g. "logs", "log2015", "150"
                    .execute()
                    .actionGet();
            LOGGER.info("Delete index:" + index + " type: " + type + " id :" + id +
                    " successfully, and the version of the delete operation is " + deleteResponse.getVersion());
        }
    
        /**
         * Deletes several documents of one index/type, one request per id, in list order.
         *
         * <p>Each id is handed to {@code deleteEsIndexByIndexAndTypeAndId}; the first failure
         * propagates and the remaining ids are not processed.
         *
         * @param index    name of the index holding the documents
         * @param type     mapping type of the documents
         * @param queryIds ids of the documents to delete
         * @throws Exception if any single delete fails
         */
        public static void deleteEsIndexByIndexAndTypeAndIdWithBath(String index, String type, ArrayList<String> queryIds) throws Exception{
            for (String documentId : queryIds) {
                ESStorage.deleteEsIndexByIndexAndTypeAndId(index, type, documentId);
            }
        }
    
    
        /**
         * Deletes a whole Elasticsearch index by name and logs the response.
         *
         * <p>The client is obtained from this class's own {@code buildClientByTransport()} and
         * its return value is used directly.
         *
         * @param index name of the index to delete
         * @throws Exception if the client cannot be created or the delete request fails
         */
        public void deleteEsIndex(String index) throws Exception{
            Client esClient = ESStorage.buildClientByTransport();
            DeleteIndexResponse delete = esClient.admin().indices()
                    .delete(new DeleteIndexRequest(index)).actionGet();
            LOGGER.info("The ElasticSearch index of " + index + " is deleted." + delete.toString());
        }
    
        /**
         * Deletes every index of {@code indexList} that currently exists in the cluster.
         *
         * <p>The set of existing indices is fetched once through {@code acquireAllEsIndexList()};
         * names not contained in it are skipped silently. An {@link IndexNotFoundException}
         * raised by an individual delete is logged and does not abort the batch.
         *
         * @param indexList names of the indices to delete
         * @throws Exception if the client cannot be created, the index listing fails, or a
         *                   delete fails for a reason other than the index being absent
         */
        public void deleteEsIndexByBatch(ArrayList<String> indexList) throws Exception{
            Client esClient = ESStorage.buildClientByTransport();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            ArrayList<String> esAllIndex = ESStorage.acquireAllEsIndexList();

            for (String index : indexList) {
                if (!esAllIndex.contains(index)) {
                    continue;
                }
                try {
                    esClient.admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
                    LOGGER.info("The ElasticSearch index of " + index + " is deleted." + sdf.format(new Date()));
                } catch (IndexNotFoundException e) {
                    LOGGER.info("The ElasticSearch index of " + index + " is not found");
                }
            }
        }
    
        /**
         * Builds index names from a table name and a list of consecutive date boundaries.
         *
         * <p>For every adjacent pair {@code weekDayList[i]}, {@code weekDayList[i + 1]} one name
         * of the form {@code <table>-<start>00-<end>00} is produced, where {@code <table>} is
         * the lower-cased table name. An array with fewer than two entries yields an empty list.
         *
         * <p>Lower-casing uses {@link Locale#ROOT} so the generated names do not depend on the
         * JVM's default locale (under a Turkish locale, for instance, {@code "I"} would
         * otherwise become a dotless {@code "ı"}).
         *
         * @param weekDayList date boundaries, in order
         * @param tableName   name of the data table
         * @return the generated index names, one per adjacent pair of boundaries
         */
        public ArrayList<String> acquireEsIndexList(String[] weekDayList, String tableName){
            ArrayList<String> esIndexList = new ArrayList<String>();
            String tablePrefix = tableName.toLowerCase(Locale.ROOT) + "-";

            for (int i = 0; i + 1 < weekDayList.length; i++) {
                esIndexList.add(tablePrefix + weekDayList[i] + "00-" + weekDayList[i + 1] + "00");
            }
            return esIndexList;
        }
    
        /**
         * Lists the names of all indices found in the cluster state metadata.
         *
         * <p>The client is obtained from this class's own {@code buildClientByTransport()} and
         * its return value is used directly. Each name is trimmed before being added.
         *
         * @return the index names reported by the cluster state
         * @throws Exception if the client cannot be created or the cluster-state request fails
         */
        public static ArrayList<String> acquireAllEsIndexList() throws Exception{
            Client esClient = ESStorage.buildClientByTransport();
            String[] indexNames = esClient.admin().cluster().prepareState().execute().actionGet()
                    .getState().getMetaData().concreteAllIndices();

            ArrayList<String> esIndexLists = new ArrayList<String>(indexNames.length);
            for (String indexName : indexNames) {
                esIndexLists.add(indexName.trim());
            }
            return esIndexLists;
        }
    
        /**
         * Searches one index/type for documents whose {@code fieldName} value is less than or
         * equal to {@code millsTime} and returns the ids of the hits in the response.
         *
         * <p>No page size is set on the request, so only the hits contained in a single
         * default-sized response are returned.
         * NOTE(review): presumably that is 10 hits on Elasticsearch 2.x; confirm whether
         * callers expect every matching id.
         *
         * <p>The full response is logged at debug level instead of being printed to stdout.
         *
         * @param index     name of the index to search
         * @param esType    mapping type to search
         * @param fieldName name of the time field to filter on
         * @param millsTime upper bound (inclusive) for the field; by its name, epoch
         *                  milliseconds as a string
         * @return ids of the hits contained in the response; empty if there are none
         * @throws Exception if the client cannot be created or the search fails
         */
        public static ArrayList<String> acquireQueryIDsByTimeAttribute(String index, String esType, String fieldName, String millsTime) throws Exception{
            Client esClient = ESStorage.buildClientByTransport();
            QueryBuilder queryBuilder = buildQuery(fieldName, millsTime);
            SearchResponse response = esClient.prepareSearch(index)
                    .setTypes(esType)
                    .setQuery(queryBuilder)
                    .execute()
                    .actionGet();

            ArrayList<String> esIds = new ArrayList<String>();
            if (response == null) {
                return esIds;
            }
            LOGGER.debug("Search response: {}", response);
            for (SearchHit hit : response.getHits().getHits()) {
                esIds.add(hit.getId());
            }
            return esIds;
        }
    
        /**
         * Deletes the documents of one index/type whose {@code fieldName} value is less than
         * or equal to {@code millsTime}.
         *
         * <p>The total number of matches is determined first. The same query is then executed
         * {@code total / 20000 + 1} times with a page size of 20000, and after each search the
         * returned ids are deleted one by one.
         * NOTE(review): every round searches from the first hit again, so progress relies on
         * the deletions of earlier rounds already being visible to the next search; confirm
         * the refresh behaviour of the target index.
         *
         * @param index     name of the index to clean
         * @param esType    mapping type to clean
         * @param fieldName name of the time field to filter on
         * @param millsTime upper bound (inclusive) for the field
         * @throws Exception if the client cannot be created, or a search or delete fails
         */
        public static void  acquireQueryIDsByTimeAttributeAndDeleteEsData(String index, String esType, String fieldName, String millsTime) throws Exception{
            Client esClient = ESStorage.buildClientByTransport();
            long esHistsNumber = acquireQueryTotalhistsNumbers(index, esType, fieldName, millsTime);
            LOGGER.info("The total Number hits of ElasticSearch is " + esHistsNumber);
            if (esHistsNumber <= 0) {
                return;
            }

            final int pageSize = 20000;
            final long rounds = esHistsNumber / pageSize + 1;
            // The query does not change between rounds, so it is built once.
            QueryBuilder queryBuilder = buildQuery(fieldName, millsTime);

            for (long round = 0; round < rounds; round++) {
                SearchResponse response = esClient.prepareSearch(index)
                        .setTypes(esType)
                        .setQuery(queryBuilder)
                        .setSize(pageSize)
                        .execute()
                        .actionGet();

                ArrayList<String> esIds = new ArrayList<String>();
                if (response != null) {
                    for (SearchHit hit : response.getHits().getHits()) {
                        esIds.add(hit.getId());
                    }
                }
                deleteEsIndexByIndexAndTypeAndIdWithBath(index, esType, esIds);
            }
        }
    
        /**
         * Returns the total number of documents of one index/type whose {@code fieldName}
         * value is less than or equal to {@code millsTime}.
         *
         * <p>The request asks for zero hits because only the total count of the response is
         * read. The client is obtained from this class's own {@code buildClientByTransport()}
         * and its return value is used directly.
         *
         * @param index     name of the index to search
         * @param esType    mapping type to search
         * @param fieldName name of the time field to filter on
         * @param millsTime upper bound (inclusive) for the field
         * @return total hit count reported by the search response
         * @throws Exception if the client cannot be created or the search fails
         */
        public static long acquireQueryTotalhistsNumbers(String index, String esType, String fieldName, String millsTime) throws Exception{
            Client esClient = ESStorage.buildClientByTransport();
            QueryBuilder queryBuilder = buildQuery(fieldName, millsTime);
            SearchResponse response = esClient.prepareSearch(index)
                    .setTypes(esType)
                    .setQuery(queryBuilder)
                    .setSize(0)              // only the total is needed, not the documents
                    .execute()
                    .actionGet();
            return response.getHits().getTotalHits();
        }
    
        /**
         * Creates the range filter shared by the search helpers of this class: it matches
         * documents whose {@code fieldName} value is less than or equal to {@code millsTime}.
         *
         * @param fieldName name of the field to constrain
         * @param millsTime inclusive upper bound for the field
         * @return a range query with only the upper bound ({@code lte}) set
         */
        public static QueryBuilder buildQuery(String fieldName, String millsTime) {
            // Other available bounds on the builder: gt, lt, gte.
            return QueryBuilders.rangeQuery(fieldName).lte(millsTime);
        }
    
        /**
         * Returns the largest element of the list in natural (lexicographic) string order.
         *
         * <p>The list passed in is left untouched; it is not sorted in place.
         *
         * @param collectionsElements the candidate values; must not be empty
         * @return the greatest element according to {@link String#compareTo(String)}
         * @throws IndexOutOfBoundsException if the list is empty
         */
        public static String acquireMaxValueFromCollectionsElement(ArrayList<String> collectionsElements){
            if (collectionsElements.isEmpty()) {
                throw new IndexOutOfBoundsException("Cannot take the maximum of an empty list");
            }
            return Collections.max(collectionsElements);
        }
    
        /**
         * Derives the name of the index that follows {@code currentEsIndex}.
         *
         * <p>The current name is split on {@code '-'}; its first part is taken as the table
         * name and its third part as the end stamp of the current index. The new name is
         * {@code <table>-<end stamp>-<next>00}, where {@code <next>} is whatever
         * {@code WeekDayUtil.getNextDate} returns for the first eight characters of the end
         * stamp and {@code weekDays}.
         *
         * @param currentEsIndex name of the current index, with at least three {@code '-'}-separated parts
         * @param weekDays       passed through unchanged to {@code WeekDayUtil.getNextDate}
         * @return the name of the next index
         * @throws Exception propagated from {@code WeekDayUtil.getNextDate}
         */
        public static String acquireNextEsIndexByCurrentEsIndex(String currentEsIndex,String weekDays) throws Exception{
            String[] parts = currentEsIndex.split("-");
            String table = parts[0].trim();
            String endStamp = parts[2].trim();
            String endDay = endStamp.substring(0, 8);
            String prefix = table + "-" + endStamp + "-";
            return prefix + WeekDayUtil.getNextDate(endDay, weekDays) + "00";
        }
    
        /**
         * Computes, as a decimal string of epoch milliseconds, the instant at which the month
         * named in the end stamp of {@code currentEsIndex} is over: 00:00:00.000 (default time
         * zone) on the first day of the following month.
         *
         * <p>The index name is split on {@code '-'}; the third part supplies the year
         * (characters 0-3) and the month (characters 4-5).
         *
         * <p>NOTE(review): earlier documentation of this method described the result as
         * 23:59:59 of the last day of the month (example: 2016-06-30 23:59:59 giving
         * 1467302399000), whereas the implementation has always resolved "24:00:00" of that
         * day, i.e. the following midnight. The value actually returned is kept unchanged here;
         * confirm which boundary callers need.
         *
         * @param currentEsIndex index name with at least three {@code '-'}-separated parts, the
         *                       third starting with {@code yyyyMM}
         * @return epoch milliseconds of the first instant after the index's month
         * @throws Exception declared for compatibility; malformed input raises unchecked
         *                   exceptions such as {@link NumberFormatException}
         */
        public static String acquireMillsTimeWithCurrentDMonthLastDayByCurrentEsIndex(String currentEsIndex) throws Exception{
            String[] indexParts = currentEsIndex.split("-");
            String endStamp = indexParts[2].trim();
            String yearText = endStamp.substring(0, 4);
            String monthText = endStamp.substring(4, 6);

            // Calendar months are zero-based, so the one-based month taken from the index
            // already designates the FOLLOWING month; the lenient calendar rolls month 12
            // over to January of the next year.
            int followingMonth = Integer.parseInt(monthText);
            int year = Integer.parseInt(yearText);

            Calendar calendar = Calendar.getInstance();
            calendar.clear();
            calendar.set(year, followingMonth, 1, 0, 0, 0);
            return String.valueOf(calendar.getTimeInMillis());
        }
    
    
        /**
         * Manual scratch driver used while developing this class; it is not a real entry point.
         *
         * <p>NOTE(review): several literals look as if they were blanked before publication
         * (the cluster name is {@code ""} and the index name being parsed is {@code ""}).
         * As written, {@code "".split("-")} yields a single-element array, so reading
         * {@code esIndex[2]} throws {@link ArrayIndexOutOfBoundsException} and nothing after
         * that line runs. The transport client built at the top is neither stored nor closed.
         */
        public static void main(String args []) throws Exception{
            /*
             * Functional test 1: common initialisation shared by the tests below.
             */
            Settings settings = Settings.settingsBuilder().put("cluster.name", "").build();
            TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave1"), 9300))
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave2"), 9300))
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave3"), 9300));
    
            /*
             * Functional test 4 (disabled): delete several indices, some of which do not exist.
             */
    //
    //        ArrayList<String> indexList = new ArrayList<String>();
    //        indexList.add(index1);
    //        indexList.add(index2);
    //        indexList.add(index3);
    //        indexList.add(index4);
    //        indexList.add(index5);
    //
    //        for(String  index :  indexList){
    //            try {
    //                client.admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
    //                LOGGER.info("ES索引" + index + "删除结束:");
    //            } catch (IndexNotFoundException e){
    //                LOGGER.info("ES索引" + index + "不存在!!!");
    //            }
    //        }
    
            /*
             * Functional test 5 (disabled): list every index in the cluster.
             */
    //        String[] indexList = client.admin().cluster().prepareState().execute().actionGet().getState().getMetaData().concreteAllIndices();
    //        int parameterNumber = 0;
    //        System.out.println("Index List:");
    //        for (String index : indexList) {
    //            System.out.println(index);
    //            parameterNumber ++;
    //        }
    //        System.out.print("ES索引总数量" + parameterNumber);
    
            /*
             * Functional test 6: split an index name and extract year, month and day from
             * its third '-'-separated part.
             */
            String[] esIndex = "".split("-");
            String tableName = esIndex[0].trim();
            String lastTime = esIndex[2].trim();
    
            System.out.println("tableName:" + tableName);
            System.out.println("lastTime:" + lastTime);
    
            String year = lastTime.substring(0, 4);
            String month = lastTime.substring(4, 6);
            String day = lastTime.substring(6, 8);
            System.out.println("year:" + year);
            System.out.println("month:" + month);
            System.out.println("day:" + day);
    
            // Same name construction as acquireNextEsIndexByCurrentEsIndex, printed without and with the "00" suffix.
            String yearMonthDay = lastTime.substring(0, 8);
            System.out.println("解析之后的值:" +tableName + "-" + lastTime + "-" + WeekDayUtil.getNextDate(yearMonthDay,"星期四") );
            System.out.println(tableName + "-" + lastTime + "-" + WeekDayUtil.getNextDate(yearMonthDay,"星期四") + "00");
    
            // Calendar.MONTH is zero-based, so the one-based month parsed above selects the following month ...
            Calendar calendar = Calendar.getInstance();
            calendar.set(Calendar.MONTH, Integer.parseInt(month));
            calendar.set(Calendar.YEAR, Integer.parseInt(year));
    
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
    //        calendar.add(Calendar.MONTH, 1);
            // ... and day 0 of that month resolves to the last day of the month named in the index.
            calendar.set(Calendar.DAY_OF_MONTH, 0);
            System.out.println("当前月份最后一天时间1:" + format.format(calendar.getTime()));
            String lastTime_temp = format.format(calendar.getTime()) + " 23:59:59";
            System.out.println("当前月份最后一天时间2:" + lastTime_temp);
    
            // Round-trip through a full timestamp format to obtain epoch milliseconds.
            SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // The result of this first parse is discarded.
            format2.parse(lastTime_temp).getTime();
            System.out.println("当前月份最后一天时间3:" + format2.parse(lastTime_temp).getTime());
            System.out.println("当前月份最后一天时间4:" + format2.format(format2.parse(lastTime_temp).getTime()));
        }
    }
  • 相关阅读:
    《java编程思想》读书笔记(二)第五章(2)
    《java编程思想》读书笔记(一)开篇&第五章(1)
    java_SE(Day15)_集合1
    cocos2d-x 3.2锚点,Point,addchild,getcontensize
    cocos2d-x 3.2,Label,Action,Listener,Menu Item等简单用法
    C++函数后面加const修饰
    #pragma once 与 #ifndef 解析
    图文解说:Nginx+tomcat配置集群负载均衡
    JUC (Java Util Concurrency) 基础内容概述
    设计模式之中介者模式(Mediator)
  • 原文地址:https://www.cnblogs.com/jinniezheng/p/6387737.html
Copyright © 2011-2022 走看看