zoukankan      html  css  js  c++  java
  • Java操作elasticsearch

    使用 Maven 工程,我的 pom 文件如下所示:

        <dependencies>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>2.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.6.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.6.2</version>
            </dependency>
        </dependencies>
        

    连接机器

    TransportClient client = TransportClient.builder()
        .build()
        .addTransportAddress(new InetSocketTransportAddress(InetAddress
        .getByName("localhost"), 9300));       
        

    Index API 创建 Index 并且插入 Document

    创建索引有很多种方法,这里列举常用的 2 种:

    HashMap<String, Object> json = new HashMap<String, Object>();
    json.put("first_name","Shuang");
    json.put("last_name", "Peng");
    json.put("age", 24);
    json.put("about", "I love coding");
    IndexResponse response = client
        .prepareIndex("tseg","students","1")
        .setSource(json).get();
    
    IndexResponse response = client.prepareIndex("tseg","students","1")
       .setSource(jsonBuilder()
       .startObject()
       .field("first_name", "Shuang")
       .field("first_name", "Peng")
       .field("age", 24)
       .field("about", "I love coding")
       .endObject())
       .get();
     

    注意:Index API 只能用于创建 index,类似于关系型数据库里面的 create table,他不能对已有的数据库进行添加。追加操作可以用后面会提到的 Update 或者 Bulk 来完成。

    Get API 获取 Document

    GetResponse response2 = client.prepareGet("tseg", "students", "1").get();
    Map<String, Object> res = response2.getSource();
    for (Map.Entry<String, Object> entry: res.entrySet()){
         System.out.println(entry.getKey() + " : " + entry.getValue());
         }
    

    Delete API 删除 Index 或者 Document

    // 用来删除对应的 document 
    DeleteResponse response3 = 
        client.prepareDelete("tesg","students","1").get();
    // 用来删除对应的 index
    DeleteIndexResponse response4 = 
        client.admin().indices().prepareDelete("facebook").execute().actionGet();
    

    Update API 更新操作

    更新操作也有两种方法。建议使用第一种,第二种太复杂了。。。看看就好。

    第一种

    client.prepareUpdate("tseg", "students", "1")
        .setDoc(jsonBuilder()
        .startObject().field("age", 32)
        .endObject())
        .get();
    

    第二种

    IndexRequest indexRequest = new IndexRequest("tseg", "students", "1")
        .source(jsonBuilder()
        .startObject()
        .field("first_name", "Shuang")
        .field("last_name", "Peng")
        .field("age", 32)
        .field("about", "I loving coding")
        .endObject());
    
    UpdateRequest updateRequest = new UpdateRequest("tseg","students", "1")
        .doc(jsonBuilder()
        .startObject().field("age", 32)
        .endObject())
        .upsert(indexRequest);
     client.update(updateRequest).get();
    

    不过这里提一下第二种方法,如果对应的 field 不存在的话,则更新操作自动变为插入操作,否则,就是正常的修改操作。

    Multi Get API 多查找

    MultiGetResponse API 可以一次返回多个要查找的值。下面介绍了两种方法,一种是返回一个 Map,我们可以按照不同的 field 取值;第二种方法是直接返回一个字符串(Json格式)。

    MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
        .add("tseg", "students", "1", "2", "3").get();
    
    for (MultiGetItemResponse itemResponses : multiGetItemResponses) {
        GetResponse response5 = itemResponses.getResponse();
        if (response5.isExists()) {
        
    // 第一种用法
        Map<String, Object> fields = response5.getSource();
        System.out.println(fields.get("first_name"));
    
    // 第二种用法
        String json2 = response5.getSourceAsString();
        System.out.println(json2);
    }
    

    Bulk API 批量操作

    Bulk API允许批量提交index和delete请求, 如下:

    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.add(client.prepareIndex("tseg", "students", "1")
               .setSource(jsonBuilder()
               .startObject()
               .field("first_name", "Allen")
               .field("last_name", "Peng")
               .field("age", "22")
               .endObject()))
               .get();
               
    bulkRequest.add(client.prepareIndex("tseg", "students", "2"))
                .setSource(jsonBuilder()
                .startObject()
                .field("first_name", "Hou")
                .field("last_name", "Xue")
                .field("age", "30")
                .endObject()))
                .get();
    
    HashMap<String, Object> json2 = new HashMap<String, Object>();
    List<String> list = new ArrayList<String>();
    list.add("music");
    list.add("football");
    json2.put("first_name", "Peng");
    json2.put("last_name", "Peng");
    json2.put("interests", list);
    BulkRequestBuilder bulkRequest2 = client.prepareBulk();
    
    // 两种执行方法,个人倾向于第一种
    bulkRequest2.add(client.prepareIndex("facebook", "info", 
        "3").setSource(json2)).get();
    // 第二种方法
    bulkRequest2.add(client.prepareIndex("facebook", 
        "info","1").setSource(json2)).execute().actionGet();
    

    还可以这样做:

    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.add(client.prepareIndex("index1", "type1", "id1")
        .setSource(source);
    bulkRequest.add(client.prepareIndex("index2", "type2", "id2")
        .setSource(source);
    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
    

    Bulk Processor API 可在批量操作完成之前和之后进行相应的操作

    BulkProcessor bulkProcessor = BulkProcessor.builder(
            client,  
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId,
                                      BulkRequest request) { ... } 
    
                @Override
                public void afterBulk(long executionId,
                                      BulkRequest request,
                                      BulkResponse response) { ... } 
    
                @Override
                public void afterBulk(long executionId,
                                      BulkRequest request,
                                      Throwable failure) { ... } 
            })
            .setBulkActions(10000) 
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) 
            .setFlushInterval(TimeValue.timeValueSeconds(5)) 
            .setConcurrentRequests(1) 
             .build();
             
    bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source1));  
    bulkProcessor.add(new DeleteRequest("index2", "type2", "id2");        
    
    1. beforeBulk 会在批量提交之前执行,可以从 BulkRequest 中获取请求信息request.requests() 或者请求数量 request.numberOfActions()。

    2. 第一个 afterBulk 会在批量成功后执行,可以跟 beforeBulk 配合计算批量所需时间。

    3. 第二个 afterBulk 会在批量失败后执行。

    4. 在例子中,当请求超过 10000 个(default=1000)或者总大小超过1GB(default=5MB)时,触发批量提交动作

  • 相关阅读:
    MongoDB一键安装
    Mongo基本操作
    MongoDB AUTH结果验证及开启方法
    MongoDB AUTH结果验证
    MongoDB使用
    MongoDB-安装配置
    11204RAC-dbca建库脚本
    MySQL主从同步最佳实践
    实时抓取主从的同步状态
    守护神 Supervisor
  • 原文地址:https://www.cnblogs.com/cmfwm/p/7988660.html
Copyright © 2011-2022 走看看