zoukankan      html  css  js  c++  java
  • 生产者-消费者模型在Hudi中的应用

    介绍

    生产者-消费者模型用于解耦生产者与消费者,平衡两者之间的能力不平衡,该模型广泛应用于各个系统中,Hudi也使用了该模型控制对记录的处理,即记录会被生产者生产至队列中,然后由消费者从队列中消费,更具体一点,对于更新操作,生产者会将文件中老的记录放入队列中等待消费者消费,消费后交由HoodieMergeHandle处理;对于插入操作,生产者会将新记录放入队列中等待消费者消费,消费后交由HandleCreateHandle处理。

    入口

    前面的文章中提到过无论是HoodieCopyOnWriteTable#handleUpdate处理更新时直接生成了一个SparkBoundedInMemoryExecutor对象,还是HoodieCopyOnWriteTable#handleInsert处理插入时生成了一个CopyOnWriteLazyInsertIterable对象,再迭代时调用该对象的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor对象。最后两者均会调用SparkBoundedInMemoryExecutor#execute开始记录的处理,该方法核心代码如下

      public E execute() {
        try {
          ExecutorCompletionService<Boolean> producerService = startProducers();
          Future<E> future = startConsumer();
          // Wait for consumer to be done
          return future.get();
        } catch (Exception e) {
          throw new HoodieException(e);
        }
      }
    

    该方法会启动所有生产者和单个消费者进行处理。

    Hudi定义了BoundedInMemoryQueueProducer接口表示生产者,其子类实现如下

    • FunctionBasedQueueProducer,基于Function来生产记录,在合并日志log文件和数据parquet文件时使用,以便提供RealTimeView
    • IteratorBasedQueueProducer,基于迭代器来生产记录,在插入更新时使用。

    定义了BoundedInMemoryQueueConsumer类表示消费者,其主要子类实现如下

    • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要处理CopyOnWrite表类型时的插入。
      • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要处理MergeOnRead

    表类型时的插入,其为CopyOnWriteInsertHandler的子类。

    • CopyOnWriteLazyInsertIterable$UpdateHandler,主要处理CopyOnWrite表类型时的更新。

    整个生产消费相关的类继承结构非常清晰。

    对于生产者的启动,startProducers方法核心代码如下

      public ExecutorCompletionService<Boolean> startProducers() {
        // Latch to control when and which producer thread will close the queue
        final CountDownLatch latch = new CountDownLatch(producers.size());
        final ExecutorCompletionService<Boolean> completionService =
            new ExecutorCompletionService<Boolean>(executorService);
        producers.stream().map(producer -> {
          return completionService.submit(() -> {
            try {
              preExecute();
              producer.produce(queue);
            } catch (Exception e) {
              logger.error("error producing records", e);
              queue.markAsFailed(e);
              throw e;
            } finally {
              synchronized (latch) {
                latch.countDown();
                if (latch.getCount() == 0) {
                  // Mark production as done so that consumer will be able to exit
                  queue.close();
                }
              }
            }
            return true;
          });
        }).collect(Collectors.toList());
        return completionService;
      }
    

    该方法使用CountDownLatch来协调生产者线程与消费者线程的退出动作,然后调用produce方法开始生产,对于插入更新时的IteratorBasedQueueProducer而言,其核心代码如下

      public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
        ...
        while (inputIterator.hasNext()) {
          queue.insertRecord(inputIterator.next());
        }
        ...
      }
    

    可以看到只要迭代器还有记录(可能为插入时的新记录或者更新时的旧记录),就会往队列中不断写入。

    对于消费者的启动,startConsumer方法的核心代码如下

      private Future<E> startConsumer() {
        return consumer.map(consumer -> {
          return executorService.submit(() -> {
            ...
            preExecute();
            try {
              E result = consumer.consume(queue);
              return result;
            } catch (Exception e) {
              queue.markAsFailed(e);
              throw e;
            }
          });
        }).orElse(CompletableFuture.completedFuture(null));
      }
    

    消费时会先进行执行前的准备,然后开始消费,其中consume方法的核心代码如下

      public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
        Iterator<I> iterator = queue.iterator();
    
        while (iterator.hasNext()) {
          consumeOneRecord(iterator.next());
        }
    
        // Notifies done
        finish();
    
        return getResult();
      }
    

    可以看到只要队列中还有记录,就可以获取该记录,然后调用不同BoundedInMemoryQueueConsumer子类的consumeOneRecord进行更新插入处理。

    值得一提的是Hudi对队列进行了流控,生产者不能无限制地将记录写入队列中,队列缓存的大小由用户配置,队列能放入记录的条数由采样的记录大小和队列缓存大小控制。

    在生产时,会调用BoundedInMemoryQueue#insertRecord将记录写入队列,其核心代码如下

      public void insertRecord(I t) throws Exception {
        ...
        rateLimiter.acquire();
        // We are retrieving insert value in the record queueing thread to offload computation
        // around schema validation
        // and record creation to it.
        final O payload = transformFunction.apply(t);
        adjustBufferSizeIfNeeded(payload);
        queue.put(Option.of(payload));
      }
    

    首先获取一个许可(Semaphore),未成功获取会被阻塞直至成功获取,然后获取记录的负载以便调整队列,然后放入内部队列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心代码如下

      private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
        if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
          return;
        }
    
        final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
        final long newAvgRecordSizeInBytes =
            Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
        final int newRateLimit =
            (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));
    
        // If there is any change in number of records to cache then we will either release (if it increased) or acquire
        // (if it decreased) to adjust rate limiting to newly computed value.
        if (newRateLimit > currentRateLimit) {
          rateLimiter.release(newRateLimit - currentRateLimit);
        } else if (newRateLimit < currentRateLimit) {
          rateLimiter.acquire(currentRateLimit - newRateLimit);
        }
        currentRateLimit = newRateLimit;
        avgRecordSizeInBytes = newAvgRecordSizeInBytes;
        numSamples++;
      }
    

    首先看是否已经达到采样频率,然后计算新的记录平均大小和限流速率,如果新的限流速率大于当前速率,则可释放一些许可(供阻塞的生产者获取后继续生产),否则需要获取(回收)一些许可(许可变少后生产速率自然就降低了)。该操作可根据采样的记录大小动态调节速率,不至于在记录负载太大和记录负载太小时,放入同等个数,从而起到动态调节作用。

    在消费时,会调用BoundedInMemoryQueue#readNextRecord读取记录,其核心代码如下

      private Option<O> readNextRecord() {
        ...
        rateLimiter.release();
        Option<O> newRecord = Option.empty();
        while (expectMoreRecords()) {
          try {
            throwExceptionIfFailed();
            newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
            if (newRecord != null) {
              break;
            }
          } catch (InterruptedException e) {
            throw new HoodieException(e);
          }
        }
       	...
    
        if (newRecord != null && newRecord.isPresent()) {
          return newRecord;
        } else {
          // We are done reading all the records from internal iterator.
          this.isReadDone.set(true);
          return Option.empty();
        }
      }
    

    可以看到首先会释放一个许可,然后判断是否还可以读取记录(还在生产或者停止生产但队列不为空都可读取),然后从内部队列获取记录或返回。

    上述便是生产者-消费者在Hudi中应用的分析。

    总结

    Hudi采用了生产者-消费者模型来控制记录的处理,与传统多生产者-多消费者模型不同的是,Hudi现在只支持多生产者-单消费者模型,单消费者意味着Hudi暂时不支持文件的并发写入。而对于生产消费的队列的实现,Hudi并未仅仅只是基于LinkedBlockingQueue,而是采用了更精细化的速率控制,保证速率会随着记录负载大小的变化和配置的队列缓存大小而动态变化,这也降低了系统发生OOM的概率。

    欢迎关注ApacheHudi

  • 相关阅读:
    Discourse 如何不使用 Let’s Encrypt 而使用 CA 签名的密钥进行安装
    Discourse 重复安装过程中的密钥签发问题
    Discourse 升级后提示 https 混合内容
    CentOS 8 安装 docker 报错 containerd.io >= 1.2.2-3
    MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction
    培养自己的5项能力
    高效率工作方式
    项目的架构演进过程
    如何预防后台被攻击,且看Tomcat的安全配置
    redis的缓存更新策略,缓存粒度控制
  • 原文地址:https://www.cnblogs.com/apachehudi/p/11937800.html
Copyright © 2011-2022 走看看