zoukankan      html  css  js  c++  java
  • Elasticsearch java api操作(一)(Java Low Level Rest Client)

    一、说明:

      一、Elasticsearch提供了两个JAVA REST Client版本:

      1、java low level rest client:

      低级别的rest客户端,通过http与集群交互,用户需自己编组请求JSON串,及解析响应JSON串。兼容所有Elasticsearch版本。

      特点:maven引入

      使用介绍: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html

      2、java high rest client:

      高级别的REST客户端,基于低级别的REST客户端,增加了编组请求JSON串、解析响应JSON串等相关API,使用的版本需要保存和ES服务一致的版本,否则会有版本问题。

      从6.0.0开始加入的,目的是以java面向对象的方式进行请求、响应处理。

      每个API支持 同步、异步 两种方式,同步方法之间返回一个结果对象。异步的方法以async为后缀,通过listener参数来通知结果。高级java resy客户端依赖Elasticsearch core pproject

      兼容性说明:

      依赖jdk1.8和Elasticsearch core project

    二、Java Low Level Rest Client的使用

    版本:

    Elasticsearch 6.3.1

    pom文件:

     <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.3.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>6.3.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.7</version>
            </dependency>
    
            <dependency>
                <groupId>net.sf.json-lib</groupId>
                <artifactId>json-lib</artifactId>
                <version>0.9</version>
     </dependency>
    

    一、构建elasicsearch client工具类

    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    import java.net.InetAddress;
    
    /**
     * @Author: xiaolaotou
     * @Date: 2019/4/19
     */
    
    /**
     * 构建elasticsrarch client
     */
    public class ClientUtil {
        private static TransportClient client;
        public TransportClient CreateClient() throws Exception {
            // 先构建client
            System.out.println("11111111111");
            Settings settings=Settings.builder()
                    .put("cluster.name","elasticsearch1")
                    .put("client.transport.ignore_cluster_name", true)  //如果集群名不对,也能连接
                    .build();
            //创建Client
            TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(
                            new TransportAddress(
                                    InetAddress.getByName(
                                            "192.168.200.100"),
                                    9300));
            return client;
        }
    }

    二、测试类

    import net.sf.json.JSONObject;
    import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchType;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.Requests;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHits;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
    
    
    /**
     * @Author: xiaolaotou
     * @Date: 2019/4/19
     * ElasticSearch 6.3.1
     */
    public class Test {
    
        private static TransportClient client;
    
        static {
            try {
                client = new ClientUtil().CreateClient();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws Exception {
    
    
            //创建索引
    //        createEmployee();
            //根据inde,type,id查询一个document的data
    //        FindIndex();
    //        CreateJsonIndex();
            //批量导入
    //        BulkCreateIndex();
    
            //批量导出
    //        OutData();
            //创建带ik分词的index
    //        CreateIndexIkTest();
    
            //更新索引
    //        UpdateIndex();
    //        createIndex2();
    //        Search();
              get();
        }
    
        /**
         * 创建索引,普通格式
         *
         * @throws Exception
         */
        public static void createEmployee() throws Exception {
            IndexResponse response = client.prepareIndex("student", "doc", "1")
                    .setSource(jsonBuilder()
                            .startObject()
                            .field("name", "jack")
                            .field("age", 27)
                            .field("position", "technique")
                            .field("country", "china")
                            .field("join_date", "2017-01-01")
                            .field("salary", 10000)
                            .endObject())
                    .get();
            System.out.println("创建成功!");
        }
    
    /**
         * 根据 index ,type,id查询
         *
         * @throws Exception
         */
        public static void FindIndex() throws Exception {
            GetResponse getResponse = client.prepareGet("student", "doc", "1").get();
            System.out.println(getResponse.getSourceAsString());
        }
    
    /**
         * 创建索引,JSON
         *
         * @throws IOException
         */
        public static void CreateJsonIndex() throws IOException {
            JSONObject json = new JSONObject();
            json.put("user", "小明");
            json.put("title", "Java Engineer");
            json.put("desc", "web 开发");
            IndexResponse response = client.prepareIndex("studentjson", "doc", "1")
                    .setSource(json, XContentType.JSON)
                    .get();
            String _index = response.getIndex();
            System.out.println(_index);
        }
    
    /**
         * elasticsearch批量导入
         */
        public static void BulkCreateIndex() {
            BulkRequestBuilder builder = client.prepareBulk();
            for (int i = 0; i < 100000; i++) {
                HashMap<String, Object> map = new HashMap<>();
                map.put("recordtime", "11");
                map.put("area", "22");
                map.put("usertype", "33");
                map.put("count", 44);
                builder.add(client.prepareIndex("bulktest", "1").setSource(map));
                //每10000条提交一次
                if (i % 10000 == 0) {
                    builder.execute().actionGet();
                    builder = client.prepareBulk();
                }
            }
        }
    
    /**
         * 批量导出
         */
        public static void OutData() throws IOException {
            SearchResponse response = client.prepareSearch("bulktest").setTypes("1")
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSize(10000).setScroll(new TimeValue(600000))
                    .setSearchType(SearchType.DEFAULT).execute().actionGet();
            // setScroll(new TimeValue(600000)) 设置滚动的时间
            String scrollid = response.getScrollId();
            //把导出的结果以JSON的格式写到文件里
    
            //每次返回数据10000条。一直循环查询知道所有的数据都被查询出来
            while (true) {
                SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                        .execute().actionGet();
                SearchHits searchHit = response2.getHits();
                //再次查询不到数据时跳出循环
                if (searchHit.getHits().length == 0) {
                    break;
                }
                System.out.println("查询数量 :" + searchHit.getHits().length);
                for (int i = 0; i < searchHit.getHits().length; i++) {
                    String json = searchHit.getHits()[i].getSourceAsString();
                    putData(json);
                }
                System.out.println("查询结束");
            }
        }
    
    public static void putData(String json) throws IOException {
            String str = json + "
    ";
            //写入本地文件
            String fileTxt = "D:\data.txt";
            File file = new File(fileTxt);
            if (!file.getParentFile().exists()) {
                file.getParentFile().mkdirs();
            }
            if (!file.exists()) {
                file.createNewFile();
                FileWriter fw = new FileWriter(file, true);
                BufferedWriter bw = new BufferedWriter(fw);
                System.out.println("写入完成啦啊");
                bw.write(String.valueOf(str));
                bw.flush();
                bw.close();
                fw.close();
            } else {
                FileWriter fw = new FileWriter(file, true);
                BufferedWriter bw = new BufferedWriter(fw);
                System.out.println("追加写入完成啦啦");
                bw.write(String.valueOf(str));
                bw.flush();
                bw.close();
                fw.close();
            }
        }
    
    
        /**
         * 创建索引,并给某些字段指定ik分词器,以后向该索引中查询时,就会用ik分词
         */
        public static void CreateIndexIkTest() throws Exception {
            //创建映射
            XContentBuilder mapping = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    //title:字段名,  type:文本类型       analyzer :分词器类型
                    .startObject("title").field("type", "text").field("analyzer", "ik_smart").endObject()   //该字段添加的内容,查询时将会使用ik_smart分词
                    .startObject("content").field("type", "text").field("analyzer", "ik_max_word").endObject()
                    .endObject()
                    .endObject();
    
            //index:索引名   type:类型名(可以自己定义)
            PutMappingRequest putmap = Requests.putMappingRequest("index").type("type").source(mapping);
            //创建索引
            client.admin().indices().prepareCreate("index").execute().actionGet();
            //为索引添加映射
            client.admin().indices().putMapping(putmap).actionGet();
    
            //调用下面的方法为创建的索引添加内容
            CreateIndex1();
        }
    
        //这个方法是为上一步创建的索引中添加内容,包括id,id不能重复
        public static void CreateIndex1() throws IOException {
            IndexResponse response = client.prepareIndex("index", "type", "1") //索引,类型,id
                    .setSource(jsonBuilder()
                            .startObject()
                            .field("title", "title")   //字段,值
                            .field("content", "content")
                            .endObject()
                    ).get();
        }
    
    /**
         * 更新索引
         */
        //更新索引,更新刚才创建的索引,如果id相同将会覆盖掉刚才的内容
        public static void UpdateIndex() throws Exception {
            //每次添加id应该不同,相当于数据表中的主键,相同的话将会进行覆盖
            UpdateResponse response=client.update(new UpdateRequest("index","type","1")
            .doc(XContentFactory.jsonBuilder()
                .startObject()
                    .field("title","中华人民共和国国歌,国歌是最好听的歌")
                    .field("content","中华人民共和国国歌,国歌是最好听的歌")
                    .endObject()
            )).get();
        }
    
        //再插入一条数据
        public static void createIndex2() throws IOException {
            IndexResponse response = client.prepareIndex("index", "type", "2")
                    .setSource(jsonBuilder()
                            .startObject()
                            .field("title", "中华民族是伟大的民族")
                            .field("content", "中华民族是伟大的民族")
                            .endObject()
                    ).get();
        }
    
        /**
         * 下面使用index索引下的2个document进行查询
         */
        public static  void  Search(){
            SearchResponse response1 = client.prepareSearch( "index")  //指定多个索引
                    .setTypes("type")  //指定类型
                    .setSearchType(SearchType.QUERY_THEN_FETCH)
                    .setQuery(QueryBuilders.matchQuery("title", "中华人民共和国国歌"))  // Query
    //                .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
                    .setFrom(0).setSize(60).setExplain(true)
                    .get();
            long totalHits1= response1.getHits().totalHits;  //命中个数
            System.out.println("response1======="+totalHits1);
    
            SearchResponse response2 = client.prepareSearch( "index")  //指定多个索引
                    .setTypes("type")  //指定类型
                    .setSearchType(SearchType.QUERY_THEN_FETCH)
                    .setQuery(QueryBuilders.matchQuery("content", "中华人民共和国国歌"))  // Query
    //                .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
                    .setFrom(0).setSize(60).setExplain(true)
                    .get();
            long totalHits2 = response2.getHits().totalHits;  //命中个数
            System.out.println("response2========="+totalHits2);
        }
    
        /**
         * GET操作
         */
        public static void get() {
            GetResponse response = client.prepareGet("index", "type", "2").get();
            Map<String, Object> source = response.getSource();
            Set<String> strings = source.keySet();
            Iterator<String> iterator = strings.iterator();
            while (iterator.hasNext()) {
                System.out.println(source.get(iterator.next()));
            }
        }
    }
  • 相关阅读:
    盒子垂直水平居中
    Sahi (2) —— https/SSL配置(102 Tutorial)
    Sahi (1) —— 快速入门(101 Tutorial)
    组织分析(1)——介绍
    Java Servlet (1) —— Filter过滤请求与响应
    CAS (8) —— Mac下配置CAS到JBoss EAP 6.4(6.x)的Standalone模式(服务端)
    JBoss Wildfly (1) —— 7.2.0.Final编译
    CAS (7) —— Mac下配置CAS 4.x的JPATicketRegistry(服务端)
    CAS (6) —— Nginx代理模式下浏览器访问CAS服务器网络顺序图详解
    CAS (5) —— Nginx代理模式下浏览器访问CAS服务器配置详解
  • 原文地址:https://www.cnblogs.com/yfb918/p/10737560.html
Copyright © 2011-2022 走看看