zoukankan      html  css  js  c++  java
  • springboot整合ElasticSearch

    yml

    spring:
      data:
        elasticsearch:
          client:
            reactive:
              endpoints: 192.168.209.160:9200
              # Timeout for connecting to Elasticsearch, in milliseconds (default: 10 s = 10000 ms).
              # NOTE: a YAML comment must be separated from the value by whitespace; "10000#..."
              # would be parsed as part of the value itself.
              connection-timeout: 10000
              # Timeout for socket reads and writes, in milliseconds (default: 5 s = 5000 ms).
              socket-timeout: 10000
      elasticsearch:
        rest:
          uris: 192.168.209.160:9200
    #     The two properties below are discouraged in newer Spring Boot versions. Port 9300 is the
    #     port Elasticsearch nodes use to communicate with each other; connecting through the
    #     RestHighLevelClient on port 9200 is recommended instead.
    #     cluster-nodes: 127.0.0.1:9300
    #     cluster-name: helloElasticsearch

    pom 

        <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-captcha</artifactId>
                <version>5.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
            </dependency>

    Controller

    package com.fwz.tproject.testfunction.controller;
    
    import java.util.Date;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.fwz.tproject.testfunction.service.ElasticSearchUtils;
    import com.fwz.tproject.testfunction.service.IdGeneratorSnowflake;
    import com.fwz.tproject.testfunction.service.OrderService;
    
    /**
     * Demo REST endpoints (all mapped under {@code /test}) that exercise
     * {@link ElasticSearchUtils} against the fixed index {@code fwztest_index}.
     *
     * @author Feng Wenzhe
     * @version 2018-06-11
     */
    @RestController
    @RequestMapping("/test")
    public class MainController {

        /** Name of the index every endpoint of this controller operates on. */
        private static final String INDEX_NAME = "fwztest_index";

        @Autowired
        private IdGeneratorSnowflake idGenerator;
        @Autowired
        ElasticSearchUtils utilsService;

        /**
         * Asks the utility service to create the demo index with 5 shards, 1 replica and an
         * empty mapping argument.
         *
         * @return a success or failure message, depending on the value reported by the service
         */
        @RequestMapping("createIndex")
        public String elasticsearch() {
            boolean created = utilsService.createIndex(INDEX_NAME, 5, 1, "");
            return created ? "创建成功" : "创建失败";
        }

        /**
         * Submits 1000 generated sample documents, each with its own snowflake-based document id.
         * The value returned by the service for each submission is not inspected.
         *
         * @return a fixed success message
         */
        @RequestMapping("addDoc")
        public String addDoc() {
            for (int seq = 0; seq < 1000; seq++) {
                Map<String, Object> doc = new ConcurrentHashMap<String, Object>();
                doc.put("author_id", idGenerator.snowflakeId());
                doc.put("title", "这有" + seq + "个中国人");
                doc.put("content", "其中有" + (seq - 1) + "个老黑");
                doc.put("create_date", new Date());

                String docId = String.valueOf(idGenerator.snowflakeId());
                utilsService.addDoc(INDEX_NAME, docId, doc);
            }
            return "新增成功";
        }

        /**
         * Requests deletion of one document from the demo index.
         *
         * @param id id of the document to delete
         * @return a fixed success message; the service result is not inspected
         */
        @RequestMapping("deleteDoc")
        public String deleteDoc(String id) {
            utilsService.deleteDoc(INDEX_NAME, id);
            return "删除成功";
        }

        /**
         * Requests an update of one document, passing an empty string as the new content.
         *
         * @param id id of the document to update
         * @return a fixed success message; the service result is not inspected
         */
        @RequestMapping("updateDoc")
        public String updateDoc(String id) {
            utilsService.updateDoc(INDEX_NAME, id, "");
            return "修改成功";
        }

        /**
         * Looks up one document in the demo index.
         *
         * @param id id of the document to fetch
         * @return whatever {@code getDoc} returns for that id
         */
        @RequestMapping("selectDoc")
        public Map<String, Object> selectDoc(String id) {
            Map<String, Object> found = utilsService.getDoc(INDEX_NAME, id);
            return found;
        }

    }

    Utils

    package com.fwz.tproject.testfunction.service;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.DocWriteRequest;
    import org.elasticsearch.action.DocWriteResponse;
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.search.ClearScrollRequest;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchScrollRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.indices.CreateIndexRequest;
    import org.elasticsearch.client.indices.CreateIndexResponse;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.Aggregations;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
    import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
    import org.elasticsearch.search.aggregations.metrics.Avg;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.stereotype.Service;
    
    @Service
    @EnableAsync
    public class ElasticSearchUtils {
        @Autowired
        private RestHighLevelClient restClient;
        Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class.getName());
    
        /**
         * createIndex
         * 
         * @param indexName //索引名称
         * @param shards    //主分片
         * @param replicas  //备份分片
         * @param mapping   //mapping配置
         * @return
         */
        public boolean createIndex(String indexName, Integer shards, Integer replicas, String mapping) {
            logger.info(restClient.toString());
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            request.settings(Settings.builder().put("number_of_shards", 5).put("number_of_replicas", 1));
    
            request.mapping(
                    "{"properties":{"author_id":{"type":"long"},"title":{"type":"text","analyzer":"standard","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"content":{"type":"text","analyzer":"ik_max_word","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"create_date":{"type":"date"}}}",
                    XContentType.JSON);
            request.setTimeout(TimeValue.timeValueMinutes(1));
            CreateIndexResponse createIndexResponse;
            try {
                createIndexResponse = restClient.indices().create(request, RequestOptions.DEFAULT);
    
                boolean acknowledged = createIndexResponse.isAcknowledged();
    
                logger.info("是否获取ACK:" + acknowledged);
                return acknowledged;
            } catch (IOException e) {
                // TODO Auto-generated catch block
                logger.error(e.toString());
            }
            return false;
        }
    
        /**
         * Indexes one document. The method is annotated with {@code @Async}.
         *
         * @param index  name of the target index
         * @param id     document id; when {@code null} or empty no id is set on the request
         * @param source document content as a field/value map
         * @return a completed future holding {@code true} when the index call returned, or
         *         {@code false} when it failed with an {@link IOException}
         * @author fwzz
         * @version created 2021-01-27 17:10:42
         */
        @Async
        public Future<Boolean> addDoc(String index, String id, Map<String, Object> source) {
            // The source of an IndexRequest may be a JSON string, a Map or an XContentBuilder.
            IndexRequest request = new IndexRequest(index);
            request.source(source);

            boolean idMissing = id == null || id.isEmpty();
            if (!idMissing) {
                request.id(id);
            }

            boolean stored;
            try {
                IndexResponse response = restClient.index(request, RequestOptions.DEFAULT);
                logger.info("新增数据成功,ID为: " + response.getId());
                stored = true;
            } catch (IOException e) {
                e.printStackTrace();
                stored = false;
            }
            return new AsyncResult<>(stored);
        }
    
        /**
         * Deletes one document by id.
         *
         * @param index name of the index that holds the document
         * @param id    id of the document to delete
         * @return {@code true} only if Elasticsearch reports the document as deleted;
         *         {@code false} if it reports any other result (for example not found) or an
         *         {@link IOException} occurred
         * @author fwzz
         * @version created 2021-01-27 17:19:26
         */
        public boolean deleteDoc(String index, String id) {
            DeleteRequest deleteRequest = new DeleteRequest(index, id);
            try {
                DeleteResponse res = restClient.delete(deleteRequest, RequestOptions.DEFAULT);
                logger.info(res.getResult().toString());

                // Previously this method returned false on every path, even after a successful delete.
                boolean deleted = res.getResult() == DocWriteResponse.Result.DELETED;
                if (deleted) {
                    logger.info("删除数据成功,ID为: " + res.getId());
                }
                return deleted;
            } catch (IOException e) {
                // Route the failure through the logger (with stack trace) instead of stderr.
                logger.error(e.toString(), e);
                return false;
            }
        }
    
        /**
         * Applies a partial update to one document.
         *
         * @param index  name of the index that holds the document
         * @param id     id of the document to update
         * @param source partial document as a JSON string
         * @return {@code true} when the update call returned; {@code false} when it failed with
         *         an {@link IOException}
         * @author fwzz
         * @version created 2021-01-27 17:25:43
         */
        public boolean updateDoc(String index, String id, String source) {
            // A JSON string must be passed together with its content type. Calling doc(source)
            // with a single String resolves to doc(Object...), which expects field/value pairs
            // and rejects an odd number of arguments with an IllegalArgumentException.
            UpdateRequest updateRequest = new UpdateRequest(index, id).doc(source, XContentType.JSON);
            try {
                UpdateResponse res = restClient.update(updateRequest, RequestOptions.DEFAULT);
                logger.info("修改数据成功,ID为: " + res.getId());
                return true;
            } catch (IOException e) {
                // Route the failure through the logger (with stack trace) instead of stderr.
                logger.error(e.toString(), e);
                return false;
            }
        }
    
        /**
         * Fetches one document by id.
         *
         * @param index name of the index that holds the document
         * @param id    id of the document to fetch
         * @return the document source as a map, or {@code null} when the document does not
         *         exist, has no source, or the request failed with an {@link IOException}
         * @author fwzz
         * @version created 2021-01-27 17:27:33
         */
        public Map<String, Object> getDoc(String index, String id) {
            GetRequest getRequest = new GetRequest(index, id);
            try {
                GetResponse res = restClient.get(getRequest, RequestOptions.DEFAULT);
                Map<String, Object> map = res.getSourceAsMap();

                // A missing document yields a null source map; guard before dereferencing it
                // (the unguarded map.toString() used to throw a NullPointerException here).
                if (!res.isExists() || map == null) {
                    logger.info("未查询到数据,ID为: " + id);
                    return null;
                }

                logger.info("查询数据成功,ID为: " + res.getId());
                logger.info("查询数据成功,字符串数据为: " + res.getSourceAsString());
                logger.info("查询数据成功,Map数据为: " + map.toString());
                return map;
            } catch (IOException e) {
                // Route the failure through the logger (with stack trace) instead of stderr.
                logger.error(e.toString(), e);
                return null;
            }
        }
    
        /**
         * Bulk API demo: sends one index, one delete and one update operation in a single
         * request and logs the outcome of every item.
         *
         * @param index name of the index all three operations target
         * @param id    document id used by the delete and the update operation
         * @return {@code true} when the bulk call returned (individual items may still have
         *         failed; their failure messages are logged); {@code false} when the call failed
         *         with an {@link IOException}
         * @author fwzz
         * @version created 2021-01-27 19:35:34
         */
        public Boolean bulkRequest(String index, String id) {
            // Placeholder payloads for the documents to index / update.
            Map<String, Object> newDoc = new HashMap<String, Object>();
            Map<String, Object> changedFields = new HashMap<String, Object>();

            BulkRequest request = new BulkRequest();
            // Use the Map overloads. The (XContentType, Object...) overloads expect alternating
            // field names and values and throw IllegalArgumentException for a single Map argument.
            request.add(new IndexRequest(index).source(newDoc));
            request.add(new DeleteRequest(index, id));
            request.add(new UpdateRequest(index, id).doc(changedFields));

            try {
                BulkResponse bulkResponse = restClient.bulk(request, RequestOptions.DEFAULT);
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    if (bulkItemResponse.isFailed()) {
                        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                        logger.info(failure.getMessage());
                        continue;
                    }
                    // getResult() is declared on DocWriteResponse, so index, create, update and
                    // delete items can all be logged without casting to the concrete type.
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                    logger.info(itemResponse.getResult().toString());
                }
                return true;
            } catch (IOException e) {
                // Route the failure through the logger (with stack trace) instead of stderr.
                logger.error(e.toString(), e);
                return false;
            }
        }
    
        /**
         * Search demo: runs a term query ({@code city} = "北京市") against all indices matching
         * {@code gdp_tops*}, prints every hit's index and source to standard output and logs the
         * total hit count. A search like this can fully replace a get-by-id request.
         *
         * @return {@code true} when the search returned; {@code false} when it failed with an
         *         {@link IOException}
         * @author fwzz
         * @version created 2021-01-27 19:43:57
         */
        public Boolean searchQuery() {
            // Query definition: the term query plus a 60 second timeout.
            SearchSourceBuilder source = new SearchSourceBuilder();
            source.query(QueryBuilders.termQuery("city", "北京市"));
            source.timeout(TimeValue.timeValueSeconds(60));

            // Index pattern to search.
            SearchRequest request = new SearchRequest("gdp_tops*");
            request.source(source);

            boolean succeeded;
            try {
                SearchResponse result = restClient.search(request, RequestOptions.DEFAULT);
                for (SearchHit hit : result.getHits().getHits()) {
                    System.out.println(hit.getIndex());
                    System.out.println(hit.getSourceAsMap());
                }
                logger.info(result.getHits().getTotalHits().toString());
                succeeded = true;
            } catch (IOException e) {
                e.printStackTrace();
                succeeded = false;
            }
            return succeeded;
        }
    
        /**
         * Aggregation demo: groups the documents of all indices matching {@code gdp_tops*} by
         * {@code company.keyword}, computes the average {@code age} per group, and logs the hits
         * as well as the average of the bucket with key {@code Elastic} when that bucket exists.
         *
         * @return {@code true} when the search returned; {@code false} when it failed with an
         *         {@link IOException}
         * @author fwzz
         * @version created 2021-01-27 19:46:31
         */
        public Boolean aggsQuery() {
            // Index pattern to search.
            SearchRequest searchRequest = new SearchRequest("gdp_tops*");

            TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company").field("company.keyword");
            aggregation.subAggregation(AggregationBuilders.avg("average_age").field("age"));

            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.aggregation(aggregation);
            searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
            // For paging, set searchSourceBuilder.from(0) and searchSourceBuilder.size(5) here.
            searchRequest.source(searchSourceBuilder);

            try {
                SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT);

                // Approach 1: walk the plain search hits.
                for (SearchHit hit : response.getHits().getHits()) {
                    logger.info(hit.getIndex());
                    logger.info(hit.getSourceAsMap().toString());
                }

                // Approach 2: read the aggregation result. Every lookup is null-checked because
                // the "Elastic" bucket only exists when matching documents were found.
                Aggregations aggregations = response.getAggregations();
                Terms byCompanyAggregation = null;
                if (aggregations != null) {
                    byCompanyAggregation = aggregations.get("by_company");
                }
                Bucket elasticBucket = null;
                if (byCompanyAggregation != null) {
                    elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
                }
                if (elasticBucket != null) {
                    Avg averageAge = elasticBucket.getAggregations().get("average_age");
                    if (averageAge != null) {
                        logger.info("average_age of bucket [Elastic]: " + averageAge.getValue());
                    }
                }

                logger.info(response.getHits().getTotalHits().toString());
                return true;
            } catch (IOException e) {
                // Route the failure through the logger (with stack trace) instead of stderr.
                logger.error(e.toString(), e);
                return false;
            }
        }
    
        /**
         * Asynchronous search demo: submits a search over all indices matching {@code gdp_tops*}
         * and hands the response to a listener. This method does not wait for the listener.
         *
         * @return always {@code true}, returned right after the request has been submitted
         * @author fwzz
         * @version created 2021-01-27 19:50:00
         */
        public Boolean searchAsync() {
            // Index pattern to search.
            SearchRequest request = new SearchRequest("gdp_tops*");

            ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
                @Override
                public void onFailure(Exception e) {
                    logger.error(e.toString());
                }

                @Override
                public void onResponse(SearchResponse searchResponse) {
                    for (SearchHit hit : searchResponse.getHits().getHits()) {
                        inspectHit(hit);
                    }
                }
            };

            restClient.searchAsync(request, RequestOptions.DEFAULT, listener);
            return true;
        }

        /**
         * Reads the individual parts of one search hit. The values are not used any further;
         * the method only demonstrates the available accessors.
         *
         * @param hit the hit to read
         */
        @SuppressWarnings("unchecked")
        private void inspectHit(SearchHit hit) {
            // Index the hit came from.
            String hitIndex = hit.getIndex();
            // Document id.
            String hitId = hit.getId();
            // Relevance score.
            float relevance = hit.getScore();
            // Document source as a JSON string.
            String json = hit.getSourceAsString();
            // Document source as a map.
            Map<String, Object> fields = hit.getSourceAsMap();
            // The document's title.
            String title = (String) fields.get("title");
            // A list-valued field of the document.
            List<Object> userList = (List<Object>) fields.get("user");
            // A map-valued (nested object) field of the document.
            Map<String, Object> nested = (Map<String, Object>) fields.get("innerObject");
        }
    
        /**
         * Scroll demo. When a query matches too many documents for a single response, start the
         * search with {@code SearchRequest.scroll(...)} to obtain a scroll id and then page
         * through the result with {@link SearchScrollRequest}. This demo runs the initial search
         * on index {@code posts}, fetches one further page, logs its total hit count and finally
         * releases the scroll context on the server.
         *
         * @return {@code true} when both requests returned; {@code false} when one of them
         *         failed with an {@link IOException}
         * @author fwzz
         * @version created 2021-01-27 20:00:14
         */
        public Boolean searchScroll() {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.termQuery("city", "北京市"));
            searchSourceBuilder.size(5);

            SearchRequest searchRequest = new SearchRequest("posts");
            searchRequest.source(searchSourceBuilder);
            searchRequest.scroll(TimeValue.timeValueMinutes(1L));

            // Tracked outside the try block so the finally clause can release the context.
            String scrollId = null;
            try {
                SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
                scrollId = searchResponse.getScrollId();

                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(TimeValue.timeValueSeconds(30));
                SearchResponse searchScrollResponse = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                scrollId = searchScrollResponse.getScrollId();

                SearchHits hits = searchScrollResponse.getHits();
                logger.info(hits.getTotalHits().toString());
                return true;
            } catch (IOException e) {
                // Route the failure through the logger (with stack trace) instead of stderr.
                logger.error(e.toString(), e);
                return false;
            } finally {
                releaseScroll(scrollId);
            }
        }

        /**
         * Releases a server-side scroll context. Failures are only logged so that they never
         * replace the result or exception of the calling method.
         *
         * @param scrollId id of the scroll context; {@code null} means nothing was opened
         */
        private void releaseScroll(String scrollId) {
            if (scrollId == null) {
                return;
            }
            ClearScrollRequest clearRequest = new ClearScrollRequest();
            clearRequest.addScrollId(scrollId);
            try {
                restClient.clearScroll(clearRequest, RequestOptions.DEFAULT);
            } catch (IOException | RuntimeException e) {
                logger.warn(e.toString(), e);
            }
        }
    
    }

    全局ID生成工具类

    package com.fwz.tproject.testfunction.service;
    
    import javax.annotation.PostConstruct;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import cn.hutool.core.lang.Snowflake;
    import cn.hutool.core.net.NetUtil;
    import cn.hutool.core.util.IdUtil;
    
    /**
     * Spring component that hands out unique ids: snowflake ids (numeric) and UUID strings.
     */
    @Component
    public class IdGeneratorSnowflake {
        // NOTE(review): Hutool's Snowflake is understood to accept worker and datacenter ids
        // in the range 0..31 only - confirm against the Hutool version in use.
        private static final long WORKER_ID_COUNT = 32L;

        private static final Logger log = LoggerFactory.getLogger(IdGeneratorSnowflake.class.getName());

        private long workerId = 0;
        private long datacenterId = 1;
        // Usable even without init() (e.g. when instantiated by hand); init() replaces it
        // with a generator that carries the machine-specific worker id.
        private Snowflake snowflake = IdUtil.createSnowflake(workerId, datacenterId);

        /**
         * Runs after dependency injection: derives a worker id from the local IPv4 address and
         * rebuilds the default generator with it. Previously the derived value was only logged
         * and never reached the generator, and it was far outside the accepted range.
         */
        @PostConstruct
        public void init() {
            long seed = 0;
            try {
                String localhost = NetUtil.getLocalhostStr();
                try {
                    seed = NetUtil.ipv4ToLong(localhost);
                } catch (Exception e) {
                    log.warn("当前机器的workerId获取失败", e);
                    // Fall back to the hash of the address string, if there is one.
                    seed = localhost == null ? 0 : localhost.hashCode();
                }
            } catch (Exception e) {
                // The local address could not be determined at all; keep seed 0.
                log.warn("当前机器的workerId获取失败", e);
            }

            // Fold the seed into the valid range; floorMod keeps the result non-negative.
            // Different machines can still map to the same worker id.
            workerId = Math.floorMod(seed, WORKER_ID_COUNT);
            snowflake = IdUtil.createSnowflake(workerId, datacenterId);
            log.info("当前机器的workerId: {}", workerId);
        }

        /**
         * Returns the next id from the default generator of this component.
         *
         * @return a snowflake id
         */
        public synchronized long snowflakeId() {
            return snowflake.nextId();
        }

        /**
         * Returns the next id for an explicitly chosen worker/datacenter pair.
         *
         * @param workerId     worker id to generate for
         * @param datacenterId datacenter id to generate for
         * @return a snowflake id
         */
        public synchronized long snowflakeId(long workerId, long datacenterId) {
            // Reuse one generator per (workerId, datacenterId) pair. A generator created anew
            // on every call starts without sequence state, so two calls within the same
            // millisecond could return the same id.
            Snowflake generator = IdUtil.getSnowflake(workerId, datacenterId);
            return generator.nextId();
        }

        /**
         * Generates a UUID string without dashes, e.g. 73a64edf935d4952a287739a66f96e06.
         *
         * @return the UUID string
         */
        public String simpleUUID() {
            return IdUtil.simpleUUID();
        }

        /**
         * Generates a UUID string with dashes, e.g. b12b6401-6f9c-4351-b2b6-d8afc9ab9272.
         *
         * @return the UUID string
         */
        public String randomUUID() {
            return IdUtil.randomUUID();
        }

        /**
         * Manual smoke test: prints 1000 ids generated for worker 0 / datacenter 0.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            IdGeneratorSnowflake generator = new IdGeneratorSnowflake();
            for (int i = 0; i < 1000; i++) {
                System.out.println(generator.snowflakeId(0, 0));
            }
        }
    }
  • 相关阅读:
    HDU1496(巧妙hash)
    SPOJ(后缀数组求不同子串个数)
    django-admin自定义登录
    Bootstrap实现的页面
    ImageMagick来处理图片,缩放,调整高度等操作
    xlrd,xlwt操作Excel实例
    匹配图片修改图片名称
    python 的两个模块xlwt,xlrd,写入和读取Excel数据
    Excel常见操作,重复数据,去除数据关联
    Excel数据常用操作,vlookup,text,trim,数据格式导致出错
  • 原文地址:https://www.cnblogs.com/fengwenzhee/p/14336734.html
Copyright © 2011-2022 走看看