zoukankan      html  css  js  c++  java
  • ES bulk源码分析——ES 5.0

    对bulk request的处理流程:

    1、遍历所有的request,对其做一些加工,主要包括:获取routing(如果mapping里有的话)、指定的timestamp(如果没有带timestamp会使用当前时间),如果没有指定id字段,在action.bulk.action.allow_id_generation配置为true的情况下,会自动生成一个base64UUID作为id字段,并会将request的opType字段置为CREATE,因为如果是使用es自动生成的id的话,默认就是createdocument而不是updatedocument。(注:坑爹啊,我从github上面下的最新的ES代码,发现自动生成id这一段已经没有设置opType字段了,看起来和有指定id是一样的处理逻辑了,见https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java)。

    2、创建一个shardId--> Operation的Map,再次遍历所有的request,获取获取每个request应该发送到的shardId,获取的过程是这样的:request有routing就直接返回,如果没有,会先对id求一个hash,这里的hash函数默认是Murmur3,当然你也可以通过配置index.legacy.routing.hash.type来决定使用的hash函数,决定发到哪个shard:

    return MathUtils.mod(hash, indexMetaData.getNumberOfShards()); 注意:最新版ES代码已经改变!

    即用hash对shard的总数求模来获取shardId,将shardId作为key,通过遍历的index和request组成BulkItemRequest的集合作为value放入之前说的map中(为什么要拿到遍历的index,因为在bulk response中可以看到对每个request的请求处理结果的),其实说了这么多就是要对request按shard来分组(为负载均衡)。

    3、遍历上面得到的map,对不同的分组创建一个bulkShardRequest,包含配置consistencyLevel和timeout。并从集群state中获得primary shard,如果primary在本机就直接执行,如果不在会再发送到其shard所在的node。

    源码位置:https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

        void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
            final ClusterState clusterState = clusterService.state();
            // TODO use timeout to wait here if its blocked...
            clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
    
            final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
            MetaData metaData = clusterState.metaData();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
                //the request can only be null because we set it to null in the previous step, so it gets ignored
                if (docWriteRequest == null) {
                    continue;
                }
                if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) {
                    continue;
                }
                Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
                try {
                    switch (docWriteRequest.opType()) {
                        case CREATE:
                        case INDEX:
                            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                            MappingMetaData mappingMd = null;
                            final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                            if (indexMetaData != null) {
                                mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                            }
                            indexRequest.resolveRouting(metaData);
                            indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
                            break;
                        case UPDATE:
                            TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
                            break;
                        case DELETE:
                            TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest);
                            break;
                        default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                    }
                } catch (ElasticsearchParseException | RoutingMissingException e) {
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request gets never processed again
                    bulkRequest.requests.set(i, null);
                }
            }
    
            // first, go over all the requests and create a ShardId -> Operations mapping
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest request = bulkRequest.requests.get(i);
                if (request == null) {
                    continue;
                }
                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                shardRequests.add(new BulkItemRequest(i, request));
            }
    
            if (requestsByShard.isEmpty()) {
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
                return;
            }
    
            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            String nodeId = clusterService.localNode().getId();
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                final ShardId shardId = entry.getKey();
                final List<BulkItemRequest> requests = entry.getValue();
                BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                        requests.toArray(new BulkItemRequest[requests.size()]));
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkShardRequest.timeout(bulkRequest.timeout());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                            // we may have no response if item failed
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }
                });
            }
        }

    路由代码:

    ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();

  • 相关阅读:
    知乎高赞:假如我有500w存进余额宝,可以每天坐着等吃吗?
    为什么想举办一场 中国深圳•测试开发大会
    2020 年互联网大厂薪资出炉!老夫酸了.......
    再见,我亲手创办的公司
    互联网公司的年会也太太太刺激了吧!
    python环境安装及配置
    Python isdigit()方法
    购物车程序优化2
    python中常见的报错信息
    pycharm中的快捷键
  • 原文地址:https://www.cnblogs.com/bonelee/p/6078947.html
Copyright © 2011-2022 走看看