zoukankan      html  css  js  c++  java
  • 03-springboot整合elasticsearch-源码初识

        前面两个小节已经知道了spring boot怎么整合es,以及es的简单使用,但是springboot中是怎么和es服务器交互的。我们可以简单了解一下。要看一下源码
    在看源码的同时,先要对springboot请求ES服务器的原理了解一下,ES官网(https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-update.html)给出了很详细的说明,可以自行进行了解。

    1.RestClient es交互的基础服务器

    对于单机es,一般使用的是ElasticsearchOperations

    1.1 数据存储的具体过程

    本质上还是使用ElasticsearchRestTemplate进行操作。
    数据存储发起操作

    1.存储数据
    String documentId = operations.index(indexQuery,indexCoordinates);
    
    2.index操作的过程
    @Override
    	public String index(IndexQuery query, IndexCoordinates index) {
    
    		maybeCallbackBeforeConvertWithQuery(query, index);//实体类再保存操作之前的回调方法
    
    		IndexRequest request = requestFactory.indexRequest(query, index);//==获取indexRequest的过程==
    		String documentId = execute(client -> client.index(request, RequestOptions.DEFAULT).getId());//==与ES服务器交互过程==
    
    		// We should call this because we are not going through a mapper.
    		Object queryObject = query.getObject();
    		if (queryObject != null) {
    			setPersistentEntityId(queryObject, documentId);
    		}
    
    		maybeCallbackAfterSaveWithQuery(query, index);//实体类再保存之后的回调方法
    
    		return documentId;
    	}
    
    

    查看IndexRequest的创建过程如下

    	public IndexRequest indexRequest(IndexQuery query, IndexCoordinates index) {
    		String indexName = index.getIndexName();
    
    		IndexRequest indexRequest;
    
    		if (query.getObject() != null) {
    			String id = StringUtils.isEmpty(query.getId()) ? getPersistentEntityId(query.getObject()) : query.getId();
    			// If we have a query id and a document id, do not ask ES to generate one.
    			if (id != null) {
    				indexRequest = new IndexRequest(indexName).id(id);
    			} else {
    				indexRequest = new IndexRequest(indexName);
    			}
                            /**
                             * 1.将传来的object转成Map,再转成json串
                             * 2.将Object的json串转成字节BytesReference,请求的ContentType设置为Request.JSON方式
                             */     
    			indexRequest.source(elasticsearchConverter.mapObject(query.getObject()).toJson(), Requests.INDEX_CONTENT_TYPE);
    		} 
                    // 省略一部分代码。。。。。
    		return indexRequest;
    	}
    

    请求数据处理过程

    public final IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
            return performRequestAndParseEntity(indexRequest, RequestConverters::index, options, IndexResponse::fromXContent, emptySet());
        }
    
    1.RequestConverters::index 这个过程创建Request对象
    
    static Request index(IndexRequest indexRequest) {
            String method = Strings.hasLength(indexRequest.id()) ? HttpPut.METHOD_NAME : HttpPost.METHOD_NAME; //根据有无ID选择传输方式是PUT还是POST
            boolean isCreate = (indexRequest.opType() == DocWriteRequest.OpType.CREATE);
            //拼接请求的uri
            String endpoint = endpoint(indexRequest.index(), indexRequest.type(), indexRequest.id(), isCreate ? "_create" : null);
            Request request = new Request(method, endpoint);
            //增加request的请求参数    
            Params parameters = new Params(request);
            parameters.withRouting(indexRequest.routing());
            parameters.withParent(indexRequest.parent());
            parameters.withTimeout(indexRequest.timeout());
            parameters.withVersion(indexRequest.version());
            parameters.withVersionType(indexRequest.versionType());
            parameters.withIfSeqNo(indexRequest.ifSeqNo());
            parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
            parameters.withPipeline(indexRequest.getPipeline());
            parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
            parameters.withWaitForActiveShards(indexRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);
            //将请求的参数变成byte[]    
            BytesRef source = indexRequest.source().toBytesRef();
            ContentType contentType = createContentType(indexRequest.getContentType());
            request.setEntity(new NByteArrayEntity(source.bytes, source.offset, source.length, contentType));
            return request;
        }
    
    2.创建response
    
    3.创建空的集合 : new EmptySet<>();
    
    4. 给ES服务器发送数据
     
      private <Req, Resp> Resp internalPerformRequest(Req request,
                                                CheckedFunction<Req, Request, IOException> requestConverter,
                                                RequestOptions options,
                                                CheckedFunction<Response, Resp, IOException> responseConverter,
                                                Set<Integer> ignores) throws IOException {
            Request req = requestConverter.apply(request);
            req.setOptions(options);
            Response response;
            try {
                response = client.performRequest(req);
            } catch (ResponseException e) {
                if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
                    try {
                        return responseConverter.apply(e.getResponse());
                    } catch (Exception innerException) {
                        throw parseResponseException(e);
                    }
                }
                throw parseResponseException(e);
            }
    
            try {
                return responseConverter.apply(response);
            } catch(Exception e) {
                throw new IOException("Unable to parse response body for " + response, e);
            }
        }
    

    RestClien请求发送的过程

    1.发送请求过程
     public Response performRequest(Request request) throws IOException {
            SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
            performRequestAsyncNoCatch(request, listener);
            return listener.get();
        }
      //创建请求的url,创建request对象
     void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException {
            Map<String, String> requestParams = new HashMap<>(request.getParameters());
            String ignoreString = requestParams.remove("ignore");
            Set<Integer> ignoreErrorCodes;
            if (ignoreString == null) {
                if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
                    //404 never causes error if returned for a HEAD request
                    ignoreErrorCodes = Collections.singleton(404);
                } else {
                    ignoreErrorCodes = Collections.emptySet();
                }
            } else {
                String[] ignoresArray = ignoreString.split(",");
                ignoreErrorCodes = new HashSet<>();
                if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
                    //404 never causes error if returned for a HEAD request
                    ignoreErrorCodes.add(404);
                }
                for (String ignoreCode : ignoresArray) {
                    try {
                        ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
                    } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
                    }
                }
            }
            URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams);//创建url和请求参数拼接
            HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());//创建request对象
            setHeaders(httpRequest, request.getOptions().getHeaders());
            FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
            long startTime = System.nanoTime();
            performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
                    request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(),
                    request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);//发送过程
        }
    
      //数据发送,完成后将响应信息封装进response对象中
     private void performRequestAsync(final long startTime, final NodeTuple<Iterator<Node>> nodeTuple, final HttpRequestBase request,
                                         final Set<Integer> ignoreErrorCodes,
                                         final WarningsHandler thisWarningsHandler,
                                         final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                         final FailureTrackingResponseListener listener) {
            final Node node = nodeTuple.nodes.next(); //获取注册的节点
            final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request);
            final HttpAsyncResponseConsumer<HttpResponse> asyncResponseConsumer =
                httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer();
            final HttpClientContext context = HttpClientContext.create();
            context.setAuthCache(nodeTuple.authCache);
            client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback<HttpResponse>() {
                @Override
                public void completed(HttpResponse httpResponse) { //执行完成后的回调方法
                    try {
                        RequestLogger.logResponse(logger, request, node.getHost(), httpResponse);
                        int statusCode = httpResponse.getStatusLine().getStatusCode();
                        Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse);
                        if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
                            onResponse(node);
                            if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) {
                                listener.onDefinitiveFailure(new WarningFailureException(response));
                            } else {
                                listener.onSuccess(response);
                            }
                        } else {
                            ResponseException responseException = new ResponseException(response);
                            if (isRetryStatus(statusCode)) {
                                //mark host dead and retry against next one
                                onFailure(node);
                                retryIfPossible(responseException);
                            } else {
                                //mark host alive and don't retry, as the error should be a request problem
                                onResponse(node);
                                listener.onDefinitiveFailure(responseException);
                            }
                        }
                    } catch(Exception e) {
                        listener.onDefinitiveFailure(e);
                    }
                }
            });
        }
    
    
    
    
  • 相关阅读:
    单核时代,PHP之类多线程或者多进程的,是怎么处理并发的?是排队吗?
    高并发下的Node.js与负载均衡
    telnet 查看端口是否可访问
    同步与异步--
    函数式编程沉思录(草稿)
    面向状态机编程
    promise是有状态的moand
    异步链式编程—promise沉思录
    同步与异步
    网络编程释疑之:同步,异步,阻塞,非阻塞
  • 原文地址:https://www.cnblogs.com/perferect/p/13157229.html
Copyright © 2011-2022 走看看