zoukankan      html  css  js  c++  java
  • es学习-java操作 2.4.0版本

    package esjava;

    import org.elasticsearch.action.bulk.*;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.fieldstats.FieldStats;
    import org.elasticsearch.action.get.GetRequestBuilder;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.get.MultiGetItemResponse;
    import org.elasticsearch.action.get.MultiGetResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.cluster.node.DiscoveryNode;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    /**
    * Created by zhuzhiqiang on 2018/7/28.
    */
    public class ESUtile {
    static TransportClient client;
    private IndexRequest source;
    static {
    Map<String, String> map = new HashMap<String, String>();
    map.put("cluster.name", "es-cluster");
    Settings.Builder settings = Settings.builder().put(map);
    try {
    client = TransportClient.builder().settings(settings).build()
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.0.108"), Integer.parseInt("9300")));
    } catch (UnknownHostException e) {
    e.printStackTrace();
    }
    }

    public static void testInfo() {
    List<DiscoveryNode> nodes = client.connectedNodes();
    for (DiscoveryNode node : nodes) {
    System.out.println(node.getHostAddress());
    }
    }

    /**
    * 组织json串, 方式1,直接拼接
    */
    public static String createJson1() {
    String json = "{" +
    ""user":"kimchy"," +
    ""postDate":"2013-01-30"," +
    ""message":"trying out Elasticsearch"" +
    "}";
    return json;
    }

    /**
    * 使用map创建json
    */
    public static Map<String, Object> createJson2() {
    Map<String,Object> json = new HashMap<String, Object>();
    json.put("user", "kimchyy");
    json.put("postDate", new FieldStats.Date());
    json.put("message", "trying out elasticsearch update");
    return json;
    }

    public XContentBuilder createJson4() throws Exception {
    // 创建json对象, 其中一个创建json的方式
    XContentBuilder source = XContentFactory.jsonBuilder()
    .startObject()
    .field("user", "kimchy")
    .field("postDate", new FieldStats.Date())
    .field("message", "trying to out ElasticSearch")
    .endObject();
    return source;
    }

    //添加文档

    public static void creat() throws Exception {
    String json1 = createJson1();
    // 存json入索引中
    IndexResponse response = client.prepareIndex("twitter", "tweet", "4").setSource(json1).get();
    // // 结果获取
    String index = response.getIndex();
    String type = response.getType();
    String id = response.getId();
    long version = response.getVersion();
    boolean created = response.isCreated();
    System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);
    }

    //更新文档

    public static void update() throws Exception {
    UpdateRequest updateRequest=new UpdateRequest();
    updateRequest.index("twitter");
    updateRequest.type("tweet");
    updateRequest.id("3");
    updateRequest.doc(XContentFactory.jsonBuilder()
    .startObject()
    // 对没有的字段添加, 对已有的字段替换
    .field("gender", "maleww").endObject());
    UpdateResponse updateResponse = client.update(updateRequest).get();
    System.out.println(updateResponse.getIndex()+":"+updateResponse.getType()+
    ":"+updateResponse.getId()+":"+updateResponse.getVersion()+":"+updateResponse.isCreated()
    );

    }

    //更新文档

    public static void updateAndAdd() throws Exception {
    // 设置查询条件, 查找不到则添加生效
    IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "5")
    .source(XContentFactory.jsonBuilder()
    .startObject()
    .field("gender", "malewwq")
    .endObject());
    UpdateRequest updateRequest=new UpdateRequest();
    updateRequest.index("twitter");
    updateRequest.type("tweet");
    updateRequest.id("5");
    updateRequest.doc(XContentFactory.jsonBuilder()
    .startObject()
    // 对没有的字段添加, 对已有的字段替换
    .field("user", "zzq").endObject());
    UpdateResponse updateResponse = client.update(updateRequest.upsert(indexRequest)).get();
    System.out.println(updateResponse.getIndex()+":"+updateResponse.getType()+
    ":"+updateResponse.getId()+":"+updateResponse.getVersion()+":"+updateResponse.isCreated()
    );

    }


    //删除
    public static void delete() throws Exception {
    DeleteResponse deleteResponse = client.prepareDelete("twitter", "tweet", "4").get();
    System.out.println(deleteResponse.getIndex()+":"+deleteResponse.getType()+
    ":"+deleteResponse.getId()+":"+deleteResponse.getVersion()+":"+deleteResponse.toString()
    );

    }

    //查询

    public static void search() throws Exception{
    GetResponse getResponse = client.prepareGet("twitter", "tweet", "3").get();
    System.out.println(getResponse.getSourceAsString());


    }
    /**
    * 测试multi get api
    * 从不同的index, type, 和id中获取
    */

    public static void testMultiGet() {
    MultiGetResponse multiGetResponse = client.prepareMultiGet()
    .add("twitter", "tweet", "1")
    .add("twitter", "tweet", "1", "3", "4")
    .get();

    for (MultiGetItemResponse itemResponse : multiGetResponse) {
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {
    String sourceAsString = response.getSourceAsString();
    System.out.println(sourceAsString);
    }
    }
    }


    /**
    * bulk 批量执行
    * 一次查询可以update 或 delete多个document
    */

    public void testBulk() throws Exception {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
    .setSource(XContentFactory.jsonBuilder()
    .startObject()
    .field("user", "kimchy")
    .field("postDate", new FieldStats.Date())
    .field("message", "trying out Elasticsearch")
    .endObject()));
    bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
    .setSource(XContentFactory.jsonBuilder()
    .startObject()
    .field("user", "kimchy")
    .field("postDate", new FieldStats.Date())
    .field("message", "another post")
    .endObject()));
    BulkResponse response = bulkRequest.get();
    System.out.println(response.getHeaders());
    }

    /**
    * 使用bulk processor
    * @throws Exception
    */

    public void testBulkProcessor() throws Exception {
    // 创建BulkPorcessor对象
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {
    // TODO Auto-generated method stub
    }

    // 执行出错时执行
    public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
    // TODO Auto-generated method stub
    }

    public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
    // TODO Auto-generated method stub
    }
    })
    // 1w次请求执行一次bulk
    .setBulkActions(10000)
    // 1gb的数据刷新一次bulk
    .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
    // 固定5s必须刷新一次
    .setFlushInterval(TimeValue.timeValueSeconds(5))
    // 并发请求数量, 0不并发, 1并发允许执行
    .setConcurrentRequests(1)
    // 设置退避, 100ms后执行, 最大请求3次
    .setBackoffPolicy(
    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
    .build();

    // 添加单次请求
    bulkProcessor.add(new IndexRequest("twitter", "tweet", "1"));
    bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

    // 关闭
    bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
    // 或者
    bulkProcessor.close();
    }
    }



  • 相关阅读:
    生成证书时Distribution下面App Store and Ad Hoc 选项不能选择的原因及解决办法
    ios 实现版本更新检查
    ios 同步Get请求的实现
    UI设计规范整理一iOS字体和切图及规范
    Mac下使用抓包工具--Charles进行抓包
    iOS 审核被拒
    Xcode 9 compiling IB documents for earlier than ios 7 is no longer supported
    解决JSON包含HTML标签无法显示的问题
    OC与swift相互调用
    UIApplication深入研究
  • 原文地址:https://www.cnblogs.com/anxbb/p/9383221.html
Copyright © 2011-2022 走看看