
    ElasticSearch-hadoop saveToEs Source Code Analysis

    The call chain between the classes is:

    EsSpark -> 
         EsRDDWriter -> 
               RestService -> 
                      RestRepository -> 
                                RestClient

    Their respective roles:

    • EsSpark — the entry point for reading from and writing to ES (a usage sketch follows this list)
    • EsRDDWriter — calls RestService to create a PartitionWriter, which writes the data into ES
    • RestService — responsible for creating the RestRepository and the PartitionWriter
    • RestRepository — a high-level abstraction over bulk; underneath it uses NetworkClient to issue the actual HTTP bulk requests
    • RestClient — builds the bulk requests and drives the retry loop for rejected entries
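    For orientation, a minimal usage sketch of this write path is shown below. It is a hypothetical example, not from the library docs: the node address, the "spark/docs" index/type resource, and the sample documents are all made up.

      import org.apache.spark.{SparkConf, SparkContext}
      import org.elasticsearch.spark.rdd.EsSpark

      object SaveToEsExample {
        def main(args: Array[String]): Unit = {
          // es.nodes points at the cluster; both values are illustrative
          val conf = new SparkConf()
            .setAppName("saveToEs-example")
            .set("es.nodes", "localhost:9200")
          val sc = new SparkContext(conf)

          // two toy documents; elasticsearch-spark serializes Scala Maps as JSON
          val docs = Seq(Map("title" -> "doc-1"), Map("title" -> "doc-2"))

          // entry point: merges the resource into the settings, then schedules
          // one EsRDDWriter per partition via sc.runJob (see the source below)
          EsSpark.saveToEs(sc.makeRDD(docs), "spark/docs")
        }
      }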

    The relevant source for each class is traced below:

    https://github.com/elastic/elasticsearch-hadoop/blob/2.1/spark/core/main/scala/org/elasticsearch/spark/rdd/EsSpark.scala

      def saveToEs(rdd: RDD[_], resource: String) { saveToEs(rdd, Map(ES_RESOURCE_WRITE -> resource)) }
      def saveToEs(rdd: RDD[_], resource: String, cfg: Map[String, String]) {
        saveToEs(rdd, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_WRITE -> resource))
      }
      def saveToEs(rdd: RDD[_], cfg: Map[String, String]) {
        CompatUtils.warnSchemaRDD(rdd, LogFactory.getLog("org.elasticsearch.spark.rdd.EsSpark"))
    
        if (rdd == null || rdd.partitions.length == 0) {
          return
        }
    
        val sparkCfg = new SparkSettingsManager().load(rdd.sparkContext.getConf)
        val config = new PropertiesSettings().load(sparkCfg.save())
        config.merge(cfg.asJava)
    
        rdd.sparkContext.runJob(rdd, new EsRDDWriter(config.save()).write _)
      }
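
    Note how the last line ships configuration to the executors: config.save() flattens the merged settings into a properties-style String, so the closure passed to runJob carries only that String and each task can rebuild a settings object from it. Below is a self-contained sketch of that serialize-into-the-closure pattern; it uses plain java.util.Properties as a stand-in for the library's Settings/PropertiesSettings classes.

      import java.io.{StringReader, StringWriter}
      import java.util.Properties

      // stand-in for settings.save(): flatten the config to a String,
      // which serializes cheaply inside a Spark task closure
      def saveSettings(props: Properties): String = {
        val sw = new StringWriter()
        props.store(sw, null)
        sw.toString
      }

      // stand-in for new PropertiesSettings().load(serialized) on the executor
      def loadSettings(serialized: String): Properties = {
        val props = new Properties()
        props.load(new StringReader(serialized))
        props
      }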

    https://github.com/elastic/elasticsearch-hadoop/blob/2.1/spark/core/main/scala/org/elasticsearch/spark/rdd/EsRDDWriter.scala

      def write(taskContext: TaskContext, data: Iterator[T]) {
        val writer = RestService.createWriter(settings, taskContext.partitionId, -1, log)
    
        taskContext.addOnCompleteCallback(() => writer.close())
    
        if (runtimeMetadata) {
          writer.repository.addRuntimeFieldExtractor(metaExtractor)
        }
    
        while (data.hasNext) {
          writer.repository.writeToIndex(processData(data))
        }
      }
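
    write is the classic per-partition writer pattern: open one writer per partition, register a cleanup callback on the TaskContext, then drain the iterator. (addOnCompleteCallback is the old Spark API; later Spark versions replace it with addTaskCompletionListener.) A generic sketch of the same shape, with a StringWriter standing in for the REST writer:

      import org.apache.spark.{SparkContext, TaskContext}
      import org.apache.spark.rdd.RDD

      // hypothetical per-partition writer shaped like EsRDDWriter.write:
      // open a resource per partition, register cleanup, drain the iterator
      def writePartition(taskContext: TaskContext, data: Iterator[String]): Unit = {
        val out = new java.io.StringWriter()   // stand-in for RestService.createWriter(...)
        taskContext.addTaskCompletionListener[Unit](_ => out.close())
        data.foreach(s => out.write(s))
      }

      // driver side, mirroring the last line of EsSpark.saveToEs
      def save(sc: SparkContext, rdd: RDD[String]): Unit =
        sc.runJob(rdd, writePartition _)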

    https://github.com/elastic/elasticsearch-hadoop/blob/2.1/mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java

        public static PartitionWriter createWriter(Settings settings, int currentSplit, int totalSplits, Log log) {
            Version.logVersion();
    
            InitializationUtils.discoverEsVersion(settings, log);
            InitializationUtils.discoverNodesIfNeeded(settings, log);
            InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
            InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
    
            List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
    
            // check invalid splits (applicable when running in non-MR environments) - in this case fall back to Random..
            int selectedNode = (currentSplit < 0) ? new Random().nextInt(nodes.size()) : currentSplit % nodes.size();
    
            // select the appropriate nodes first, to spread the load before-hand
            SettingsUtils.pinNode(settings, nodes.get(selectedNode));
    
            Resource resource = new Resource(settings, false);
    
            log.info(String.format("Writing to [%s]", resource));
    
            // single index vs multi indices
            IndexExtractor iformat = ObjectUtils.instantiate(settings.getMappingIndexExtractorClassName(), settings);
            iformat.compile(resource.toString());
    
            RestRepository repository = (iformat.hasPattern() ? initMultiIndices(settings, currentSplit, resource, log) : initSingleIndex(settings, currentSplit, resource, log));
    
            return new PartitionWriter(settings, currentSplit, totalSplits, repository);
        }
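
    The node selection above is plain modulo arithmetic: each split (Spark partition) is pinned to one discovered node, spreading the bulk load across the cluster up front; a negative split id (non-MR callers) falls back to a random node. A tiny sketch of the same rule, with made-up node addresses:

      import scala.util.Random

      // same selection rule as createWriter: a negative split id means the
      // caller has no real split, so pick a random node instead
      def selectNode(nodes: IndexedSeq[String], currentSplit: Int): String = {
        val idx =
          if (currentSplit < 0) Random.nextInt(nodes.size)
          else currentSplit % nodes.size
        nodes(idx)
      }

      // with 3 nodes, splits 0..5 pin round-robin: node 0, 1, 2, 0, 1, 2
      val nodes = Vector("10.0.0.1:9200", "10.0.0.2:9200", "10.0.0.3:9200")
      (0 to 5).map(selectNode(nodes, _))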

    https://github.com/elastic/elasticsearch-hadoop/blob/2.1/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java 

        /**
         * Writes the objects to index.
         *
         * @param object object to add to the index
         */
        public void writeToIndex(Object object) {
            Assert.notNull(object, "no object data given");
    
            lazyInitWriting();
            doWriteToIndex(command.write(object));
        }
        private void doWriteToIndex(BytesRef payload) {
            // check space first
            if (payload.length() > ba.available()) {
                if (autoFlush) {
                    flush();
                }
                else {
                    throw new EsHadoopIllegalStateException(
                            String.format("Auto-flush disabled and bulk buffer full; disable manual flush or increase capacity [current size %s]; bailing out", ba.capacity()));
                }
            }
    
            data.copyFrom(payload);
            payload.reset();
    
            dataEntries++;
            if (bufferEntriesThreshold > 0 && dataEntries >= bufferEntriesThreshold) {
                if (autoFlush) {
                    flush();
                }
                else {
                    // handle the corner case of manual flush that occurs only after the buffer is completely full (think size of 1)
                    if (dataEntries > bufferEntriesThreshold) {
                        throw new EsHadoopIllegalStateException(
                                String.format(
                                        "Auto-flush disabled and maximum number of entries surpassed; disable manual flush or increase capacity [current size %s]; bailing out",
                                        bufferEntriesThreshold));
                    }
                }
            }
        }
        public void flush() {
            BitSet bulk = tryFlush();
            if (!bulk.isEmpty()) {
                throw new EsHadoopException(String.format("Could not write all entries [%s/%s] (maybe ES was overloaded?). Bailing out...", bulk.cardinality(), bulk.size()));
            }
        }
        public BitSet tryFlush() {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Sending batch of [%d] bytes/[%s] entries", data.length(), dataEntries));
            }
    
            BitSet bulkResult = EMPTY;
    
            try {
                // double check data - it might be a false flush (called on clean-up)
                if (data.length() > 0) {
                    bulkResult = client.bulk(resourceW, data);
                    executedBulkWrite = true;
                }
            } catch (EsHadoopException ex) {
                hadWriteErrors = true;
                throw ex;
            }
    
            // discard the data buffer, only if it was properly sent/processed
            //if (bulkResult.isEmpty()) {
            // always discard data since there's no code path that uses the in flight data
            discard();
            //}
    
            return bulkResult;
        }
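
    doWriteToIndex is a dual-threshold buffer: a flush triggers either when the next payload would overflow the byte buffer or when the entry count reaches bufferEntriesThreshold, and with auto-flush disabled both conditions fail fast instead. A stripped-down sketch of the same accounting, with the actual client.bulk call replaced by a counter:

      // minimal sketch of the dual-threshold buffering in doWriteToIndex;
      // flush() just counts batches instead of issuing a bulk request
      class BulkBuffer(capacityBytes: Int, entryThreshold: Int, autoFlush: Boolean) {
        private val buf = new StringBuilder
        private var entries = 0
        private var flushes = 0

        def add(payload: String): Unit = {
          // check space first, as RestRepository does
          if (payload.length > capacityBytes - buf.length) {
            if (!autoFlush)
              sys.error("Auto-flush disabled and bulk buffer full; bailing out")
            flush()
          }
          buf.append(payload)
          entries += 1
          if (entryThreshold > 0 && entries >= entryThreshold) {
            if (autoFlush) flush()
            // corner case: manual flush never ran and the count overshot
            else if (entries > entryThreshold)
              sys.error("Auto-flush disabled and maximum number of entries surpassed")
          }
        }

        def flush(): Unit = {
          if (buf.nonEmpty) flushes += 1   // stand-in for client.bulk(resourceW, data)
          buf.clear(); entries = 0
        }
      }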

    https://github.com/elastic/elasticsearch-hadoop/blob/2.1/mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java

        public BitSet bulk(Resource resource, TrackingBytesArray data) {
            Retry retry = retryPolicy.init();
            int httpStatus = 0;
    
            boolean isRetry = false;
    
            do {
                // NB: dynamically get the stats since the transport can change
                long start = network.transportStats().netTotalTime;
                Response response = execute(PUT, resource.bulk(), data);
                long spent = network.transportStats().netTotalTime - start;
    
                stats.bulkTotal++;
                stats.docsSent += data.entries();
                stats.bulkTotalTime += spent;
                // bytes will be counted by the transport layer
    
                if (isRetry) {
                    stats.docsRetried += data.entries();
                    stats.bytesRetried += data.length();
                    stats.bulkRetries++;
                    stats.bulkRetriesTotalTime += spent;
                }
    
                isRetry = true;
    
                httpStatus = (retryFailedEntries(response, data) ? HttpStatus.SERVICE_UNAVAILABLE : HttpStatus.OK);
            } while (data.length() > 0 && retry.retry(httpStatus));
    
            return data.leftoversPosition();
        }
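
    bulk is a retry loop: retryFailedEntries parses the bulk response and removes the documents that succeeded from data, so only rejected entries are resent; the loop ends when data is drained or the retry policy gives up, and the returned BitSet marks the leftovers. A generic sketch of that retry-until-drained shape, with the HTTP call stubbed out:

      // generic retry-until-drained loop in the shape of RestClient.bulk;
      // sendBulk stands in for execute(PUT, ...) plus response parsing and
      // returns only the entries that were rejected and must be retried
      def bulkWithRetry[A](entries: Seq[A], maxRetries: Int)
                          (sendBulk: Seq[A] => Seq[A]): Seq[A] = {
        var pending = entries
        var attempts = 0
        do {
          pending = sendBulk(pending)   // keep rejects, like retryFailedEntries
          attempts += 1
        } while (pending.nonEmpty && attempts <= maxRetries)
        pending   // leftovers, analogous to data.leftoversPosition()
      }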
    Original post: https://www.cnblogs.com/bonelee/p/6054199.html