zoukankan      html  css  js  c++  java
  • SpringBoot整合Elasticsearch

    SpringBoot2.x整合Elasticsearch(下面简称es)教程

    1、老规矩,先在pom.xml中添加es的依赖

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
    

    2、在application.yml中添加es的配置

    #elasticsearch配置
    elasticsearch:
      rest:
        #es节点地址,集群则用逗号隔开
        uris: 10.24.56.154:9200
    

    3、添加es的工具类ElasticSearchUtils.java,工具类中我只添加了常用的一些方法,大家可以根据需要自行完善

    package com.example.study.util;
    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
    import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.support.master.AcknowledgedResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.indices.GetIndexRequest;
    import org.elasticsearch.common.Strings;
    import org.elasticsearch.common.text.Text;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
    import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
    import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * ElasticSearch工具类
     *
     * @author 154594742@qq.com
     * @date 2021/3/4 19:34
     */
    
    @Slf4j
    @Component
    public class ElasticSearchUtils {
    
        @Value("${spring.elasticsearch.rest.uris}")
        private String uris;
    
        private RestHighLevelClient restHighLevelClient;
    
        /**
         * 在Servlet容器初始化前执行
         */
        @PostConstruct
        private void init() {
            try {
                if (restHighLevelClient != null) {
                    restHighLevelClient.close();
                }
                if (StringUtils.isBlank(uris)) {
                    log.error("spring.elasticsearch.rest.uris is blank");
                    return;
                }
    
                //解析yml中的配置转化为HttpHost数组
                String[] uriArr = uris.split(",");
                HttpHost[] httpHostArr = new HttpHost[uriArr.length];
                int i = 0;
                for (String uri : uriArr) {
                    if (StringUtils.isEmpty(uris)) {
                        continue;
                    }
    
                    try {
                        //拆分出ip和端口号
                        String[] split = uri.split(":");
                        String host = split[0];
                        String port = split[1];
                        HttpHost httpHost = new HttpHost(host, Integer.parseInt(port), "http");
                        httpHostArr[i++] = httpHost;
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }
                RestClientBuilder builder = RestClient.builder(httpHostArr);
                restHighLevelClient = new RestHighLevelClient(builder);
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
    
        /**
         * 创建索引
         *
         * @param index
         * @return
         */
        public boolean createIndex(String index) throws IOException {
            if (isIndexExist(index)) {
                log.error("Index is  exits!");
                return false;
            }
            //1.创建索引请求
            CreateIndexRequest request = new CreateIndexRequest(index);
            //2.执行客户端请求
            CreateIndexResponse response = restHighLevelClient.indices()
                    .create(request, RequestOptions.DEFAULT);
            return response.isAcknowledged();
        }
    
        /**
         * 判断索引是否存在
         *
         * @param index
         * @return
         */
        public boolean isIndexExist(String index) throws IOException {
            GetIndexRequest request = new GetIndexRequest(index);
            return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        }
    
        /**
         * 删除索引
         *
         * @param index
         * @return
         */
        public boolean deleteIndex(String index) throws IOException {
            if (!isIndexExist(index)) {
                log.error("Index is not exits!");
                return false;
            }
            DeleteIndexRequest request = new DeleteIndexRequest(index);
            AcknowledgedResponse delete = restHighLevelClient.indices()
                    .delete(request, RequestOptions.DEFAULT);
            return delete.isAcknowledged();
        }
    
        /**
         * 新增/更新数据
         *
         * @param object 要新增/更新的数据
         * @param index  索引,类似数据库
         * @param id     数据ID
         * @return
         */
        public String submitData(Object object, String index, String id) throws IOException {
            if (null == id) {
                return addData(object, index);
            }
            if (this.existsById(index, id)) {
                return this.updateDataByIdNoRealTime(object, index, id);
            } else {
                return addData(object, index, id);
            }
        }
    
        /**
         * 新增数据,自定义id
         *
         * @param object 要增加的数据
         * @param index  索引,类似数据库
         * @param id     数据ID,为null时es随机生成
         * @return
         */
        public String addData(Object object, String index, String id) throws IOException {
            if (null == id) {
                return addData(object, index);
            }
            if (this.existsById(index, id)) {
                return this.updateDataByIdNoRealTime(object, index, id);
            }
            //创建请求
            IndexRequest request = new IndexRequest(index);
            request.id(id);
            request.timeout(TimeValue.timeValueSeconds(1));
            //将数据放入请求 json
            request.source(JSON.toJSONString(object), XContentType.JSON);
            //客户端发送请求
            IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
            log.info("添加数据成功 索引为: {}, response 状态: {}, id为: {}", index, response.status().getStatus(), response.getId());
            return response.getId();
        }
    
        /**
         * 数据添加 随机id
         *
         * @param object 要增加的数据
         * @param index  索引,类似数据库
         * @return
         */
        public String addData(Object object, String index) throws IOException {
            return addData(object, index, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
        }
    
        /**
         * 通过ID删除数据
         *
         * @param index 索引,类似数据库
         * @param id    数据ID
         * @return
         */
        public String deleteDataById(String index, String id) throws IOException {
            DeleteRequest request = new DeleteRequest(index, id);
            DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
            return deleteResponse.getId();
        }
    
        /**
         * 通过ID 更新数据
         *
         * @param object 要更新数据
         * @param index  索引,类似数据库
         * @param id     数据ID
         * @return
         */
        public String updateDataById(Object object, String index, String id) throws IOException {
            UpdateRequest updateRequest = new UpdateRequest(index, id);
            updateRequest.timeout("1s");
            updateRequest.doc(JSON.toJSONString(object), XContentType.JSON);
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("索引为: {}, id为: {},updateResponseID:{}, 更新数据成功", index, id, updateResponse.getId());
            return updateResponse.getId();
        }
    
        /**
         * 通过ID 更新数据,保证实时性
         *
         * @param object 要增加的数据
         * @param index  索引,类似数据库
         * @param id     数据ID
         * @return
         */
        public String updateDataByIdNoRealTime(Object object, String index, String id) throws IOException {
            //更新请求
            UpdateRequest updateRequest = new UpdateRequest(index, id);
    
            //保证数据实时更新
            updateRequest.setRefreshPolicy("wait_for");
    
            updateRequest.timeout("1s");
            updateRequest.doc(JSON.toJSONString(object), XContentType.JSON);
            //执行更新请求
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("索引为: {}, id为: {},updateResponseID:{}, 实时更新数据成功", index, id, updateResponse.getId());
            return updateResponse.getId();
        }
    
        /**
         * 通过ID获取数据
         *
         * @param index  索引,类似数据库
         * @param id     数据ID
         * @param fields 需要显示的字段,逗号分隔(缺省为全部字段)
         * @return
         */
        public Map<String, Object> searchDataById(String index, String id, String fields) throws IOException {
            GetRequest request = new GetRequest(index, id);
            if (StringUtils.isNotEmpty(fields)) {
                //只查询特定字段。如果需要查询所有字段则不设置该项。
                request.fetchSourceContext(new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
            }
            GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
            return response.getSource();
        }
    
        /**
         * 通过ID判断文档是否存在
         *
         * @param index 索引,类似数据库
         * @param id    数据ID
         * @return
         */
        public boolean existsById(String index, String id) throws IOException {
            GetRequest request = new GetRequest(index, id);
            //不获取返回的_source的上下文
            request.fetchSourceContext(new FetchSourceContext(false));
            request.storedFields("_none_");
            return restHighLevelClient.exists(request, RequestOptions.DEFAULT);
        }
    
        /**
         * 批量插入false成功
         *
         * @param index   索引,类似数据库
         * @param objects 数据
         * @return
         */
        public boolean bulkPost(String index, List<?> objects) {
            BulkRequest bulkRequest = new BulkRequest();
            BulkResponse response = null;
            //最大数量不得超过20万
            for (Object object : objects) {
                IndexRequest request = new IndexRequest(index);
                request.source(JSON.toJSONString(object), XContentType.JSON);
                bulkRequest.add(request);
            }
            try {
                response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null != response && response.hasFailures();
        }
    
    
        /**
         * 获取低水平客户端
         *
         * @return
         */
        public RestClient getLowLevelClient() {
            return restHighLevelClient.getLowLevelClient();
        }
    
        /**
         * 高亮结果集 特殊处理
         * map转对象 JSONObject.parseObject(JSONObject.toJSONString(map), Content.class)
         *
         * @param searchResponse
         * @param highlightField
         */
        private List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
            //解析结果
            ArrayList<Map<String, Object>> list = new ArrayList<>();
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                Map<String, HighlightField> high = hit.getHighlightFields();
                HighlightField title = high.get(highlightField);
                //原来的结果
                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                //解析高亮字段,将原来的字段换为高亮字段
                if (title != null) {
                    Text[] texts = title.fragments();
                    StringBuilder nTitle = new StringBuilder();
                    for (Text text : texts) {
                        nTitle.append(text);
                    }
                    //替换
                    sourceAsMap.put(highlightField, nTitle.toString());
                }
                list.add(sourceAsMap);
            }
            return list;
        }
    
        /**
         * 查询并分页
         *
         * @param index          索引名称
         * @param query          查询条件
         * @param highlightField 高亮字段
         * @return
         */
        public List<Map<String, Object>> searchListData(String index,
                                                        SearchSourceBuilder query,
                                                        String highlightField) throws IOException {
            SearchRequest request = new SearchRequest(index);
    
            //高亮
            HighlightBuilder highlight = new HighlightBuilder();
            highlight.field(highlightField);
            //关闭多个高亮
            highlight.requireFieldMatch(false);
            highlight.preTags("<span style='color:red'>");
            highlight.postTags("</span>");
            query.highlighter(highlight);
            //不返回源数据。只有条数之类的数据。
            //builder.fetchSource(false);
            request.source(query);
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            log.info("totalHits:" + response.getHits().getTotalHits());
            if (response.status().getStatus() == 200) {
                // 解析对象
                return setSearchResponse(response, highlightField);
            }
            return null;
        }
    }
    

    4、添加es控制器ElasticSearchController.java作为测试使用

    package com.example.study.controller;
    
    import com.example.study.model.entity.UserEntity;
    import com.example.study.model.vo.ResponseVo;
    import com.example.study.util.BuildResponseUtils;
    import com.example.study.util.ElasticSearchUtils;
    import io.swagger.annotations.Api;
    import io.swagger.annotations.ApiOperation;
    import org.apache.commons.lang3.StringUtils;
    import org.elasticsearch.common.Strings;
    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
    import org.elasticsearch.search.sort.SortOrder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    import java.io.IOException;
    import java.util.Map;
    
    /**
     * ElasticSearch控制器
     *
     * @author 154594742@qq.com
     * @date 2021/3/5 10:02
     */
    
    @Api(tags = "ElasticSearch控制器")
    @RestController
    @RequestMapping("elasticSearch")
    public class ElasticSearchController {
    
        @Autowired
        private ElasticSearchUtils elasticSearchUtils;
    
        /**
         * 新增索引
         *
         * @param index 索引
         * @return ResponseVo
         */
        @ApiOperation("新增索引")
        @PostMapping("index")
        public ResponseVo<?> createIndex(String index) throws IOException {
            return BuildResponseUtils.buildResponse(elasticSearchUtils.createIndex(index));
        }
    
        /**
         * 索引是否存在
         *
         * @param index index
         * @return ResponseVo
         */
        @ApiOperation("索引是否存在")
        @GetMapping("index/{index}")
        public ResponseVo<?> existIndex(@PathVariable String index) throws IOException {
            return BuildResponseUtils.buildResponse(elasticSearchUtils.isIndexExist(index));
        }
    
        /**
         * 删除索引
         *
         * @param index index
         * @return ResponseVo
         */
        @ApiOperation("删除索引")
        @DeleteMapping("index/{index}")
        public ResponseVo<?> deleteIndex(@PathVariable String index) throws IOException {
            return BuildResponseUtils.buildResponse(elasticSearchUtils.deleteIndex(index));
        }
    
    
        /**
         * 新增/更新数据
         *
         * @param entity 数据
         * @param index  索引
         * @param esId   esId
         * @return ResponseVo
         */
        @ApiOperation("新增/更新数据")
        @PostMapping("data")
        public ResponseVo<String> submitData(UserEntity entity, String index, String esId) throws IOException {
            return BuildResponseUtils.buildResponse(elasticSearchUtils.submitData(entity, index, esId));
        }
    
        /**
         * 通过id删除数据
         *
         * @param index index
         * @param id    id
         * @return ResponseVo
         */
        @ApiOperation("通过id删除数据")
        @DeleteMapping("data/{index}/{id}")
        public ResponseVo<String> deleteDataById(@PathVariable String index, @PathVariable String id) throws IOException {
            return BuildResponseUtils.buildResponse(elasticSearchUtils.deleteDataById(index, id));
        }
    
        /**
         * 通过id查询数据
         *
         * @param index  index
         * @param id     id
         * @param fields 需要显示的字段,逗号分隔(缺省为全部字段)
         * @return ResponseVo
         */
        @ApiOperation("通过id查询数据")
        @GetMapping("data")
        public ResponseVo<Map<String, Object>> searchDataById(String index, String id, String fields) throws IOException {
            return BuildResponseUtils.buildResponse(elasticSearchUtils.searchDataById(index, id, fields));
        }
    
        /**
         * 分页查询(这只是一个demo)
         *
         * @param index index
         * @return ResponseVo
         */
        @ApiOperation("分页查询")
        @GetMapping("data/page")
        public ResponseVo<?> selectPage(String index) throws IOException {
            //构建查询条件
            BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
            //精确查询
            //boolQueryBuilder.must(QueryBuilders.wildcardQuery("name", "张三"));
            // 模糊查询
            boolQueryBuilder.filter(QueryBuilders.wildcardQuery("name", "张"));
            // 范围查询 from:相当于闭区间; gt:相当于开区间(>) gte:相当于闭区间 (>=) lt:开区间(<) lte:闭区间 (<=)
            boolQueryBuilder.filter(QueryBuilders.rangeQuery("age").from(18).to(32));
            SearchSourceBuilder query = new SearchSourceBuilder();
            query.query(boolQueryBuilder);
            //需要查询的字段,缺省则查询全部
            String fields = "";
            //需要高亮显示的字段
            String highlightField = "name";
            if (StringUtils.isNotBlank(fields)) {
                //只查询特定字段。如果需要查询所有字段则不设置该项。
                query.fetchSource(new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
            }
            //分页参数,相当于pageNum
            Integer from = 0;
            //分页参数,相当于pageSize
            Integer size = 2;
            //设置分页参数
            query.from(from);
            query.size(size);
    
            //设置排序字段和排序方式,注意:字段是text类型需要拼接.keyword
            //query.sort("age", SortOrder.DESC);
            query.sort("name" + ".keyword", SortOrder.ASC);
    
            return BuildResponseUtils.buildResponse(elasticSearchUtils.searchListData(index, query, highlightField));
        }
    }
    

    5、运行项目,然后访问 http://localhost:8080/swagger-ui.html 测试一下效果吧

    * 这里我就只贴上分页查询的效果(相信这也是大家最需要的),其余的大家自行体验
    

     

  • 相关阅读:
    【JVM性能调优】检测最耗cpu的线程的脚本
    JUC之ThreadPoolExecutor实现原理
    HashMap实现原理
    JUC之阻塞队列BlockingQueue的实现原理
    dubbo实践
    .net 技术基础
    日志等级
    CentOS 笔记(六) 历史命令 自动补充
    CentOS 笔记(五) 常用工具
    CentOS 笔记(二) 端口占用,进程查看
  • 原文地址:https://www.cnblogs.com/niudaxianren/p/15005079.html
Copyright © 2011-2022 走看看