zoukankan      html  css  js  c++  java
  • ElasticSearch(八):springboot集成ElasticSearch集群并使用

    1. 集群的搭建

    见:ElasticSearch(七)

    2. springboot配置集群

    2.1 创建springboot项目,使用idea创建,不过多介绍(创建项目时候建议不要勾选elasticsearch,springboot目前自带的elasticsearch版本为5.6.10,如果你版本高于这个版本,还是自己手动导入。)

    2.2 导入依赖

       <properties>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
          <java.version>1.8</java.version>
          <elasticSearch.version>6.3.2</elasticSearch.version>
       </properties>
    <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId>  <version>${elasticSearch.version}</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>${elasticSearch.version}</version> <exclusions> <exclusion> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticSearch.version}</version> </dependency> <dependency> <groupId>org.elasticsearch.plugin</groupId> <artifactId>transport-netty4-client</artifactId> <version>${elasticSearch.version}</version> </dependency>

    对于依赖需要说明的几点:

    2.2.1. org.elasticsearch.client--transport 依赖添加之后,会依赖一系列的插件,客户端等,虽然在springboot2.0中依旧依赖  org.elasticsearch-elasticsearch-6.3.2,但是在依赖列表中,其添加的依赖依然是elasticSearch5.6.10的依赖,所以必须排除这个依赖,手动添加org.elasticsearch-elasticsearch6.3.2的依赖,目前只有这种解决方法,否则导致版本不一致冲突。如下:

    当我排除 org.elasticsearch.client.transport的elasticsearch的依赖之后,重新添加elasticsearch 6.3.2的依赖之后,就显示的是同样的elasticsearch6.3.2。显示如下:

    2.2.2. 这时候如果你再springboot中配置了TransportClient的方法Bean,则启动项目,会报错:

    这是因为:transport-netty4-client的版本是5.6.0,而我们使用的所有的elasticsearch版本都是6.3.2,导致jar包冲突,所以,我们必须将transport-netty4-client的版本更新到6.3.2。

    这就需要导入jar:org.elasticsearch.plugin----transport-netty4-client 的jar,(具体依赖将上面),这时候transport-netty4-client的版本也是6.3.2了。

    2.2.3. 到这里已经可以使用elasticsearch的集群了,不过我们又导入了一个 elasticsearch-rest-high-level-client的jar,目的是:为了使用某些特殊的api。参见:https://www.cnblogs.com/ginb/p/8716485.html

    3. 启动项目,连接elasticSearch集群

    3.1 配置集群信息

    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.net.InetAddress;
    
    /**
     * @Auther: cc
     * @Date: 
     * @Description:
     */
    @Configuration
    public class ESConfig {
        private Logger logger  = LoggerFactory.getLogger(this.getClass());
    
        @Value("${elasticsearch.firstIp}")
        private String firstIp;
        @Value("${elasticsearch.secondIp}")
        private String secondIp;
        @Value("${elasticsearch.thirdIp}")
        private String thirdIp;
        @Value("${elasticsearch.firstPort}")
        private String firstPort;
        @Value("${elasticsearch.secondPort}")
        private String secondPort;
        @Value("${elasticsearch.thirdPort}")
        private String thirdPort;
        @Value("${elasticsearch.clusterName}")
        private String clusterName;
    
        @Bean
        public TransportClient getTransportClient() {
            logger.info("ElasticSearch初始化开始。。");
            logger.info("要连接的节点1的ip是{},端口是{},集群名为{}" , firstIp , firstPort , clusterName);
            logger.info("要连接的节点2的ip是{},端口是{},集群名为{}" , secondIp , secondPort , clusterName);
            logger.info("要连接的节点3的ip是{},端口是{},集群名为{}" , thirdIp , thirdPort , clusterName);
            TransportClient transportClient = null;
            try {
                Settings settings = Settings.builder()
                        .put("cluster.name",clusterName)    //集群名称
                        .put("client.transport.sniff",true)  //目的是为了可以找到集群,嗅探机制开启
                        .build();
                transportClient = new PreBuiltTransportClient(settings);
                TransportAddress firstAddress = new TransportAddress(InetAddress.getByName(firstIp),Integer.parseInt(firstPort));
                TransportAddress secondAddress = new TransportAddress(InetAddress.getByName(secondIp),Integer.parseInt(secondPort));
                TransportAddress thirdAddress = new TransportAddress(InetAddress.getByName(thirdIp),Integer.parseInt(thirdPort));
                transportClient.addTransportAddress(firstAddress);
                transportClient.addTransportAddress(secondAddress);
                transportClient.addTransportAddress(thirdAddress);
                logger.info("ElasticSearch初始化完成。。");
            }catch (Exception e){
                e.printStackTrace();
                logger.error("ElasticSearch初始化失败:" +  e.getMessage(),e);
            }
            return transportClient;
        }
    }

    对于上面代码解释:

    3.1.1 首先需要再配置文件中配置服务器集群的所有ip,端口,然后通过@value导入到config类中。

    3.2.2 类上必须加@Configuration注解,方法上必须加@Bean注解。

    3.2 启动项目,连接集群

    启动项目,如果不报错就可行了。

    4. 使用springboot操作索引

    4.1 创建索引

    主要使用方法:

    CreateIndexRequest createIndexRequest = Requests.createIndexRequest(index).settings(settings).mapping(type,mapping);  //指定setting,mapping创建索引,如果非结构化索引的话,不指定mapping
    CreateIndexResponse response = transportClient.admin().indices().create(createIndexRequest).get();
    logger.info("建立索引映射成功:" + response.isAcknowledged());

    4.2 删除索引

    DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(index);                  //创建删除索引的请求
    DeleteIndexResponse response = transportClient.admin().indices().delete(deleteIndexRequest).get();    //删除索引的响应
    logger.info("删除索引结果:{}",response.isAcknowledged());

    完整代码如下

    import com.cc.es.domain.base.ResultBean;
    import io.swagger.annotations.Api;
    import io.swagger.annotations.ApiImplicitParam;
    import io.swagger.annotations.ApiImplicitParams;
    import io.swagger.annotations.ApiOperation;
    import org.apache.commons.lang3.StringUtils;
    import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
    import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
    import org.elasticsearch.client.Requests;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    import java.util.*;
    
    /**
     * @Auther: Administrator
     * @Date: 2018/8/21 07
     * @Description:
     */
    @Api(value = "Index", tags = "索引")
    @RestController
    @RequestMapping("index")
    public class IndexController {
    
        private final String INDEX = "index";
        private final String TYPE = "type";
        private Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Resource
        private TransportClient transportClient;
    
    
        @ApiOperation(value = "结构化创建索引")
        @ApiImplicitParams({
                @ApiImplicitParam(name = "index", value = "索引名", required = true, dataType = "String", paramType = "query"),
                @ApiImplicitParam(name = "type", value = "类型", required = true, dataType = "Integer", paramType = "query"),
                @ApiImplicitParam(name = "fields", value = "结构化索引字段名,不定参数,传入的时候参数名为索引字段名,值为对应的数据类型")
        })
        @RequestMapping(value = "/create" , method = RequestMethod.POST)
        public ResultBean createIndex(@RequestParam Map<String,String> param){
            ResultBean resultBean = new ResultBean();
            String index = null;
            String type = null;
            List<String> fieldList = new ArrayList<>();
            logger.info("接收的创建索引的参数:" + param);
            Set<Map.Entry<String, String>> set = param.entrySet();
            for (Map.Entry<String, String> entry: set) {
                String key = entry.getKey();
                if(key.trim().equals(INDEX)){
                    index = entry.getValue();
                }else if(key.trim().equals(TYPE)){
                    type = entry.getValue();
                }else{
                    fieldList.add(key);
                }
            }
            if(StringUtils.isBlank(index) || StringUtils.isBlank(type)){
                resultBean.setSuccess(false);
                resultBean.setMsg("参数错误!");
                return resultBean;
            }
            try {
                XContentBuilder settings = XContentFactory.jsonBuilder()
                        .startObject()
                            .field("number_of_shards",6)
                            .field("number_of_replicas",1)
                            .startObject("analysis").startObject("analyzer").startObject("ik")
                                .field("tokenizer","ik_max_word")
                            .endObject().endObject().endObject()
                        .endObject();
                XContentBuilder mapping = XContentFactory.jsonBuilder();
                mapping.startObject().field("dynamic","strict").startObject("properties");
                for (int i = 0,j = fieldList.size(); i < j; i++) {
                    String field = fieldList.get(i);
                    String fieldType = param.get(field);
                    mapping.startObject(field).field("type",fieldType);
                    if(fieldType.trim().equals("date")){
                        mapping.field("format","yyyy-MM-dd HH:mm:ss || yyyy-MM-dd ");
                    }
                    mapping.endObject();
                }
                mapping.endObject().endObject();
                CreateIndexRequest createIndexRequest = Requests.createIndexRequest(index).settings(settings).mapping(type,mapping);
                CreateIndexResponse response = transportClient.admin().indices().create(createIndexRequest).get();
                logger.info("建立索引映射成功:" + response.isAcknowledged());
                resultBean.setSuccess(true);
                resultBean.setMsg("创建索引成功!");
            } catch (Exception e) {
                resultBean.setSuccess(false);
                resultBean.setMsg("创建索引失败!");
                logger.error("创建索引失败!要创建的索引为{},文档类型为{},异常为:",index,type,e.getMessage(),e);
            }
            return resultBean;
        }
    
        @ApiOperation(value = "删除索引")
        @ApiImplicitParams({
                @ApiImplicitParam(name = "index", value = "索引名", required = true, dataType = "String", paramType = "query"),
        })
        @RequestMapping(value = "/delete" , method = RequestMethod.POST)
        public ResultBean deleteIndex(String index){
            ResultBean resultBean = new ResultBean();
            if (StringUtils.isBlank(index)) {
                resultBean.setMsg("参数错误,索引为空");
                resultBean.setSuccess(false);
                return resultBean;
            }
            try {
                DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(index);
                DeleteIndexResponse response = transportClient.admin().indices().delete(deleteIndexRequest).get();
                logger.info("删除索引结果:{}",response.isAcknowledged());
                resultBean.setSuccess(response.isAcknowledged());
                resultBean.setMsg(response.isAcknowledged() ? "删除索引成功!" : "删除索引失败!");
            } catch (Exception e) {
                resultBean.setSuccess(false);
                resultBean.setMsg("创建索引失败!");
                logger.error("删除索引失败!要删除的索引为{},异常为:",index,e.getMessage(),e);
            }
            return resultBean;
        }
        
    }
    View Code

    到目前为止,springboot的索引已经完成。这里都是使用的原生的一些api,以后可能还会使用一些别的方法完成。

  • 相关阅读:
    新增模块--性能调测问题荟萃
    SSDB(网络LevelDB)-- 实际遇到的问题
    内存池--定长内存池
    分布式系统
    Tcp Ip -- tcpdump win窗口大小
    GCC手册学习(序)
    GNU--gprof使用总结
    几种TCP连接中出现RST的情况(转载)
    Tcp/Ip--正常情况下的三次握手,四次挥手
    关于cnblogs的排版
  • 原文地址:https://www.cnblogs.com/chenmc/p/9548847.html
Copyright © 2011-2022 走看看