import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; import java.util.Date; /** * Created by guanxiangqing on 2016/9/24. */ public class ESStorage { private static final Logger LOGGER = LoggerFactory.getLogger(ESStorage.class); /** * ElasticSearch集群的名称之KEY值 */ private static String ES_CLUSTER_NAME = "es.cluster.name"; /** * ElasticSearch集群的地址之KEY值 */ private static String ES_CLUSTER_ADDR = "es.cluster.addr"; /** * 创建一个Client变量 */ private static TransportClient client; /** * 创建一个锁对象 */ private static Object lock = new Object(); /** * 初始化ElasticSearch客户端连接 * @return * @throws UnknownHostException */ public static Client buildClientByTransport() throws UnknownHostException { if(client == null){ synchronized (lock) { /** * ElasticSearch集群的名称之Value值 */ String esClusterName =INSTANCE.getStringValue(ES_CLUSTER_NAME); /** * ElasticSearch集群的地址之Value值 */ String esClusterAddr = INSTANCE.getStringValue(ES_CLUSTER_ADDR); Settings settings = Settings.settingsBuilder() .put("cluster.name", esClusterName) .put("client.transport.sniff", true).build(); // 自动嗅取其他的节点 //解析 es.cluster.addr=cloudwave3:9300,cloudwave2:9300,cloudwave1:9300 String[] singleEsURL = esClusterAddr.split(","); if(singleEsURL.length == 1){ String esHostName = singleEsURL[0].split(":")[0]; int esPort = Integer.parseInt(singleEsURL[0].split(":")[1]); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHostName), esPort)); } else if (singleEsURL.length == 2){ String esHostName0 = singleEsURL[0].split(":")[0]; int esPort0 = Integer.parseInt(singleEsURL[0].split(":")[1]); String esHostName1 = singleEsURL[1].split(":")[0]; int esPort1 = Integer.parseInt(singleEsURL[1].split(":")[1]); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHostName0), esPort0)) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHostName1), esPort1)); } else if (singleEsURL.length >= 3) { String esHostName0 = singleEsURL[0].split(":")[0]; int esPort0 = Integer.parseInt(singleEsURL[0].split(":")[1]); String esHostName1 = singleEsURL[1].split(":")[0]; int esPort1 = Integer.parseInt(singleEsURL[1].split(":")[1]); String esHostName2 = singleEsURL[2].split(":")[0]; int esPort2 = Integer.parseInt(singleEsURL[2].split(":")[1]); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHostName0), esPort0)) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHostName1), esPort1)) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHostName2), esPort2)); } } } return client; } /** * 删除ElasticSearch的索引 * @param index 索引名称 * @param type 索引来的类型 * @param id 数据的ID * @throws Exception */ public static void deleteEsIndexByIndexAndTypeAndId(String index, String type, String id) throws Exception{ ESStorageOperationUnit.buildClientByTransport(); DeleteResponse deleteresponse = client.prepareDelete(index, type, id) //"logs", "log2015","150" .execute() .actionGet(); LOGGER.info("Delete index:" + index + " type: " + type + " id :" + id + " successfully, and the version of the delete operation is " + deleteresponse.getVersion()); } /** * 基于ES的ID值,批量删除ElasticSearch的索引 * @param index 索引名称 * @param type 索引类型 * @param queryIds 数据的ID集合 * @throws Exception */ public static void deleteEsIndexByIndexAndTypeAndIdWithBath(String index, String type, ArrayList<String> queryIds) throws Exception{ for(String queryId: queryIds){ ESStorage.deleteEsIndexByIndexAndTypeAndId(index,type,queryId); } } /** * 删除ElasticSearch的索引(只通过索引删除) * @param index 索引名称 * @throws Exception */ public void deleteEsIndex(String index) throws Exception{ ESStorageOperationUnit.buildClientByTransport(); DeleteIndexResponse delete = client.admin().indices() .delete(new DeleteIndexRequest(index)).actionGet(); LOGGER.info("The ElasticSearch index of " + index + " is deleted." + delete.toString()); } /** * 批量删除ElasticSearch的索引(只通过索引删除) * @param indexList 要删除的索引集合 * @throws Exception */ public void deleteEsIndexByBatch(ArrayList<String> indexList) throws Exception{ ESStorageOperationUnit.buildClientByTransport(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); ArrayList<String> esAllIndex = ESStorage.acquireAllEsIndexList(); for(String index : indexList){ if(esAllIndex.contains(index)){ try{ client.admin().indices().delete(new DeleteIndexRequest(index)).actionGet(); LOGGER.info("The ElasticSearch index of " + index + " is deleted." + sdf.format(new Date())); } catch (IndexNotFoundException e){ LOGGER.info("The ElasticSearch index of " + index + "is not find"); } } } } /** * 通过日期和表名获取数据表的Index集合 * @param weekDayList 基于时间划分的集合 * @param tableName 数据表名称 * @return */ public ArrayList<String> acquireEsIndexList(String[] weekDayList, String tableName){ ArrayList<String> esIndexList = new ArrayList<String>(); String tableNameLowCase = tableName.toLowerCase(); String esListTemp; for(int i = 0; i < weekDayList.length - 1 ; i++){ esListTemp = tableNameLowCase + "-" + weekDayList[i] + "00-" + weekDayList[i+1] + "00"; esIndexList.add(esListTemp); } return esIndexList; } /** * 获取ES中的所有索引值 * @return */ public static ArrayList<String> acquireAllEsIndexList() throws Exception{ ArrayList<String> esIndexLists = new ArrayList<String>(); ESStorageOperationUnit.buildClientByTransport(); String[] indexList = client.admin().cluster().prepareState().execute().actionGet().getState().getMetaData().concreteAllIndices(); for(int i = 0; i < indexList.length; i++){ esIndexLists.add(indexList[i].trim()); } return esIndexLists; } /** * 通过时间属性字段,及属性字段的值(时间的毫秒数),来查询符合条件的所有ID值 * @param index ES索引: * @param fieldName * @param mill * @return * @throws Exception */ public static ArrayList<String> acquireQueryIDsByTimeAttribute(String index, String esType, String fieldName, String millsTime) throws Exception{ ESStorage.buildClientByTransport(); QueryBuilder queryBuilder = buildQuery(fieldName,millsTime); SearchResponse response = client.prepareSearch(index) // 索引 .setTypes(esType) .setQuery(queryBuilder) // 组装的查询条件 .execute() .actionGet(); ArrayList<String> esIds = new ArrayList<String>(); if (response != null) { System.out.println(response); for (SearchHit hit : response.getHits().getHits()) { esIds.add(hit.getId()); } } return esIds; } /** * * @param index * @param esType * @param fieldName * @param millsTime * @throws Exception */ public static void acquireQueryIDsByTimeAttributeAndDeleteEsData(String index, String esType, String fieldName, String millsTime) throws Exception{ ESStorageOperationUnit.buildClientByTransport(); long esHistsNumber = acquireQueryTotalhistsNumbers(index, esType, fieldName, millsTime); LOGGER.info("The total Number hits of ElasticSearch is " + esHistsNumber); if(esHistsNumber > 0){ for(int i = 0; i < esHistsNumber/20000 + 1; i++){ QueryBuilder queryBuilder = buildQuery(fieldName,millsTime); SearchResponse response = client.prepareSearch(index) // 索引 .setTypes(esType) .setQuery(queryBuilder).setSize(20000) // 组装的查询条件 .execute() .actionGet(); ArrayList<String> esIds = new ArrayList<String>(); if (response != null) { for (SearchHit hit : response.getHits().getHits()) { esIds.add(hit.getId()); } } deleteEsIndexByIndexAndTypeAndIdWithBath(index, esType, esIds); } } } /** * 获取特定条件,查询的Es中hits总数量(用于遍历所有的满足条件Es的hits数据) * @param index * @param esType * @param fieldName * @param millsTime * @return * @throws Exception */ public static long acquireQueryTotalhistsNumbers(String index, String esType, String fieldName, String millsTime) throws Exception{ ESStorageOperationUnit.buildClientByTransport(); QueryBuilder queryBuilder = buildQuery(fieldName,millsTime); SearchResponse response = client.prepareSearch(index) // 索引 .setTypes(esType) .setQuery(queryBuilder) // 组装的查询条件 .execute() .actionGet(); return response.getHits().getTotalHits(); } /** * 根据范围筛选,来组装条件,本方法基于ES的RangeQueryBuilder重组装 * @param fieldName * @param millsTime * @return */ public static QueryBuilder buildQuery(String fieldName, String millsTime) { RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(fieldName).lte(millsTime); //gt lt eq gte lte return rangeQueryBuilder; } /** * 通过集合Collection元素,或者改元素最大的一个值,实则为ES中最大的索引 * 集合中元素格式: * @param * @return 返回值格式 */ public static String acquireMaxValueFromCollectionsElement(ArrayList<String> collectionsElements){ Collections.sort(collectionsElements); //排序 return collectionsElements.get(collectionsElements.size() - 1); } /** * 根据当前ES的索引,获取ES的下一个索引 * @param currentEsIndex * @return */ public static String acquireNextEsIndexByCurrentEsIndex(String currentEsIndex,String weekDays) throws Exception{ String[] esIndex = currentEsIndex.split("-"); String tableName = esIndex[0].trim(); String lastTime = esIndex[2].trim(); String yearMonthDay = lastTime.substring(0, 8); return tableName + "-" + lastTime + "-" + WeekDayUtil.getNextDate(yearMonthDay,weekDays) + "00"; } /** * 通过当前的Es索引,获取上一个月,最后最后一天的毫秒时间数 * 比如: * 时间:2016-06-30 23:59:59 * 毫秒数:1467302399000 * @param currentEsIndex Es的索引, * @return * @throws Exception */ public static String acquireMillsTimeWithCurrentDMonthLastDayByCurrentEsIndex(String currentEsIndex) throws Exception{ String[] esIndex = currentEsIndex.split("-"); String lastTime = esIndex[2].trim(); String year = lastTime.substring(0, 4); String month = lastTime.substring(4, 6); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.MONTH, Integer.parseInt(month)); calendar.set(Calendar.YEAR, Integer.parseInt(year)); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); calendar.set(Calendar.DAY_OF_MONTH, 0); String lastTime_temp = format.format(calendar.getTime()) + " 24:00:00"; SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); format2.parse(lastTime_temp).getTime(); return format2.parse(lastTime_temp).getTime() + ""; } public static void main(String args []) throws Exception{ /** * 功能测试1: * 功能测试说明: * 初始化部分:公共模块 */ Settings settings = Settings.settingsBuilder().put("cluster.name", "").build(); TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave1"), 9300)) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave2"), 9300)) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave3"), 9300)); /** * 功能测试4: * 测试删除多个索引,并且存在所有不存在的情况 // // ArrayList<String> indexList = new ArrayList<String>(); // indexList.add(index1); // indexList.add(index2); // indexList.add(index3); // indexList.add(index4); // indexList.add(index5); // // for(String index : indexList){ // try { // client.admin().indices().delete(new DeleteIndexRequest(index)).actionGet(); // LOGGER.info("ES索引" + index + "删除结束:"); // } catch (IndexNotFoundException e){ // LOGGER.info("ES索引" + index + "不存在!!!"); // } // } /** * 功能测试5: * 获取ES所有索引 */ // String[] indexList = client.admin().cluster().prepareState().execute().actionGet().getState().getMetaData().concreteAllIndices(); // int parameterNumber = 0; // System.out.println("Index List:"); // for (String index : indexList) { // System.out.println(index); // parameterNumber ++; // } // System.out.print("ES索引总数量" + parameterNumber); /** * 功能测试6: * 截取ES索引,并获取相应的信息(年月日) */ String[] esIndex = "".split("-"); String tableName = esIndex[0].trim(); String lastTime = esIndex[2].trim(); System.out.println("tableName:" + tableName); System.out.println("lastTime:" + lastTime); String year = lastTime.substring(0, 4); String month = lastTime.substring(4, 6); String day = lastTime.substring(6, 8); System.out.println("year:" + year); System.out.println("month:" + month); System.out.println("day:" + day); String yearMonthDay = lastTime.substring(0, 8); System.out.println("解析之后的值:" +tableName + "-" + lastTime + "-" + WeekDayUtil.getNextDate(yearMonthDay,"星期四") ); System.out.println(tableName + "-" + lastTime + "-" + WeekDayUtil.getNextDate(yearMonthDay,"星期四") + "00"); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.MONTH, Integer.parseInt(month)); calendar.set(Calendar.YEAR, Integer.parseInt(year)); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); // calendar.add(Calendar.MONTH, 1); calendar.set(Calendar.DAY_OF_MONTH, 0); System.out.println("当前月份最后一天时间1:" + format.format(calendar.getTime())); String lastTime_temp = format.format(calendar.getTime()) + " 23:59:59"; System.out.println("当前月份最后一天时间2:" + lastTime_temp); SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); format2.parse(lastTime_temp).getTime(); System.out.println("当前月份最后一天时间3:" + format2.parse(lastTime_temp).getTime()); System.out.println("当前月份最后一天时间4:" + format2.format(format2.parse(lastTime_temp).getTime())); } }