zoukankan      html  css  js  c++  java
  • elasticsearch持有者类

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.commons.lang3.StringUtils;
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteRequestBuilder;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.search.SearchRequestBuilder;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.Requests;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.sort.SortOrder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.Closeable;
    import java.io.IOException;
    import java.io.Serializable;
    import java.net.InetAddress;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    
    /**
     * <p></p>
     *
     * @author
     * @version V1.0
     * @modificationHistory=========================逻辑或功能性重大变更记录
     * @modify by user: $author$ $date$
     * @modify by reason: {方法名}:{原因}
     */
    public class ESHolder implements Serializable,Closeable{
        private static final Logger LOG = LoggerFactory.getLogger(ESHolder.class);
    
        private String esClusterName = null;
        private String esClusterAddress = null;
        // ES客户端
        private Client ESClient = null;
    
        public ESHolder(String esClusterName, String esClusterAddress) {
            this.esClusterName = esClusterName;
            this.esClusterAddress = esClusterAddress;
        }
    
    
        public Client getESClient() {
            if (ESClient == null) {
                initESClient(esClusterName, esClusterAddress);
            }
            return ESClient;
        }
    
        /**
         * 批量建立ES索引
         *
         * @param list
         * @return
         * @author
         */
        public boolean addIndex(String indexName, String typeName, List<Map<String, Object>> list) {
            long t = System.currentTimeMillis();
            try {
                ObjectMapper mapper = new ObjectMapper();
                BulkRequestBuilder bulkRequest = getESClient().prepareBulk();
                for(Map<String, Object> data : list){
                    byte[] json = mapper.writeValueAsBytes(data);
                    bulkRequest.add(new IndexRequest(indexName, typeName).source(json));
                }
    
                BulkResponse response = bulkRequest.execute().actionGet();
                if(response.hasFailures()){
                    BulkItemResponse[] itemResponses = response.getItems();
                    for(BulkItemResponse itemResponse : itemResponses){
                        // TODO Must do something to handle failures.
                        LOG.error("Add ES Index failed! DOC_ID: {}, Reason: {}", itemResponse.getId(), itemResponse.getFailureMessage());
                    }
                }
            } catch (JsonProcessingException e) {
                LOG.error("Build index fail.", e);
                return false;
            }
            LOG.debug("build index complete,num:{}, cost:{}", list.size(), System.currentTimeMillis() - t);
            return true;
        }
    
        /**
         * 批量删除ES索引
         *
         * @param docIds
         *
         *
         */
        public void deleteIndex(String indexName, String typeName, List<String> docIds){
            BulkRequestBuilder bulkRequest = getESClient().prepareBulk();
            for(String docId : docIds){
                bulkRequest.add(new DeleteRequest(indexName, typeName, docId));
            }
            BulkResponse response = bulkRequest.execute().actionGet();
            if(response.hasFailures()){
                BulkItemResponse[] itemResponses = response.getItems();
                for(BulkItemResponse itemResponse : itemResponses){
                    // TODO Must do something to handle failures.
                    LOG.error("ES Index delete failed! DOC_ID: {}, Reason: {}", itemResponse.getId(), itemResponse.getFailureMessage());
                }
            }
        }
    
        /**
         * 删除ES索引
         *
         * @param indexName
         * @param typeName
         * @param data
         * @return
         */
        public boolean deleteIndex(String indexName, String typeName, Map<String, Object> data){
            DeleteRequestBuilder requestBuilder = getESClient().prepareDelete(indexName, typeName,
                    (String) data.get(“rowkey”));
            DeleteResponse response = requestBuilder.execute().actionGet();
            if(!response.isFound()){
                LOG.error("ES Index not found! DOC_ID: {}", response.getId());
                return false;
            }
            return true;
        }
    
        /**
         * 从ES查询数据
         *
         * @param query
         * @return
         *
         */
        public SearchHits queryWithES(SearchRequestBuilder query){
            SearchHits response = query.execute().actionGet().getHits();
            return response;
        }
    
        /**
         * 构造查询对象
         *
         * @param index
         * @param type
         * @param queryBuilder
         * @param retField
         * @param sortField
         * @param start
         * @param rows
         * @return
         */
        public SearchRequestBuilder buildSearch(String index, String type, QueryBuilder queryBuilder, String retField, String sortField, SortOrder sortOrder, int start, int rows){
    
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(queryBuilder).from(start).size(rows);
    
            if(StringUtils.isNotEmpty(retField)){
                searchSourceBuilder.field(retField);
            }
    
            if(StringUtils.isNotEmpty(sortField)){
                searchSourceBuilder.sort(sortField, sortOrder);
            }
    
            LOG.debug("ES Query string: " + searchSourceBuilder.toString());
    
            return getESClient().prepareSearch().setIndices(index).setTypes(type)
                    .setExtraSource(searchSourceBuilder.buildAsBytes(Requests.CONTENT_TYPE));
        }
    
        /**
         * 统计数据量
         *
         * @return 符合条件的数据量
         */
        public long countWithQuery(String indexName, String typeName, QueryBuilder queryBuilder){
            SearchRequestBuilder builder = getESClient().prepareSearch(indexName).setTypes(typeName)
                    .setQuery(queryBuilder).setFrom(0).setSize(0);
            return countWithQuery(builder);
        }
    
        /**
         * 统计数据量
         *
         * @param query
         * @return
         *
         */
        public long countWithQuery(SearchRequestBuilder query){
            return query.execute().actionGet().getHits().getTotalHits();
        }
    
        /**
         * 初始化ES客户端
         *
         * @return
         */
        private void initESClient(String esClusterName, String esClusterAddress) {
            int esClientTimeout = 180000;
            LOG.info("init ES Client...");
            try {
                String[] hostPair = esClusterAddress.split(“,”);
                TransportAddress[] addrs = new TransportAddress[hostPair.length];
    
                int i = 0;
                String[] keyValuePair;
                for (String t : hostPair) {
                    keyValuePair = t.split(":");
                    if (2 != keyValuePair.length) {
                        throw new IOException("ES's host is not correct:" + Arrays.toString(keyValuePair));
                    }
                    addrs[i] = new InetSocketTransportAddress(InetAddress.getByName(keyValuePair[0]), Integer.valueOf(keyValuePair[1]));
                    i++;
                }
    
                Settings settings = Settings.settingsBuilder()
                        .put("cluster.name", esClusterName)
                        .put("client.transport.sniff", true)
                        .put("client.transport.ping_timeout", esClientTimeout + "s").build();
    
                ESClient = TransportClient.builder().settings(settings).build().addTransportAddresses(addrs);
            } catch (Exception e) {
                LOG.error("Address error!", e);
            }
        }
    
    
        @Override
        public void close() throws IOException {
            if(this.ESClient != null){
                LOG.info("closing esclient....");
                this.ESClient.close();
                this.ESClient = null;
            }
        }
    }
  • 相关阅读:
    测试网络
    测试
    Centos6.6中VIM的编辑、退出与保存
    IP
    2018.12.24 课程更新内容到第五章 渗透测试 第4、5节
    利用GRC进行安全研究和审计 – 将无线电信号转换为数据包(转)
    我的翻译--一个针对TP-Link调试协议(TDDP)漏洞挖掘的故事
    我的翻译--针对Outernet卫星信号的逆向工程
    Sulley安装手记
    乘用车黑客手册(译)
  • 原文地址:https://www.cnblogs.com/jinniezheng/p/6383929.html
Copyright © 2011-2022 走看看