  • ES Part 6: RestHighLevelClient source code

    This article looks in detail at how the Java High Level REST Client sends requests (index, update, delete, ...) to Elasticsearch 6.3.2. The focus is on how the REST client chooses which Elasticsearch server each request goes to.

    The Maven dependency is:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.3.2</version>
    </dependency>
    

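    The High Level REST Client offers two variants of each of these APIs: synchronous and asynchronous. The asynchronous methods end in Async; for the update request, for example, these are update() and updateAsync().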

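    The official documentation (java-rest-high) includes detailed examples of how to use these APIs. Before using them you initialize a RestHighLevelClient; after that you can develop against the API documentation. Under the hood, RestHighLevelClient wraps an HTTP connection pool: to execute an update, index, or delete, it takes a connection from the pool and sends an HTTP request to the Elasticsearch server, which receives it with Netty.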

    As the official documentation puts it: "The high-level client will internally create the low-level client used to perform requests based on the provided builder. That low-level client maintains a pool of connections ..."

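    The goal here is to trace, step by step, how index/update/delete requests are built and sent to the Elasticsearch server, with particular attention to the round-robin algorithm that decides which Elasticsearch server receives each request.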

    Taking update as an example: after building the UpdateRequest, call esClient.update(updateRequest) to send it:

    updateRequest.doc(XContentFactory.jsonBuilder().startObject().field(fieldName, val).endObject());
    UpdateResponse response = esClient.update(updateRequest);
    

    Execution eventually reaches performRequest(); index and delete requests end up in this same method:

        /**
         * Sends a request to the Elasticsearch cluster that the client points to. Blocks until the request is completed and returns
         * its response or fails by throwing an exception. Selects a host out of the provided ones in a round-robin fashion. Failing hosts
         * are marked dead and retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times
         * they previously failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead
         * nodes that deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
         *
         *
         */
        public Response performRequest(String method, String endpoint, Map<String, String> params,
                                       HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                       Header... headers) throws IOException {
            SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
            performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
                listener, headers);
            return listener.get();
        }
    

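    As its Javadoc says, this method sends a request to the Elasticsearch cluster and waits for the response. The waiting works like this: it creates a SyncResponseListener, calls performRequestAsyncNoCatch to send the HTTP request asynchronously, and then calls listener.get(), which blocks until the response to that HTTP request (or a failure) arrives.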
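    The sync-over-async pattern just described can be reproduced with the JDK alone. The sketch below is a simplified, hypothetical stand-in for SyncResponseListener: the class names BlockingListener and SyncOverAsync are invented, and using a CountDownLatch as the blocking primitive is an assumption. It is not the client's actual code.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for SyncResponseListener:
// the async sender calls onSuccess/onFailure, while the caller blocks in get().
class BlockingListener<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<T> response = new AtomicReference<>();
    private final AtomicReference<Exception> failure = new AtomicReference<>();
    private final long timeoutMillis;

    BlockingListener(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onSuccess(T value) {
        response.set(value);
        latch.countDown(); // wakes up the thread waiting in get()
    }

    void onFailure(Exception e) {
        failure.set(e);
        latch.countDown();
    }

    // Blocks until one of the callbacks has fired or the timeout elapses.
    T get() throws IOException {
        try {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IOException("no response after " + timeoutMillis + " ms");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while waiting for the response", e);
        }
        Exception e = failure.get();
        if (e != null) {
            throw new IOException("request failed", e);
        }
        return response.get();
    }
}

// The synchronous call = start the asynchronous send, then block on the listener.
class SyncOverAsync {
    static String performRequest(Consumer<BlockingListener<String>> sendAsync, long timeoutMillis)
            throws IOException {
        BlockingListener<String> listener = new BlockingListener<>(timeoutMillis);
        sendAsync.accept(listener); // returns immediately; the result arrives later
        return listener.get();      // the calling thread waits here
    }
}
```

    A sender that completes on another thread, such as l -> new Thread(() -> l.onSuccess("ok")).start(), makes performRequest return "ok"; a sender that never calls back makes get() throw an IOException once the timeout expires, similar in spirit to the maxRetryTimeoutMillis that performRequest() above passes to SyncResponseListener.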

    Inside, performRequestAsyncNoCatch() (through performRequestAsync(), which it delegates to) ends up making this call:

    client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback<HttpResponse>() {
                @Override
                public void completed(HttpResponse httpResponse) {
    

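    In other words, the HTTP request is sent to the Elasticsearch server by CloseableHttpAsyncClient.execute(). CloseableHttpAsyncClient is the underlying HTTP client, with its connection pool, that the REST client wraps.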

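    That is how the synchronous methods of the Elasticsearch Java High Level REST Client execute. In short: performRequestAsyncNoCatch sends the request asynchronously, and SyncResponseListener blocks until the response arrives. The asynchronous methods are executed in a similar way.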

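    As mentioned in another article, every node in an Elasticsearch cluster is a coordinating node by default and can accept client requests. When creating the Java High Level REST Client we therefore usually configure several addresses; the example below configures three: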

    // In ES every node is a coordinating node by default
    String[] nodes = clusterNode.split(",");
    HttpHost host_0 = new HttpHost(nodes[0].split(":")[0], Integer.parseInt(nodes[0].split(":")[1]), "http");
    HttpHost host_1 = new HttpHost(nodes[1].split(":")[0], Integer.parseInt(nodes[1].split(":")[1]), "http");
    HttpHost host_2 = new HttpHost(nodes[2].split(":")[0], Integer.parseInt(nodes[2].split(":")[1]), "http");
    restHighLevelClient = new RestHighLevelClient(RestClient.builder(host_0, host_1, host_2));
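    The snippet above hard-codes exactly three nodes and throws ArrayIndexOutOfBoundsException if clusterNode lists fewer. A minimal sketch that accepts any number of host:port entries might look like this. ClusterNodes is a hypothetical helper; it returns JDK InetSocketAddress values so that it compiles without the Elasticsearch client on the classpath.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "host1:9200,host2:9200,..." into addresses
// for any number of nodes instead of hard-coding nodes[0..2].
class ClusterNodes {
    static List<InetSocketAddress> parse(String clusterNode) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String raw : clusterNode.split(",")) {
            String node = raw.trim();
            if (node.isEmpty()) {
                continue; // tolerate stray or trailing commas
            }
            int colon = node.lastIndexOf(':');
            if (colon <= 0 || colon == node.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got [" + node + "]");
            }
            String host = node.substring(0, colon);
            int port = Integer.parseInt(node.substring(colon + 1));
            // createUnresolved: no DNS lookup happens at configuration time
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("no Elasticsearch nodes configured");
        }
        return result;
    }
}
```

    Each address can then be turned into new HttpHost(addr.getHostString(), addr.getPort(), "http"), and the resulting array passed to RestClient.builder(...), which takes a variable number of hosts.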
    

    So when the client sends an HTTP request, which Elasticsearch server does it actually go to? That is the question this article sets out to answer.

    Sending requests is handled mainly by RestClient, and its class Javadoc already gives the answer: **"when sending a request, a host gets selected out of the provided ones in a round-robin fashion."**

    /**
     * Client that connects to an Elasticsearch cluster through HTTP.
     * The hosts that are part of the cluster need to be provided at creation time, but can also be replaced later
     * The method {@link #performRequest(String, String, Map, HttpEntity, Header...)} allows to send a request to the cluster. When
     * sending a request, a host gets selected out of the provided ones in a round-robin fashion. Failing hosts are marked dead and
     * retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times they previously
     * failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead nodes that
     * deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
     * <p>
     * Requests can be either synchronous or asynchronous. The asynchronous variants all end with {@code Async}.
     * <p>
     */
    public class RestClient implements Closeable {

        // ... other code omitted ...

        /**
         * {@code HostTuple} enables the {@linkplain HttpHost}s and {@linkplain AuthCache} to be set together in a thread
         * safe, volatile way.
         */
        private static class HostTuple<T> {
            final T hosts;
            final AuthCache authCache;
    
            HostTuple(final T hosts, final AuthCache authCache) {
                this.hosts = hosts;
                this.authCache = authCache;
            }
        }
    }
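    The Javadoc only states the bounds of the dead-host backoff: at least 1 minute, at most 30 minutes, growing with the number of previous failures. The sketch below is a hypothetical schedule that respects those bounds; the doubling curve is an assumption chosen for illustration, not the formula RestClient uses, and DeadHostBackoff is an invented name.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical backoff schedule, NOT the client's exact formula: a capped
// exponential curve that only respects the bounds stated in the Javadoc
// (first retry after 1 minute, never more than 30 minutes).
class DeadHostBackoff {
    static final long MIN_NANOS = TimeUnit.MINUTES.toNanos(1);
    static final long MAX_NANOS = TimeUnit.MINUTES.toNanos(30);

    // failedAttempts = consecutive failures of the host (must be >= 1)
    static long retryDelayNanos(int failedAttempts) {
        if (failedAttempts < 1) {
            throw new IllegalArgumentException("failedAttempts must be >= 1");
        }
        // 1, 2, 4, 8, 16 minutes, then the 30-minute cap; the exponent is capped
        // at 5 (2^5 minutes already exceeds 30) so the shift cannot overflow
        int exponent = Math.min(failedAttempts - 1, 5);
        return Math.min(MAX_NANOS, MIN_NANOS << exponent);
    }

    static long deadUntilNanos(long nowNanos, int failedAttempts) {
        return nowNanos + retryDelayNanos(failedAttempts);
    }

    // Compare by subtraction, the overflow-safe way to compare System.nanoTime() values.
    static boolean retryAllowed(long nowNanos, long deadUntilNanos) {
        return nowNanos - deadUntilNanos >= 0;
    }
}
```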
        
    

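    HostTuple is a static nested class of RestClient. It holds the configured hosts (the IP addresses and ports of the Elasticsearch cluster nodes) together with their AuthCache, so that both are always replaced together.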
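    The point of HostTuple being set "in a thread safe, volatile way" is that the host list and its AuthCache are published together as one immutable object. A minimal sketch of that idea, with hypothetical names (HostRegistry, Snapshot) and a plain String standing in for the AuthCache:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration of the HostTuple idea: the hosts and their companion
// data live in one immutable object that is swapped through a single volatile field.
class HostRegistry {
    private static final class Snapshot {
        final Set<String> hosts;
        final String authInfo; // stand-in for AuthCache

        Snapshot(Set<String> hosts, String authInfo) {
            // defensive, read-only copy so later changes by the caller cannot leak in
            this.hosts = Collections.unmodifiableSet(new LinkedHashSet<>(hosts));
            this.authInfo = authInfo;
        }
    }

    private volatile Snapshot snapshot;

    HostRegistry(Set<String> hosts, String authInfo) {
        this.snapshot = new Snapshot(hosts, authInfo);
    }

    // Writers replace hosts and auth data at once with a single volatile write.
    void setHosts(Set<String> hosts, String authInfo) {
        this.snapshot = new Snapshot(hosts, authInfo);
    }

    // Readers copy the reference once, so both fields come from the same update.
    String describe() {
        Snapshot s = this.snapshot;
        return s.hosts + " / " + s.authInfo;
    }

    Set<String> hosts() {
        return snapshot.hosts;
    }
}
```

    Keeping the two values in separate volatile fields would allow a reader to see the new host list paired with the old auth data; a single reference to an immutable snapshot rules that out.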

    So the client faces two problems:

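    1. How does it pick a "reliable" machine that it can safely hand its request to?
    2. If the client sends a very large number of requests, it should not keep sending them all to the same Elasticsearch server; the load has to be balanced.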

    I have not studied the implementation details of the algorithm in depth, but these two problems, stated abstractly, come up in many other scenarios.

    A client wants to connect to a service that offers many hosts to choose from: which factors should it consider when picking one to connect to?

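    When performRequestAsyncNoCatch calls performRequestAsync, it passes the result of RestClient's nextHost() method as an argument; nextHost() is what chooses the Elasticsearch server(s) to connect to.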

    void performRequestAsyncNoCatch(String method, String endpoint, Map<String, String> params,
                                        HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                        ResponseListener responseListener, Header... headers) {
        
        // ... unrelated code omitted
        performRequestAsync(startTime, nextHost(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory,
                failureTrackingResponseListener);
    }
    
        /**
         * Returns an {@link Iterable} of hosts to be used for a request call.
         * Ideally, the first host is retrieved from the iterable and used successfully for the request.
         * Otherwise, after each failure the next host has to be retrieved from the iterator so that the request can be retried until
         * there are no more hosts available to retry against. The maximum total of attempts is equal to the number of hosts in the iterable.
         * The iterator returned will never be empty. In case there are no healthy hosts available, or dead ones to be be retried,
         * one dead host gets returned so that it can be retried.
         */
        private HostTuple<Iterator<HttpHost>> nextHost() {
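    The Javadoc above describes a retry contract: use the first host, move to the next one after each failure, stop once every host has been tried, and throw an IOException if none of them answered. A synchronous, hypothetical sketch of that contract follows (HostFailover and Attempt are invented names). The real client retries asynchronously, marks failing hosts dead, and also enforces a maximum retry timeout; none of that is reproduced here.

```java
import java.io.IOException;
import java.util.Iterator;

// Hypothetical sketch of the retry contract described in the nextHost() Javadoc:
// try hosts in iterator order, move on after each failure, give up when exhausted.
class HostFailover {
    interface Attempt<R> {
        R send(String host) throws IOException;
    }

    static <R> R sendWithFailover(Iterator<String> hosts, Attempt<R> attempt) throws IOException {
        IOException last = null;
        while (hosts.hasNext()) {
            String host = hosts.next();
            try {
                return attempt.send(host); // first success wins
            } catch (IOException e) {
                // per the Javadoc, the real client would also mark this host dead
                if (last != null) {
                    e.addSuppressed(last); // keep earlier failures for diagnosis
                }
                last = e;
            }
        }
        if (last == null) {
            throw new IOException("no hosts to try");
        }
        throw last; // every host failed
    }
}
```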
    

    The rough logic of nextHost() is:

    do {
        // start from the hosts stored in the HostTuple (the configured Elasticsearch nodes)
        // and leave out the hosts that are currently marked dead
        // ....

        if (filteredHosts.isEmpty()) {
            // last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
            // every host is unavailable, so give the dead one a try anyway
            HttpHost deadHost = sortedHosts.get(0).getKey();
            nextHosts = Collections.singleton(deadHost);
        } else {
            List<HttpHost> rotatedHosts = new ArrayList<>(filteredHosts);
            // rotate() shifts the list by a distance that changes on every call,
            // so each request starts with the next host (round robin)
            Collections.rotate(rotatedHosts, rotatedHosts.size() - lastHostIndex.getAndIncrement());
            nextHosts = rotatedHosts;
        }

    } while (nextHosts.isEmpty());
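    To see the whole selection step run, here is a simplified, hypothetical re-creation of the logic above using plain strings instead of HttpHost. The class HostSelector, its map of "dead until" timestamps and the explicit nowNanos parameter are assumptions made for the sketch; only the filter / last-resort / rotate structure follows the excerpt.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical re-creation of the nextHost() logic. Hosts are plain
// strings; a map records until when (in nanos) each dead host is excluded.
class HostSelector {
    private final List<String> hosts;
    private final Map<String, Long> deadUntilNanos = new ConcurrentHashMap<>();
    private final AtomicInteger lastHostIndex = new AtomicInteger(0);

    HostSelector(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = new ArrayList<>(hosts);
    }

    void markDead(String host, long untilNanos) {
        deadUntilNanos.put(host, untilNanos);
    }

    void markAlive(String host) {
        deadUntilNanos.remove(host);
    }

    // nowNanos is a parameter (instead of System.nanoTime()) to keep the sketch testable.
    List<String> nextHosts(long nowNanos) {
        List<String> candidates = new ArrayList<>();
        for (String host : hosts) {
            Long until = deadUntilNanos.get(host);
            // usable if never marked dead, or if its retry time has arrived
            if (until == null || nowNanos - until >= 0) {
                candidates.add(host);
            }
        }
        if (candidates.isEmpty()) {
            // last resort: every host is dead, so return the one closest to being retried
            // (a host revived concurrently gets Long.MIN_VALUE and therefore wins)
            String closest = Collections.min(hosts,
                    Comparator.comparingLong((String h) -> deadUntilNanos.getOrDefault(h, Long.MIN_VALUE)));
            return Collections.singletonList(closest);
        }
        // round robin: each call rotates the list one step further
        Collections.rotate(candidates, candidates.size() - lastHostIndex.getAndIncrement());
        return candidates;
    }
}
```

    With hosts [h0, h1, h2] and no failures, the first two calls return lists starting with h0 and then h1. A host marked dead until a time that has not yet come disappears from the result; if every host is dead, the one with the earliest dead-until time is returned on its own.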
    

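    The host order is produced mainly by the JDK's Collections.rotate(). Note that rotate() does not look at host health or load: it only shifts the list, and because lastHostIndex grows by one on every call, successive requests start at successive hosts. Internally rotate() has two implementations (rotate1 and rotate2); rather than pasting their code, here is its Javadoc: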

        /**
         * Rotates the elements in the specified list by the specified distance.
         * After calling this method, the element at index <tt>i</tt> will be
         * the element previously at index <tt>(i - distance)</tt> mod
         * <tt>list.size()</tt>, for all values of <tt>i</tt> between <tt>0</tt>
         * and <tt>list.size()-1</tt>, inclusive.  (This method has no effect on
         * the size of the list.)
         *
         * <p>For example, suppose <tt>list</tt> comprises<tt> [t, a, n, k, s]</tt>.
         * After invoking <tt>Collections.rotate(list, 1)</tt> (or
         * <tt>Collections.rotate(list, -4)</tt>), <tt>list</tt> will comprise
         * <tt>[s, t, a, n, k]</tt>.
         *
         * <p>Note that this method can usefully be applied to sublists to
         * move one or more elements within a list while preserving the
         * order of the remaining elements.  For example, the following idiom
         * moves the element at index <tt>j</tt> forward to position
         * <tt>k</tt> (which must be greater than or equal to <tt>j</tt>):
         * <pre>
         *     Collections.rotate(list.subList(j, k+1), -1);
         * </pre>
         * To make this concrete, suppose <tt>list</tt> comprises
         * <tt>[a, b, c, d, e]</tt>.  To move the element at index <tt>1</tt>
         * (<tt>b</tt>) forward two positions, perform the following invocation:
         * <pre>
         *     Collections.rotate(l.subList(1, 4), -1);
         * </pre>
         * The resulting list is <tt>[a, c, d, b, e]</tt>.
         *
         * <p>To move more than one element forward, increase the absolute value
         * of the rotation distance.  To move elements backward, use a positive
         * shift distance.
         *
         * <p>If the specified list is small or implements the {@link
         * RandomAccess} interface, this implementation exchanges the first
         * element into the location it should go, and then repeatedly exchanges
         * the displaced element into the location it should go until a displaced
         * element is swapped into the first element.  If necessary, the process
         * is repeated on the second and successive elements, until the rotation
         * is complete.  If the specified list is large and doesn't implement the
         * <tt>RandomAccess</tt> interface, this implementation breaks the
         * list into two sublist views around index <tt>-distance mod size</tt>.
         * Then the {@link #reverse(List)} method is invoked on each sublist view,
         * and finally it is invoked on the entire list.  For a more complete
         * description of both algorithms, see Section 2.3 of Jon Bentley's
         * <i>Programming Pearls</i> (Addison-Wesley, 1986).
         *
         */
        public static void rotate(List<?> list, int distance) {
            if (list instanceof RandomAccess || list.size() < ROTATE_THRESHOLD)
                rotate1(list, distance);
            else
                rotate2(list, distance);
        }
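    Combining this Javadoc with the distance rotatedHosts.size() - lastHostIndex.getAndIncrement() used in nextHost() shows why the result is round robin. The small replay below (RotationDemo is a hypothetical name) rotates a fresh copy of the host list on each call, as nextHost() does:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Replays the rotation from the nextHost() excerpt: every call rotates a
// fresh copy of the host list by (size - counter) and then bumps the counter.
class RotationDemo {
    static List<List<String>> rotations(List<String> hosts, int calls) {
        AtomicInteger lastHostIndex = new AtomicInteger(0);
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            List<String> rotated = new ArrayList<>(hosts); // fresh copy per call
            Collections.rotate(rotated, rotated.size() - lastHostIndex.getAndIncrement());
            result.add(rotated);
        }
        return result;
    }
}
```

    For [h0, h1, h2], five calls produce lists starting with h0, h1, h2, h0, h1. Once the counter exceeds the list size the distance becomes negative, which rotate() accepts (its Javadoc notes that rotate(list, 1) and rotate(list, -4) are equivalent for a five-element list), so the cycle simply continues.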
    

    The underlying HttpClient can be customized through RestClientBuilder's setRequestConfigCallback and setHttpClientConfigCallback, for example:

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // parse the hostlist setting ("host:port,host:port,...")
        String[] split = hostlist.split(",");
        // build an HttpHost array holding each ES host and port
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        // create the RestHighLevelClient
        // simpler alternative without custom HttpClient settings, with the max retry timeout set to 5 minutes:
        // return new RestHighLevelClient(RestClient.builder(httpHostArray).setMaxRetryTimeoutMillis(5 * 60 * 1000));
        RestClientBuilder builder = RestClient.builder(httpHostArray);

        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(connectTimeoutMillis);
            requestConfigBuilder.setSocketTimeout(socketTimeoutMillis);
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeoutMillis);
            return requestConfigBuilder;
        });

        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(maxConnectTotal);
            httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
            return httpClientBuilder;
        });

        return new RestHighLevelClient(builder);
    }

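    The five variables used above (connectTimeoutMillis, socketTimeoutMillis, connectionRequestTimeoutMillis, maxConnectTotal, maxConnectPerRoute) are injected with Spring's @Value.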

    For what these settings mean, see the article "Important HttpClient parameters and HttpClient connection pool parameters".
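    A quick way to reason about maxConnectTotal and maxConnectPerRoute together: assuming no proxy, every Elasticsearch node is a separate route in the Apache HttpClient connection pool, so the number of connections that can be open at once is bounded by both limits. PoolSizing is a hypothetical back-of-the-envelope helper, not part of any client API:

```java
// Hypothetical back-of-the-envelope helper (not part of any client API).
// Assumption: no proxy, so each Elasticsearch node is one route in the
// Apache HttpClient connection pool.
class PoolSizing {
    static int maxOpenConnections(int nodes, int maxConnTotal, int maxConnPerRoute) {
        if (nodes <= 0 || maxConnTotal <= 0 || maxConnPerRoute <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        // per-route limits summed over all nodes (long to avoid int overflow)
        long perRouteCeiling = (long) nodes * maxConnPerRoute;
        // the total limit applies on top of the per-route limits
        return (int) Math.min(maxConnTotal, perRouteCeiling);
    }
}
```

    For example, with three nodes, maxConnTotal = 100 and maxConnPerRoute = 10, at most 30 connections can be open, so raising maxConnTotal alone changes nothing. Further requests wait for a pooled connection, and connectionRequestTimeout limits how long they wait.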

     
  • Original article: https://www.cnblogs.com/duanxz/p/5206304.html