zoukankan      html  css  js  c++  java
  • Java操作elasticsearch

    使用 Maven 工程,我的 pom 文件如下所示:

        <dependencies>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>2.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.6.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.6.2</version>
            </dependency>
        </dependencies>
        

    连接机器

    TransportClient client = TransportClient.builder()
        .build()
        .addTransportAddress(new InetSocketTransportAddress(InetAddress
        .getByName("localhost"), 9300));       
        

    Index API 创建 Index 并且插入 Document

    创建索引有很多种方法,这里列举常用的 2 种:

    HashMap<String, Object> json = new HashMap<String, Object>();
    json.put("first_name","Shuang");
    json.put("last_name", "Peng");
    json.put("age", 24);
    json.put("about", "I love coding");
    IndexResponse response = client
        .prepareIndex("tseg","students","1")
        .setSource(json).get();
    
    IndexResponse response = client.prepareIndex("tseg","students","1")
       .setSource(jsonBuilder()
       .startObject()
       .field("first_name", "Shuang")
       .field("first_name", "Peng")
       .field("age", 24)
       .field("about", "I love coding")
       .endObject())
       .get();
     

    注意:Index API 只能用于创建 index,类似于关系型数据库里面的 create table,他不能对已有的数据库进行添加。追加操作可以用后面会提到的 Update 或者 Bulk 来完成。

    Get API 获取 Document

    GetResponse response2 = client.prepareGet("tseg", "students", "1").get();
    Map<String, Object> res = response2.getSource();
    for (Map.Entry<String, Object> entry: res.entrySet()){
         System.out.println(entry.getKey() + " : " + entry.getValue());
         }
    

    Delete API 删除 Index 或者 Document

    // 用来删除对应的 document 
    DeleteResponse response3 = 
        client.prepareDelete("tesg","students","1").get();
    // 用来删除对应的 index
    DeleteIndexResponse response4 = 
        client.admin().indices().prepareDelete("facebook").execute().actionGet();
    

    Update API 更新操作

    更新操作也有两种方法。建议使用第一种,第二种太复杂了。。。看看就好。

    第一种

    client.prepareUpdate("tseg", "students", "1")
        .setDoc(jsonBuilder()
        .startObject().field("age", 32)
        .endObject())
        .get();
    

    第二种

    IndexRequest indexRequest = new IndexRequest("tseg", "students", "1")
        .source(jsonBuilder()
        .startObject()
        .field("first_name", "Shuang")
        .field("last_name", "Peng")
        .field("age", 32)
        .field("about", "I loving coding")
        .endObject());
    
    UpdateRequest updateRequest = new UpdateRequest("tseg","students", "1")
        .doc(jsonBuilder()
        .startObject().field("age", 32)
        .endObject())
        .upsert(indexRequest);
     client.update(updateRequest).get();
    

    不过这里提一下第二种方法,如果对应的 field 不存在的话,则更新操作自动变为插入操作,否则,就是正常的修改操作。

    Multi Get API 多查找

    MultiGetResponse API 可以一次返回多个要查找的值。下面介绍了两种方法,一种是返回一个 Map,我们可以按照不同的 field 取值;第二种方法是直接返回一个字符串(Json格式)。

    MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
        .add("tseg", "students", "1", "2", "3").get();
    
    for (MultiGetItemResponse itemResponses : multiGetItemResponses) {
        GetResponse response5 = itemResponses.getResponse();
        if (response5.isExists()) {
        
    // 第一种用法
        Map<String, Object> fields = response5.getSource();
        System.out.println(fields.get("first_name"));
    
    // 第二种用法
        String json2 = response5.getSourceAsString();
        System.out.println(json2);
    }
    

    Bulk API 批量操作

    Bulk API允许批量提交index和delete请求, 如下:

    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.add(client.prepareIndex("tseg", "students", "1")
               .setSource(jsonBuilder()
               .startObject()
               .field("first_name", "Allen")
               .field("last_name", "Peng")
               .field("age", "22")
               .endObject()))
               .get();
               
    bulkRequest.add(client.prepareIndex("tseg", "students", "2"))
                .setSource(jsonBuilder()
                .startObject()
                .field("first_name", "Hou")
                .field("last_name", "Xue")
                .field("age", "30")
                .endObject()))
                .get();
    
    HashMap<String, Object> json2 = new HashMap<String, Object>();
    List<String> list = new ArrayList<String>();
    list.add("music");
    list.add("football");
    json2.put("first_name", "Peng");
    json2.put("last_name", "Peng");
    json2.put("interests", list);
    BulkRequestBuilder bulkRequest2 = client.prepareBulk();
    
    // 两种执行方法,个人倾向于第一种
    bulkRequest2.add(client.prepareIndex("facebook", "info", 
        "3").setSource(json2)).get();
    // 第二种方法
    bulkRequest2.add(client.prepareIndex("facebook", 
        "info","1").setSource(json2)).execute().actionGet();
    

    还可以这样做:

    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.add(client.prepareIndex("index1", "type1", "id1")
        .setSource(source);
    bulkRequest.add(client.prepareIndex("index2", "type2", "id2")
        .setSource(source);
    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
    

    Bulk Processor API 可在批量操作完成之前和之后进行相应的操作

    BulkProcessor bulkProcessor = BulkProcessor.builder(
            client,  
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId,
                                      BulkRequest request) { ... } 
    
                @Override
                public void afterBulk(long executionId,
                                      BulkRequest request,
                                      BulkResponse response) { ... } 
    
                @Override
                public void afterBulk(long executionId,
                                      BulkRequest request,
                                      Throwable failure) { ... } 
            })
            .setBulkActions(10000) 
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) 
            .setFlushInterval(TimeValue.timeValueSeconds(5)) 
            .setConcurrentRequests(1) 
             .build();
             
    bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source1));  
    bulkProcessor.add(new DeleteRequest("index2", "type2", "id2");        
    
    1. beforeBulk 会在批量提交之前执行,可以从 BulkRequest 中获取请求信息request.requests() 或者请求数量 request.numberOfActions()。

    2. 第一个 afterBulk 会在批量成功后执行,可以跟 beforeBulk 配合计算批量所需时间。

    3. 第二个 afterBulk 会在批量失败后执行。

    4. 在例子中,当请求超过 10000 个(default=1000)或者总大小超过1GB(default=5MB)时,触发批量提交动作

  • 相关阅读:
    Elementary Methods in Number Theory Exercise 1.3.13
    Elementary Methods in Number Theory Exercise 1.3.17, 1.3.18, 1.3.19, 1.3.20, 1.3.21
    数论概论(Joseph H.Silverman) 习题 5.3,Elementary methods in number theory exercise 1.3.23
    Elementary Methods in Number Theory Exercise 1.2.31
    数论概论(Joseph H.Silverman) 习题 5.3,Elementary methods in number theory exercise 1.3.23
    Elementary Methods in Number Theory Exercise 1.3.13
    Elementary Methods in Number Theory Exercise 1.3.17, 1.3.18, 1.3.19, 1.3.20, 1.3.21
    Elementary Methods in Number Theory Exercise 1.2.31
    Elementary Methods in Number Theory Exercise 1.2.26 The Heisenberg group
    4__面向对象的PHP之作用域
  • 原文地址:https://www.cnblogs.com/cmfwm/p/7988660.html
Copyright © 2011-2022 走看看