zoukankan      html  css  js  c++  java
  • es-05-获取 resthighlevelclient及api

    在6.x以前, 使用最多的是transportclient, 但在7.x会被废弃, 

    先说以前的创建方式: 

    具体可见: https://www.cnblogs.com/wenbronk/p/6383194.html

        /**
         * 获取连接, 第一种方式
         * @throws Exception
         */
    //    @Before
        public void before() throws Exception {
            Map<String, String> map = new HashMap<String, String>();  
            map.put("cluster.name", "elasticsearch_wenbronk");  
            Settings.Builder settings = Settings.builder().put(map);  
            client = TransportClient.builder().settings(settings).build()  
                            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300"))); 
        }
        
        /**
         * 获取连接, 第二种方式
         * @throws Exception
         */
        @Before
        public void before11() throws Exception {
            // 创建客户端, 使用的默认集群名, "elasticSearch"
    //        client = TransportClient.builder().build()
    //                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));
    
            // 通过setting对象指定集群配置信息, 配置的集群名
            Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch_wenbronk") // 设置集群名
    //                .put("client.transport.sniff", true) // 开启嗅探 , 开启后会一直连接不上, 原因未知
    //                .put("network.host", "192.168.50.37")
                    .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上
    //                .put("client.transport.nodes_sampler_interval", 5) //报错,
    //                .put("client.transport.ping_timeout", 5) // 报错, ping等待时间,
                    .build();
             client = TransportClient.builder().settings(settings).build()
                     .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
             // 默认5s
             // 多久打开连接, 默认5s
             System.out.println("success connect");
        }

    最新的创建方式: 

    package com.wenbronk.elasticsearch.usage.highLevel;
    
    import com.google.common.collect.Lists;
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    
    public class RestHighLevelClientParent {
    
        public static final String HOST = "10.124.147.22,10.124.147.23,10.124.147.32";
        public static final Integer PORT = 9200;
    
        protected RestHighLevelClient client;
    
    
        @BeforeEach
        public void testBefore() {
    
            ArrayList<HttpHost> hosts = Lists.newArrayList();
    
            Arrays.stream(HOST.split(",")).forEach(host -> {
                hosts.add(new HttpHost(host, PORT, "http"));
            });
    
            client = new RestHighLevelClient(RestClient.builder(hosts.toArray(new HttpHost[0])));
    
        }
    
        @AfterEach
        public void testAfter() throws IOException {
            client.close();
        }
    
    }

    新的客户端, 对异步的处理方式和以前基本相同

    1), index

    @Test
        public void testAsync() throws IOException, InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            XContentBuilder object = XContentFactory.jsonBuilder()
                    .startObject()
                    .field("user", "wenbronk")
                    .timeField("postData", new Date())
                    .field("message", "message format from xcontent")
                    .endObject();
            IndexRequest source = new IndexRequest("test", "doc", "3").source(object);
            client.indexAsync(source, new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                    String id = indexResponse.getId();
                    String index = indexResponse.getIndex();
                    String type = indexResponse.getType();
    
                    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    
                    }
                    if (indexResponse.getResult() == DocWriteResponse.Result.DELETED) {
    
                    }
                    if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    
                    }
    
                    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                        // 有失败的
                    }
                    // 对失败的进行处理
                    if (shardInfo.getFailed() != 0) {
                        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                            String reason = failure.reason();
                        }
                    }
                    countDownLatch.countDown();
                }
    
                @Override
                public void onFailure(Exception e) {
                    if (e instanceof ElasticsearchException) {
                        ElasticsearchException e1 = (ElasticsearchException) e;
                        if (e1.status() == RestStatus.CONFLICT) {
                            System.out.println("版本冲突 ");
                        }
                    }
                }
            });
            countDownLatch.await();
        }

    2), get

    @Test
        public void testGet() throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            // index, type, id
            GetRequest request = new GetRequest("test", "doc", "1");
    
            // 可选的添加参数
            // disabled _source retrieval
    //        request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
    
            // Configure source inclusion for specific fields
    //        String[] includes = new String[]{"message", "*Data"};
    //        String[] excludes = Strings.EMPTY_ARRAY;
    //        FetchSourceContext fetchSourceContext =
    //                new FetchSourceContext(true, includes, excludes);
    //        request.fetchSourceContext(fetchSourceContext);
    
            // Configure source exclusion for specific fields
    //        request.storedFields("message");
    
            request.routing("routing");
            request.parent("parent");
            request.preference("preference");
            request.realtime(false);
            request.refresh(true);
            request.version(2);
            request.versionType(VersionType.EXTERNAL);
    
            // 对response处理
            client.getAsync(request, new ActionListener<GetResponse>() {
                @Override
                public void onResponse(GetResponse getResponse) {
                    String type = getResponse.getType();
                    String index = getResponse.getIndex();
                    String id = getResponse.getId();
                    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
                    sourceAsMap.entrySet().forEach(entry -> System.out.println(entry.getKey() + " : " + entry.getValue()));
    
                    countDownLatch.countDown();
                }
    
                @Override
                public void onFailure(Exception e) {
                    // 可能的异常, 1, 不存在, 2 conflict
                    if (e instanceof ElasticsearchException) {
                        if (((ElasticsearchException) e).status() == RestStatus.NOT_FOUND) {
                            System.out.println("要找的id不存在");
                        }
    
                        if (((ElasticsearchException) e).status() == RestStatus.CONFLICT) {
                            System.out.println("conflict ");
                        }
                    }
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
        }

    3), update

    @Test
        public void testResponse() throws IOException {
            XContentBuilder builder = XContentFactory.jsonBuilder()
                    .startObject()
                    .timeField("updated", new Date())
                    .field("reason", "daily update")
                    .endObject();
            UpdateRequest request = new UpdateRequest("posts", "doc", "1")
                    .doc(builder);
    
            client.updateAsync(request, new ActionListener<UpdateResponse>() {
                @Override
                public void onResponse(UpdateResponse updateResponse) {
                    String index = updateResponse.getIndex();
                    String type = updateResponse.getType();
                    String id = updateResponse.getId();
                    long version = updateResponse.getVersion();
                    if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    
                    }
    
                    GetResult result = updateResponse.getGetResult();
                    if (result.isExists()) {
                        String sourceAsString = result.sourceAsString();
                        Map<String, Object> sourceAsMap = result.sourceAsMap();
                        byte[] sourceAsBytes = result.source();
                    } else {
    
                    }
    
                    ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
                    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    
                    }
                    if (shardInfo.getFailed() > 0) {
                        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                            String reason = failure.reason();
                        }
                    }
                }
    
                @Override
                public void onFailure(Exception e) {
    
                }
            });
        }

    4), delete

    @Test
        public void testDelete() {
            DeleteRequest deleteRequest = new DeleteRequest("test", "doc", "3");
    
            client.deleteAsync(deleteRequest, new ActionListener<DeleteResponse>() {
                @Override
                public void onResponse(DeleteResponse deleteResponse) {
                    String index = deleteResponse.getIndex();
                    String type = deleteResponse.getType();
                    String id = deleteResponse.getId();
                    long version = deleteResponse.getVersion();
                    ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
                    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    
                    }
                    if (shardInfo.getFailed() > 0) {
                        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                            String reason = failure.reason();
                        }
                    }
                }
    
                @Override
                public void onFailure(Exception e) {
                    // 可能的异常, 1, 不存在, 2 conflict
                    if (e instanceof ElasticsearchException) {
                        if (((ElasticsearchException) e).status() == RestStatus.NOT_FOUND) {
                            System.out.println("id不存在");
                        }
                        if (((ElasticsearchException) e).status() == RestStatus.CONFLICT) {
                            System.out.println("conflict ");
                        }
                    }
                }
            });
    
        }

    5), bulk

     @Test
        public void testBulk() {
            BulkRequest request = new BulkRequest();
            request.add(new DeleteRequest("posts", "doc", "3"));
            request.add(new UpdateRequest("posts", "doc", "2")
                    .doc(XContentType.JSON,"other", "test"));
            request.add(new IndexRequest("posts", "doc", "4")
                    .source(XContentType.JSON,"field", "baz"));
    
            // 可选的参数
            request.timeout(TimeValue.timeValueMinutes(2));
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
            request.waitForActiveShards(2);
    
            // 异步处理
            client.bulkAsync(request, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                    for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
                        DocWriteResponse itemResponse = bulkItemResponse.getResponse();
    
                        if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                            IndexResponse indexResponse = (IndexResponse) itemResponse;
    
                        } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
    
                        } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        }
                        // 失败的返回
                        if (bulkItemResponse.isFailed()) {
                            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
    
                        }
                    }
                }
    
                @Override
                public void onFailure(Exception e) {
    
                }
            });
        }

    6), bulkprocess

    @Test
        public void testBulkProcess() throws InterruptedException {
            // create bulkprocessor
            BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    int numberOfActions = request.numberOfActions();
                    logger.debug("Executing bulk [{}] with {} requests",
                            executionId, numberOfActions);
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    if (response.hasFailures()) {
                        logger.warn("Bulk [{}] executed with failures", executionId);
                    } else {
                        logger.debug("Bulk [{}] completed in {} milliseconds",
                                executionId, response.getTook().getMillis());
                    }
                }
    
                // when failure, will be called
                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    logger.error("Failed to execute bulk", failure);
                }
            });
    
            // 添加参数
            builder.setBulkActions(500);        // 刷新时间
            builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));        // 刷新长度
            builder.setConcurrentRequests(0);       // 并发度
            builder.setFlushInterval(TimeValue.timeValueSeconds(10L));      // 刷新周期
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
    
            BulkProcessor bulkProcessor = builder.build();
    
            // 添加批量执行数据
            IndexRequest one = new IndexRequest("posts", "doc", "1").
                    source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
            IndexRequest two = new IndexRequest("posts", "doc", "2")
                    .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
            IndexRequest three = new IndexRequest("posts", "doc", "3")
                    .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
    
            bulkProcessor.add(one);
            bulkProcessor.add(two);
            bulkProcessor.add(three);
    
            boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
            bulkProcessor.close();
        }

    7), multiget

    @Test
        public void testMultiGet() {
            MultiGetRequest request = new MultiGetRequest();
            request.add(new MultiGetRequest.Item("index", "type", "example_id"));
            request.add(new MultiGetRequest.Item("index", "type", "another_id"));
    
            client.multiGetAsync(request, new ActionListener<MultiGetResponse>() {
                @Override
                public void onResponse(MultiGetResponse multiGetItemResponses) {
                    MultiGetItemResponse firstItem = multiGetItemResponses.getResponses()[0];
                    assertNull(firstItem.getFailure());
                    GetResponse firstGet = firstItem.getResponse();
                    String index = firstItem.getIndex();
                    String type = firstItem.getType();
                    String id = firstItem.getId();
                    if (firstGet.isExists()) {
                        long version = firstGet.getVersion();
                        String sourceAsString = firstGet.getSourceAsString();
                        Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
                        byte[] sourceAsBytes = firstGet.getSourceAsBytes();
                    } else {
    
                    }
                }
    
                @Override
                public void onFailure(Exception e) {
    
                }
            });
    
        }

     

  • 相关阅读:
    GitLab版本管理
    Failed to add reference to 'System.Net.Http'. Please make sure that it is in the Global Assembly Cache.
    在CentOS 6.3中安装拼音输入法
    Yum Error Another app is currently holding the yum lock; waiting for it to exit
    Centos使用光盘作为本地yum源
    Linux操作系统桌面环境GNOME和KDE的切换
    CentOS安装VMware Tools
    CentOS 加载/挂载光驱
    svn Couldn't open rep-cache database
    linux内存排查工具valgrind
  • 原文地址:https://www.cnblogs.com/wenbronk/p/9394856.html
Copyright © 2011-2022 走看看