zoukankan      html  css  js  c++  java
  • ES之6:restHighLevelClient源码

    本文讨论的是JAVA High Level Rest Client向ElasticSearch6.3.2发送请求(index操作、update、delete……)的一个详细过程的理解,主要涉及到Rest Client如何选择哪一台Elasticsearch服务器发起请求。

    maven依赖如下:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.3.2</version>
    </dependency>
    

    High Level Rest Client 为这些请求提供了两套接口:同步和异步,异步接口以Async结尾。以update请求为例,如下:

    官方也提供了详细的示例来演示如何使用这些API:java-rest-high,在使用之前需要先初始化一个RestHighLevelClient 然后就可以参考API文档开发了。RestHighLevelClient 底层封装的是一个http连接池,当需要执行 update、index、delete操作时,直接从连接池中取出一个连接,然后发送http请求到ElasticSearch服务端,服务端基于Netty接收请求。

    The high-level client will internally create the low-level client used to perform requests based on the provided builder. That low-level client maintains a pool of connections 
    

    本文的主要内容是探究一下 index/update/delete请求是如何一步步构造,并发送到ElasticSearch服务端的,并重点探讨选择向哪个ElasticSearch服务器发送请求的 round robin 算法

    以update请求为例:构造了update请求后:执行esClient.update(updateRequest);发起请求:

    updateRequest.doc(XContentFactory.jsonBuilder().startObject().field(fieldName, val).endObject());
                UpdateResponse response = esClient.update(updateRequest);
    

    最终会执行到performRequest(),index、delete请求最终也是执行到这个方法:

        /**
         * Sends a request to the Elasticsearch cluster that the client points to. Blocks until the request is completed and returns
         * its response or fails by throwing an exception. Selects a host out of the provided ones in a round-robin fashion. Failing hosts
         * are marked dead and retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times
         * they previously failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead
         * nodes that deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
         *
         *
         */
        public Response performRequest(String method, String endpoint, Map<String, String> params,
                                       HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                       Header... headers) throws IOException {
            SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
            performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
                listener, headers);
            return listener.get();
        }
    

    看这个方法的注释,向Elasticsearch cluster发送请求,并等待响应。等待响应就是通过创建一个SyncResponseListener,然后执行performRequestAsyncNoCatch先异步把HTTP请求发送出去,然后SyncResponseListener等待获取请求的响应结果,即:listener.get();阻塞等待直到拿到HTTP请求的响应结果。

    performRequestAsyncNoCatch()里面调用的内容如下:

    client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback<HttpResponse>() {
                @Override
                public void completed(HttpResponse httpResponse) {
    

    也就是CloseableHttpAsyncClient的execute()方法向ElasticSearch服务端发起了HTTP请求。(rest-high-level client封装的底层http连接池)

    以上就是:ElasticSearch JAVA High Level 同步方法的具体执行过程。总结起来就二句:performRequestAsyncNoCatch异步发送请求,SyncResponseListener阻塞获取响应结果。异步方法的执行方式也是类似的。

    这篇文章中提到,ElasticSearch集群中每个节点默认都是Coordinator 节点,可以接收Client的请求。因为在创建ElasticSearch JAVA High Level 时,一般会配置多个IP地址,如下就配置了三台:

    //	    es中默认 每个节点都是 coordinating node
                String[] nodes = clusterNode.split(",");
                HttpHost host_0 = new HttpHost(nodes[0].split(":")[0], Integer.parseInt(nodes[0].split(":")[1]), "http");
                HttpHost host_1 = new HttpHost(nodes[1].split(":")[0], Integer.parseInt(nodes[1].split(":")[1]), "http");
                HttpHost host_2 = new HttpHost(nodes[2].split(":")[0], Integer.parseInt(nodes[2].split(":")[1]), "http");
                restHighLevelClient = new RestHighLevelClient(RestClient.builder(host_0, host_1, host_2));
    

    那么,Client在发起HTTP请求时,到底是请求到了哪台ElasticSearch服务器上呢?这就是本文想要讨论的问题。

    而发送请求主要由RestClient实现,看看这个类的源码注释,里面就提到了**sending a request, a host gets selected out of the provided ones in a round-robin fashion. **

    /**
     * Client that connects to an Elasticsearch cluster through HTTP.
     * The hosts that are part of the cluster need to be provided at creation time, but can also be replaced later
     * The method {@link #performRequest(String, String, Map, HttpEntity, Header...)} allows to send a request to the cluster. When
     * sending a request, a host gets selected out of the provided ones in a round-robin fashion. Failing hosts are marked dead and
     * retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times they previously
     * failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead nodes that
     * deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
     * <p>
     * Requests can be either synchronous or asynchronous. The asynchronous variants all end with {@code Async}.
     * <p>
     */
    public class RestClient implements Closeable {
        
        //一些代码
        
        
            /**
         * {@code HostTuple} enables the {@linkplain HttpHost}s and {@linkplain AuthCache} to be set together in a thread
         * safe, volatile way.
         */
        private static class HostTuple<T> {
            final T hosts;
            final AuthCache authCache;
    
            HostTuple(final T hosts, final AuthCache authCache) {
                this.hosts = hosts;
                this.authCache = authCache;
            }
        }
    }
        
    

    HostTuple是RestClient是静态内部类,封装在配置文件中配置的ElasticSearch集群中各台机器的IP地址和端口。

    因此,对于Client而言,存在2个问题:

    1. 怎样选一台“可靠的”机器,然后放心地把我的请求交给它?
    2. 如果Client端的请求量非常大,不能老是把请求都往ElasticSearch某一台服务器发,应该要考虑一下负载均衡。

    其实具体的算法实现细节我也没有深入去研究理解,不过把这两个问题抽象出来,其实在很多场景中都能碰到。

    客户端想要连接服务端,服务器端提供了很多主机可供选择,我应该需要考虑哪些因素,选一台合适的主机连接?

    performRequestAsync方法的参数中,会调用RestClient类的netxtHost():方法,选择合适的ElasticSearch服务器IP进行连接。

    void performRequestAsyncNoCatch(String method, String endpoint, Map<String, String> params,
                                        HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                        ResponseListener responseListener, Header... headers) {
        
        //省略其他无关代码
            performRequestAsync(startTime, nextHost(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory,
                    failureTrackingResponseListener);
    }
    
     /**
         * Returns an {@link Iterable} of hosts to be used for a request call.
         * Ideally, the first host is retrieved from the iterable and used successfully for the request.
         * Otherwise, after each failure the next host has to be retrieved from the iterator so that the request can be retried until
         * there are no more hosts available to retry against. The maximum total of attempts is equal to the number of hosts in the iterable.
         * The iterator returned will never be empty. In case there are no healthy hosts available, or dead ones to be be retried,
         * one dead host gets returned so that it can be retried.
         */
        private HostTuple<Iterator<HttpHost>> nextHost() {
    

    nextHost()方法的大致逻辑如下:

    do{
        //先从HostTuple中拿到ElasticSearch集群配置的主机信息
        //....
        
        if (filteredHosts.isEmpty()) {
            //last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
            //所有的主机都不可用,那就死马当活马医
            HttpHost deadHost = sortedHosts.get(0).getKey();
            nextHosts = Collections.singleton(deadHost);
        }else{
            List<HttpHost> rotatedHosts = new ArrayList<>(filteredHosts);
            //rotate()方法选取最适合连接的主机
                    Collections.rotate(rotatedHosts, rotatedHosts.size() - lastHostIndex.getAndIncrement());
                    nextHosts = rotatedHosts;
        }
        
    }while(nextHosts.isEmpty())
    

    选择ElasticSearch主机连接主要是由rotate()实现的。该方法里面又有2种实现,具体代码就不贴了,看注释:

        /**
         * Rotates the elements in the specified list by the specified distance.
         * After calling this method, the element at index <tt>i</tt> will be
         * the element previously at index <tt>(i - distance)</tt> mod
         * <tt>list.size()</tt>, for all values of <tt>i</tt> between <tt>0</tt>
         * and <tt>list.size()-1</tt>, inclusive.  (This method has no effect on
         * the size of the list.)
         *
         * <p>For example, suppose <tt>list</tt> comprises<tt> [t, a, n, k, s]</tt>.
         * After invoking <tt>Collections.rotate(list, 1)</tt> (or
         * <tt>Collections.rotate(list, -4)</tt>), <tt>list</tt> will comprise
         * <tt>[s, t, a, n, k]</tt>.
         *
         * <p>Note that this method can usefully be applied to sublists to
         * move one or more elements within a list while preserving the
         * order of the remaining elements.  For example, the following idiom
         * moves the element at index <tt>j</tt> forward to position
         * <tt>k</tt> (which must be greater than or equal to <tt>j</tt>):
         * <pre>
         *     Collections.rotate(list.subList(j, k+1), -1);
         * </pre>
         * To make this concrete, suppose <tt>list</tt> comprises
         * <tt>[a, b, c, d, e]</tt>.  To move the element at index <tt>1</tt>
         * (<tt>b</tt>) forward two positions, perform the following invocation:
         * <pre>
         *     Collections.rotate(l.subList(1, 4), -1);
         * </pre>
         * The resulting list is <tt>[a, c, d, b, e]</tt>.
         *
         * <p>To move more than one element forward, increase the absolute value
         * of the rotation distance.  To move elements backward, use a positive
         * shift distance.
         *
         * <p>If the specified list is small or implements the {@link
         * RandomAccess} interface, this implementation exchanges the first
         * element into the location it should go, and then repeatedly exchanges
         * the displaced element into the location it should go until a displaced
         * element is swapped into the first element.  If necessary, the process
         * is repeated on the second and successive elements, until the rotation
         * is complete.  If the specified list is large and doesn't implement the
         * <tt>RandomAccess</tt> interface, this implementation breaks the
         * list into two sublist views around index <tt>-distance mod size</tt>.
         * Then the {@link #reverse(List)} method is invoked on each sublist view,
         * and finally it is invoked on the entire list.  For a more complete
         * description of both algorithms, see Section 2.3 of Jon Bentley's
         * <i>Programming Pearls</i> (Addison-Wesley, 1986).
         *
         */
        public static void rotate(List<?> list, int distance) {
            if (list instanceof RandomAccess || list.size() < ROTATE_THRESHOLD)
                rotate1(list, distance);
            else
                rotate2(list, distance);
        }
    

    关于的httpclient的定制化配置,在上述源码中可知,通过setRequestConfigCallback和setHttpClientConfigCallback进行定制化配置:

    @Bean
        public RestHighLevelClient restHighLevelClient(){
            //解析hostlist配置信息
            String[] split = hostlist.split(",");
            //创建HttpHost数组,其中存放es主机和端口的配置信息
            HttpHost[] httpHostArray = new HttpHost[split.length];
            for(int i=0;i<split.length;i++){
                String item = split[i];
                httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
            }
            //创建RestHighLevelClient客户端
            //return new RestHighLevelClient(RestClient.builder(httpHostArray));//.setMaxRetryTimeoutMillis(5 * 60 * 1000)); //超时时间设为5分钟);
            RestClientBuilder builder = RestClient.builder(httpHostArray);
    
            builder.setRequestConfigCallback(requestConfigBuilder -> {
                requestConfigBuilder.setConnectTimeout(connectTimeoutMillis);
                requestConfigBuilder.setSocketTimeout(socketTimeoutMillis);
                requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeoutMillis);
                return requestConfigBuilder;
            });
    
            builder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setMaxConnTotal(maxConnectTotal);
                httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                return httpClientBuilder;
            });
    
    
            return new RestHighLevelClient(builder);
            
        }

    里面的5个变量,通过spring的@Value注入。

     变量的用法见《httpclient的几个重要参数,及httpclient连接池的重要参数说明

     
  • 相关阅读:
    ./sample_mnist: error while loading shared libraries: libnvinfer.so.4: cannot open shared object file: No such file or directory
    Unable to correct problems, you have held broken packages
    `TypeError: torch.mm received an invalid combination of arguments
    error: ‘module’ object has no attribute ‘_rebuild_tensor_v2’
    cafee编译错误几个总结
    对yolo与fasterrcnn anchors的理解
    msf提权常用命令
    解析漏洞总结
    webshell方法总结
    XSS之会话劫持
  • 原文地址:https://www.cnblogs.com/duanxz/p/5206304.html
Copyright © 2011-2022 走看看