zoukankan      html  css  js  c++  java
  • es通用工具类ElasticSearchUtil

    最近也是搭了一套elk+rabbitmq的日志系统,搬过来大哥编写的工具类

    public class ElasticSearchUtil {
        private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class);
        /**
         * 创建索引
         *
         * @param index
         * @return
         */
        public static boolean createIndex(String index) {
            if (!isIndexExist(index)) {
                logger.info("Index is not exits!");
            }
            CreateIndexResponse indexresponse = null;
            try {
                indexresponse = EsClient.getTransportClient().admin().indices().prepareCreate(index).execute()
                        .actionGet();
            } catch (Exception e) {
                e.printStackTrace();
            }
            logger.info("执行建立成功?" + indexresponse.isAcknowledged());
    
            return indexresponse.isAcknowledged();
        }
    
        /**
         * 删除索引
         *
         * @param index
         * @return
         */
        public static boolean deleteIndex(String index) {
            if (!isIndexExist(index)) {
                logger.info("Index is not exits!");
            }
            DeleteIndexResponse dResponse = EsClient.getTransportClient().admin().indices().prepareDelete(index).execute().actionGet();
            if (dResponse.isAcknowledged()) {
                logger.info("delete index " + index + "  successfully!");
            } else {
                logger.info("Fail to delete index " + index);
            }
            return dResponse.isAcknowledged();
        }
    
        /**
         * 判断索引是否存在
         *
         * @param index
         * @return
         */
        public static boolean isIndexExist(String index) {
            IndicesExistsResponse inExistsResponse = EsClient.getTransportClient().admin().indices().exists(new IndicesExistsRequest
                    (index)).actionGet();
            if (inExistsResponse.isExists()) {
                logger.info("Index [" + index + "] is exist!");
            } else {
                logger.info("Index [" + index + "] is not exist!");
            }
            return inExistsResponse.isExists();
        }
    
        /**
         * 数据添加,正定ID
         *
         * @param jsonObject 要增加的数据
         * @param index      索引,类似数据库
         * @param type       类型,类似表
         * @param id         数据ID
         * @return
         */
        public static String addData(JSONObject jsonObject, String index, String type, String id) {
    
            IndexResponse response = EsClient.getTransportClient().prepareIndex(index, type, id).setSource(jsonObject).get();
    
            logger.info("addData response status:{},id:{}", response.status().getStatus(), response.getId());
    
            return response.getId();
        }
    
        /**
         * 数据添加
         *
         * @param jsonObject 要增加的数据
         * @param index      索引,类似数据库
         * @param type       类型,类似表
         * @return
         */
        public static String addData(JSONObject jsonObject, String index, String type) {
            return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
        }
    
        /**
         * 通过ID删除数据
         *
         * @param index 索引,类似数据库
         * @param type  类型,类似表
         * @param id    数据ID
         */
        public static void deleteDataById(String index, String type, String id) {
    
            DeleteResponse response = EsClient.getTransportClient().prepareDelete(index, type, id).execute().actionGet();
    
            logger.info("deleteDataById response status:{},id:{}", response.status().getStatus(), response.getId());
        }
    
        /**
         * 通过ID 更新数据
         *
         * @param jsonObject 要增加的数据
         * @param index      索引,类似数据库
         * @param type       类型,类似表
         * @param id         数据ID
         * @return
         */
        public static void updateDataById(JSONObject jsonObject, String index, String type, String id) {
    
            UpdateRequest updateRequest = new UpdateRequest();
    
            updateRequest.index(index).type(type).id(id).doc(jsonObject);
    
            EsClient.getTransportClient().update(updateRequest);
    
        }
    
        /**
         * 通过ID获取数据
         *
         * @param index  索引,类似数据库
         * @param type   类型,类似表
         * @param id     数据ID
         * @param fields 需要显示的字段,逗号分隔(缺省为全部字段)
         * @return
         */
        public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {
    
            GetRequestBuilder getRequestBuilder = EsClient.getTransportClient().prepareGet(index, type, id);
    
            if (StringUtils.isNotEmpty(fields)) {
                getRequestBuilder.setFetchSource(fields.split(","), null);
            }
    
            GetResponse getResponse = getRequestBuilder.execute().actionGet();
    
            return getResponse.getSource();
        }
    
        /**
         * 使用分词查询不分页
         *
         * @param index          索引名称
         * @param type           类型名称,可传入多个type逗号分隔
         * @param startTime      开始时间
         * @param endTime        结束时间
         * @param size           文档大小限制
         * @param fields         需要显示的字段,逗号分隔(缺省为全部字段)
         * @param sortField      排序字段
         * @param highlightField 高亮字段
         * @param operatorTag    查询条件最外层的关系(true与,false或)
         * @param parmStr        过滤条件
         * @return
         */
        public static List<Map<String, Object>> searchListData(String index, String type, Integer size,long startTime,
                long endTime, String fields, String logType, String sortField, String highlightField, boolean
                operatorTag, String... parmStr) {
            SearchRequestBuilder searchRequestBuilder = EsClient.getTransportClient().prepareSearch(index);
            if (StringUtils.isNotEmpty(type)) {
                searchRequestBuilder.setTypes(type.split(","));
            }
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            if (startTime > 0 && endTime > 0) {
                boolQuery.must(QueryBuilders.rangeQuery("createDate").format("epoch_millis").from(startTime).to(endTime)
                        .includeLower(true).includeUpper(false));
            }
            //设置查询日志类型
            boolQuery.must(QueryBuilders.matchQuery("logType", logType));
            // 查询字段 与关系
            for (int i = 0, len = parmStr.length; i < len; i++) {
                BoolQueryBuilder tempBoolQuery = QueryBuilders.boolQuery();
                if (parmStr[i].contains("&&")){
                    String[] tempStr =parmStr[i].split("&&");
                    for(int j=0,len1=tempStr.length;j<len1;j++)
                    {
                        String[] ss = tempStr[j].split("=");
                        //分词查询
                        tempBoolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                    }
                    if (operatorTag) {
                        //如果为真则最外层是与关系
                        boolQuery.must(tempBoolQuery);
                    } else {
                        boolQuery.should(tempBoolQuery);
                    }
                } else {
                    if (parmStr[i].contains(",,")) {
                        String[] tempStr =parmStr[i].split(",,");
                        for(int j=0,len1=tempStr.length;j<len1;j++)
                        {
                            String[] ss = tempStr[j].split("=");
                            //分词查询
                            tempBoolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                        }
                        if (operatorTag) {
                            //如果为真则最外层是与关系
                            boolQuery.must(tempBoolQuery);
                        } else {
                            boolQuery.should(tempBoolQuery);
                        }
                    } else {
                        if(ToolUtil.isNotEmpty(parmStr[i])) {
                            String[] ss = parmStr[i].split("=");
                            if (operatorTag) {
                                //如果为真则最外层是与关系
                                boolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                            } else {
                                boolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                            }
                        }
                    }
                }
            }
            if (StringUtils.isNotEmpty(highlightField)) {
                HighlightBuilder highlightBuilder = new HighlightBuilder();
                // 设置高亮字段
                highlightBuilder.field(highlightField);
                searchRequestBuilder.highlighter(highlightBuilder);
            }
            searchRequestBuilder.setQuery(boolQuery);
            if (StringUtils.isNotEmpty(fields)) {
                searchRequestBuilder.setFetchSource(fields.split(","), null);
            }
            searchRequestBuilder.setFetchSource(true);
    
            if (StringUtils.isNotEmpty(sortField)) {
                searchRequestBuilder.addSort(sortField, SortOrder.DESC);
            }
            if (size != null && size > 0) {
                searchRequestBuilder.setSize(size);
            }
            //打印的内容 可以在 Elasticsearch head 和 Kibana  上执行查询
            logger.info("
    {}", searchRequestBuilder);
            SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
    
            long totalHits = searchResponse.getHits().totalHits;
            long length = searchResponse.getHits().getHits().length;
            logger.info("共查询到[{}]条数据,处理数据条数[{}]", totalHits, length);
            if (searchResponse.status().getStatus() == 200) {
                // 解析对象
                return setSearchResponse(searchResponse, highlightField);
            }
            return null;
        }
    
        /**
         * 使用分词查询,并分页
         *
         * @param index          索引名称
         * @param type           类型名称,可传入多个type逗号分隔
         * @param currentPage    当前页
         * @param pageSize       每页显示条数
         * @param startTime      开始时间
         * @param endTime        结束时间
         * @param fields         需要显示的字段,逗号分隔(缺省为全部字段)
         * @param sortField      排序字段
         * @param highlightField 高亮字段
         * @param operatorTag    外层逻辑与true 或false
         * @param parmStr        内层逻辑与&& 或||
         * @return
         */
        public static EsPage searchDataPage(String index, String type, int currentPage, int pageSize, long startTime,
                                            long endTime, String fields, String logType, String sortField, String highlightField, boolean
                operatorTag, String... parmStr) {
            SearchRequestBuilder searchRequestBuilder = EsClient.getTransportClient().prepareSearch(index);
            if (StringUtils.isNotEmpty(type)) {
                searchRequestBuilder.setTypes(type.split(","));
            }
            searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
            // 需要显示的字段,逗号分隔(缺省为全部字段)
            if (StringUtils.isNotEmpty(fields)) {
                searchRequestBuilder.setFetchSource(fields.split(","), null);
            }
            //排序字段
            if (StringUtils.isNotEmpty(sortField)) {
                searchRequestBuilder.addSort(sortField, SortOrder.DESC);
            }
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            if (startTime > 0 && endTime > 0) {
                boolQuery.must(QueryBuilders.rangeQuery("createDate").format("epoch_millis").from(startTime).to(endTime)
                        .includeLower(true).includeUpper(true));
            }
            //设置查询日志类型
            boolQuery.must(QueryBuilders.matchQuery("logType", logType));
            // 查询字段 与关系
            for (int i = 0, len = parmStr.length; i < len; i++) {
                BoolQueryBuilder tempBoolQuery = QueryBuilders.boolQuery();
                if (parmStr[i].contains("&&")) {
                    String[] tempStr =parmStr[i].split("&&");
                    for(int j=0,len1=tempStr.length;j<len1;j++)
                    {
                        String[] ss = tempStr[j].split("=");
                        //分词查询
                        tempBoolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                    }
                    if (operatorTag) {
                        //如果为真则最外层是与关系
                        boolQuery.must(tempBoolQuery);
                    } else {
                        boolQuery.should(tempBoolQuery);
                    }
                } else {
                    if (parmStr[i].contains(",,")) {
                        String[] tempStr =parmStr[i].split(",,");
                        for(int j=0,len1=tempStr.length;j<len1;j++)
                        {
                            String[] ss = tempStr[j].split("=");
                            //分词查询
                            tempBoolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                        }
                        if (operatorTag) {
                            //如果为真则最外层是与关系
                            boolQuery.must(tempBoolQuery);
                        } else {
                            boolQuery.should(tempBoolQuery);
                        }
                    } else {
                        if(ToolUtil.isNotEmpty(parmStr[i])) {
                            String[] ss = parmStr[i].split("=");
                            if (operatorTag) {
                                //如果为真则最外层是与关系
                                boolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                            } else {
                                boolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                            }
                        }
                    }
                }
            }
            if (StringUtils.isNotEmpty(highlightField)) {
                HighlightBuilder highlightBuilder = new HighlightBuilder();
                //highlightBuilder.preTags("<span style='color:red' >");//设置前缀
                //highlightBuilder.postTags("</span>");//设置后缀
                // 设置高亮字段
                highlightBuilder.field(highlightField);
                searchRequestBuilder.highlighter(highlightBuilder);
            }
            searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
            searchRequestBuilder.setQuery(boolQuery);
            // 分页应用
            int num = (currentPage - 1) * pageSize;
            searchRequestBuilder.setFrom(num).setSize(pageSize);
            // 设置是否按查询匹配度排序
            searchRequestBuilder.setExplain(true);
            //打印的内容 可以在 Elasticsearch head 和 Kibana  上执行查询
            logger.info("
    {}", searchRequestBuilder);
            // 执行搜索,返回搜索响应信息
            SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
            long totalHits = searchResponse.getHits().totalHits;
            long length = searchResponse.getHits().getHits().length;
            logger.debug("共查询到[{}]条数据,处理数据条数[{}]", totalHits, length);
            if (searchResponse.status().getStatus() == 200) {
                // 解析对象
                List<Map<String, Object>> sourceList = setSearchResponse(searchResponse, highlightField);
    
                return new EsPage(currentPage, pageSize, (int) totalHits, sourceList);
            }
            return null;
        }
    
        /**
         * 高亮结果集 特殊处理
         *
         * @param searchResponse
         * @param highlightField
         */
        public static List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
            List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>();
            StringBuffer stringBuffer = new StringBuffer();
    
            for (SearchHit searchHit : searchResponse.getHits().getHits()) {
                searchHit.getSourceAsMap().put("_id", searchHit.getId());
    
                if (StringUtils.isNotEmpty(highlightField)) {
    
                    System.out.println("遍历 高亮结果集,覆盖 正常结果集" + searchHit.getSourceAsMap());
                    Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments();
    
                    if (text != null) {
                        for (Text str : text) {
                            stringBuffer.append(str.string());
                        }
                        //遍历 高亮结果集,覆盖 正常结果集
                        searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
                    }
                }
                sourceList.add(searchHit.getSourceAsMap());
            }
    
            return sourceList;
        }
        /**
         * 查询所有数据
         *
         * @param index 索引名称
         * @param type type 6.0后不推荐使用
         * @param fields 需要显示的字段
         * @param sortField 需要进行排序的字段
         * @param highlightField 需要高亮的字段
         * @param queryBuilder 查询条件
         * @return
         */
        public static List<Map<String, Object>> searchAllData(String index, String type, String fields, String sortField, String highlightField, QueryBuilder queryBuilder ) {
            //指定一个index和type
            EsClient esClient=new EsClient();
            SearchRequestBuilder searchRequestBuilder = esClient.getESClient().prepareSearch(index);
            // 高亮(xxx=111,aaa=222)
            if (StringUtils.isNotEmpty(highlightField)) {
                HighlightBuilder highlightBuilder = new HighlightBuilder();
                //设置前缀
                highlightBuilder.preTags("<span style='color:red;font-weight:bold'>");
                //设置后缀
                highlightBuilder.postTags("</span>");
                // 设置高亮字段
                highlightBuilder.field(highlightField);
                searchRequestBuilder.highlighter(highlightBuilder);
            }
            // 需要显示的字段,逗号分隔(缺省为全部字段)
            if (StringUtils.isNotEmpty(fields)) {
                searchRequestBuilder.setFetchSource(fields.split(","), null);
            }
            searchRequestBuilder.setFetchSource(true);
            if (StringUtils.isNotEmpty(sortField)) {
                searchRequestBuilder.addSort(sortField, SortOrder.ASC);
            }
            //设置每批读取的数据量
            searchRequestBuilder.setSize(100);
            //查询条件
            searchRequestBuilder.setQuery(queryBuilder);
            //设置 search context 维护1分钟的有效期
            searchRequestBuilder.setScroll(TimeValue.timeValueMinutes(1));
            //获得首次的查询结果
            SearchResponse scrollResp=searchRequestBuilder.get();
            //打印的内容 可以在 Elasticsearch head 和 Kibana  上执行查询
            logger.info("
    {}", searchRequestBuilder);
            //打印命中数量
            logger.info("命中总数量:{}", scrollResp.getHits().getTotalHits());
            List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>();
            StringBuffer stringBuffer = new StringBuffer();
            do {
                //将scorllId循环传递
                scrollResp = EsClient.getTransportClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).execute().actionGet();
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    searchHit.getSourceAsMap().put("id", searchHit.getId());
                    if (StringUtils.isNotEmpty(highlightField)) {
                        if (!ToolUtil.isEmpty(searchHit.getHighlightFields().get(highlightField))) {
                            Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments();
                            if (text != null) {
                                for (Text str : text) {
                                    stringBuffer.append(str.string());
                                }
                                //遍历 高亮结果集,覆盖 正常结果集
                                searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
                            }
                        }
                    }
                    sourceList.add(searchHit.getSourceAsMap());
                }
                //当searchHits的数组为空的时候结束循环,至此数据全部读取完毕
            } while(scrollResp.getHits().getHits().length != 0);
            //删除scroll
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollResp.getScrollId());
            EsClient.getTransportClient().clearScroll(clearScrollRequest).actionGet();
            return sourceList;
        }
        /**
         * 批量新增数据
         *
         * @param index    索引名称
         * @param type     索引类型
         * @param dataList 需要新增的数据
         */
        public static void insertBatch(String index, String type, List<Map<String, Object>> dataList,int batchSize) {
            BulkRequestBuilder bulkRequest = EsClient.getTransportClient().prepareBulk();
            for (int i = 0; i < dataList.size(); i++) {
                bulkRequest.add(EsClient.getTransportClient().prepareIndex(index, type, Convert.toStr(dataList.get(i).get("id"))).setSource(dataList.get(i)));
                // 每5000条提交一次
                if ((i + 1) % batchSize == 0) {
                    BulkResponse bulkItemResponses = bulkRequest.execute().actionGet();
                    bulkRequest = EsClient.getTransportClient().prepareBulk();
                    logger.info("已保存: {} 条,执行时间:{} ", batchSize, bulkItemResponses.getTook());
                }
            }
            if(dataList.size()%batchSize !=0){
                BulkResponse bulkItemResponses = bulkRequest.execute().actionGet();
                logger.info("保存: {}条,执行时间:{} ", dataList.size() % batchSize, bulkItemResponses.getTook());
            }
        }
    }

    有需要的自己直接搬走,不谢

  • 相关阅读:
    Sending post
    <<the not so short introduction to Latex2e >> 读书笔记
    Latex 书签中文乱码解决方案
    VisualSVN迁移到其他服务器 子曰
    C#遍历DataSet中数据的几种方法总结 子曰
    Extjs formpanel加载数据的两种方式 子曰
    向老销售取经,学来的一点软件销售技巧 子曰
    extjs 实现 NumberField 即时计算 子曰
    Ext.form.DateField简单用法及日期范围控制 子曰
    解决.aspx中插入浮动广告不滚动问题 子曰
  • 原文地址:https://www.cnblogs.com/innocenter/p/13262311.html
Copyright © 2011-2022 走看看