  • Kafka producer: batch.size and linger.ms

    1. The question

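    batch.size and linger.ms are two of the parameters with the largest impact on Kafka producer performance. batch.size is the unit in which the producer batches records for sending; it defaults to 16384 bytes, i.e. 16 KB. linger.ms is the value the sender thread uses, when it checks whether a batch is ready, to decide whether the batch has waited long enough (expired); it defaults to 0 ms.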

    So does the producer send batches according to batch.size, or according to the linger.ms interval? The conclusion first: the producer starts sending as soon as either batch.size or linger.ms is satisfied.
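
    For reference, both settings are ordinary producer configuration keys. The sketch below only builds a java.util.Properties object with string keys, so it runs without the Kafka client on the classpath. The class name, the helper method, the broker address and the sample values are illustrative assumptions, not part of the original article.

```java
import java.util.Properties;

// Hypothetical helper: collects the two batching settings discussed here
// into the Properties object that would be passed to a KafkaProducer.
final class ProducerBatchingConfigSketch {

    static Properties batchingProps(int batchSizeBytes, long lingerMs) {
        Properties props = new Properties();
        // Placeholder broker address; replace with a real bootstrap list.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Size threshold, in bytes, for one per-partition batch.
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        // How long a batch that is not yet full may wait before it is sent.
        props.setProperty("linger.ms", Long.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = batchingProps(16384, 5);
        System.out.println("batch.size=" + props.getProperty("batch.size"));
        System.out.println("linger.ms=" + props.getProperty("linger.ms"));
    }
}
```

    With these example values, a batch would be sent once it reaches 16384 bytes or has waited 5 ms, whichever comes first.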

    2. Source code analysis

    The main code of the sender thread is shown below. We mainly care about when the sender thread blocks:

    void run(long now) {
            Cluster cluster = metadata.fetch();
    
            // result.nextReadyCheckDelayMs is the time until the next ready check, and also how long the selector will block
            RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
            if (result.unknownLeadersExist)
                this.metadata.requestUpdate();
    
            Iterator<Node> iter = result.readyNodes.iterator();
    
            long notReadyTimeout = Long.MAX_VALUE;
            while (iter.hasNext()) {
                Node node = iter.next();
                if (!this.client.ready(node, now)) {
                    iter.remove();
                    notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
                }
            }
    
            Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                             result.readyNodes,
                                                                             this.maxRequestSize,
                                                                             now);
            if (guaranteeMessageOrder) {
                for (List<RecordBatch> batchList : batches.values()) {
                    for (RecordBatch batch : batchList)
                        this.accumulator.mutePartition(batch.topicPartition);
                }
            }
    
            List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        
            for (RecordBatch expiredBatch : expiredBatches)
                this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    
            sensors.updateProduceRequestMetrics(batches);
            List<ClientRequest> requests = createProduceRequests(batches, now);
    
            // for now, focus only on result.nextReadyCheckDelayMs
            long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
            if (result.readyNodes.size() > 0) {
                log.trace("Nodes with data ready to send: {}", result.readyNodes);
                log.trace("Created {} produce requests: {}", requests.size(), requests);
                pollTimeout = 0;
            }
            for (ClientRequest request : requests)
                client.send(request, now);
    
            // poll eventually calls the selector; pollTimeout is how long the selector blocks
            this.client.poll(pollTimeout, now);
        }
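
    The timeout logic at the end of run(..) can be isolated into a small function. The sketch below is a simplified model, not Kafka's code: the class and method names are hypothetical, and it covers only the three inputs visible in the excerpt above.

```java
// Hypothetical, simplified model of how Sender#run picks the poll timeout.
final class PollTimeoutSketch {

    /**
     * @param nextReadyCheckDelayMs delay reported by the accumulator's ready check
     * @param notReadyTimeoutMs     smallest connection delay among nodes that are not ready
     * @param readyNodeCount        number of nodes that have data ready to send
     */
    static long pollTimeout(long nextReadyCheckDelayMs,
                            long notReadyTimeoutMs,
                            int readyNodeCount) {
        // If anything is ready to send, do not block at all.
        if (readyNodeCount > 0) {
            return 0L;
        }
        // Otherwise wait for whichever deadline comes first.
        return Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
    }

    public static void main(String[] args) {
        // No batches at all: both inputs stay at their initial maximum.
        System.out.println(pollTimeout(Long.MAX_VALUE, Long.MAX_VALUE, 0));
        // One batch still lingering, 70 ms left.
        System.out.println(pollTimeout(70L, Long.MAX_VALUE, 0));
        // A node is ready: the selector must not block.
        System.out.println(pollTimeout(70L, 20L, 2));
    }
}
```

    The second case is the one the rest of this analysis focuses on: with nothing ready, the linger-derived delay bounds how long the selector waits.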
    

    Selector#select(..):

    private int select(long ms) throws IOException {
            if (ms < 0L)
                throw new IllegalArgumentException("timeout should be >= 0");
    
            if (ms == 0L)
                return this.nioSelector.selectNow();
            else
                return this.nioSelector.select(ms);
        }
    

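    Start the analysis from a newly instantiated KafkaProducer on which send has not yet been called. When the sender thread calls accumulator#ready(..), it gets back a result that includes how long the selector may block. Because send has not been called, there is no Deque<RecordBatch> holding a batch, so nextReadyCheckDelayMs in the result keeps its initial maximum value and the selector blocks with no linger-based deadline.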

    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
            Set<Node> readyNodes = new HashSet<Node>();
             // initialized to the maximum value
            long nextReadyCheckDelayMs = Long.MAX_VALUE;
            boolean unknownLeadersExist = false;
    
            boolean exhausted = this.free.queued() > 0;
            for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
                TopicPartition part = entry.getKey();
                Deque<RecordBatch> deque = entry.getValue();
    
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    unknownLeadersExist = true;
                } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                    synchronized (deque) {
                        RecordBatch batch = deque.peekFirst();
                        if (batch != null) {
                            boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                            long waitedTimeMs = nowMs - batch.lastAttemptMs;
                            long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
    
                            // depends on linger.ms (or on retryBackoffMs while backing off)
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            boolean full = deque.size() > 1 || batch.records.isFull();
                            boolean expired = waitedTimeMs >= timeToWaitMs;
                            boolean sendable = full || expired || exhausted || closed || flushInProgress();
                            if (sendable && !backingOff) {
                                readyNodes.add(leader);
                            } else {
                                nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                            }
                        }
                    }
                }
            }
    
            return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
        }
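
    The readiness decision in ready(..) can be modelled in isolation. The sketch below is a deliberately reduced version under stated assumptions: it ignores retry backoff, buffer exhaustion, close and flush, and it represents each partition only by the state of its first batch. BatchState and the method names are hypothetical and do not exist in Kafka.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, reduced model of RecordAccumulator#ready(..).
final class ReadyCheckSketch {

    /** State of the first batch in one partition's deque (assumed shape). */
    static final class BatchState {
        final long waitedMs; // time since the batch was created or last attempted
        final boolean full;  // batch is full, or another batch is queued behind it

        BatchState(long waitedMs, boolean full) {
            this.waitedMs = waitedMs;
            this.full = full;
        }
    }

    /** A batch may be sent once it is full OR has lingered long enough. */
    static boolean sendable(BatchState batch, long lingerMs) {
        boolean expired = batch.waitedMs >= lingerMs;
        return batch.full || expired;
    }

    /** Smallest remaining linger time among batches that are not yet sendable. */
    static long nextReadyCheckDelayMs(List<BatchState> firstBatches, long lingerMs) {
        long delay = Long.MAX_VALUE; // stays at the maximum when there are no batches
        for (BatchState batch : firstBatches) {
            if (!sendable(batch, lingerMs)) {
                long timeLeftMs = Math.max(lingerMs - batch.waitedMs, 0);
                delay = Math.min(delay, timeLeftMs);
            }
        }
        return delay;
    }

    public static void main(String[] args) {
        long lingerMs = 100;
        System.out.println(nextReadyCheckDelayMs(Collections.<BatchState>emptyList(), lingerMs));
        List<BatchState> waiting =
                Arrays.asList(new BatchState(30, false), new BatchState(80, false));
        System.out.println(nextReadyCheckDelayMs(waiting, lingerMs));
        System.out.println(sendable(new BatchState(30, true), lingerMs));
    }
}
```

    Under this model, linger.ms = 0 makes every batch sendable immediately, because waitedMs >= 0 always holds.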
    

    Next, we call send to put one record into memory. Because this creates a new RecordBatch, the sender thread is woken up.
    KafkaProducer#doSend(...)

    if (result.batchIsFull || result.newBatchCreated) {
                    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                    this.sender.wakeup();
                }
    

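    This wakes up the sender thread blocked in selector#select(..). The sender thread runs into accumulator#ready(..) again, and since the Deque<RecordBatch> now contains a batch, the nextReadyCheckDelayMs in the returned result is no longer the maximum value but a value derived from linger.ms. In other words, the selector now blocks for at most linger.ms before returning, after which the loop polls again.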

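    In other words: when a Deque<RecordBatch> is non-empty, the sender thread blocks for at most linger.ms; when it is empty, the poll timeout computed from the accumulator is Long.MAX_VALUE; and as soon as KafkaProducer#send(..) puts a record into memory and creates a new RecordBatch, the sender thread is woken up.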
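
    The wakeup mechanism itself comes from java.nio and can be observed without Kafka. The sketch below is a stand-alone demonstration with hypothetical class and method names: one thread blocks in Selector#select(timeout) and another thread calls Selector#wakeup(), which makes select return well before the timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo: measures how long select(..) blocks when another
// thread calls wakeup(), analogous to send() waking the sender thread.
final class SelectorWakeupSketch {

    static long blockedMillis(long selectTimeoutMs, long wakeupAfterMs) {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(wakeupAfterMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Makes a blocked (or the next) select call return immediately.
                selector.wakeup();
            });
            long startNs = System.nanoTime();
            waker.start();
            selector.select(selectTimeoutMs); // no channels registered, so only wakeup or timeout ends it
            long elapsedMs = (System.nanoTime() - startNs) / 1_000_000L;
            waker.join(); // ensure wakeup() has run before the selector is closed
            return elapsedMs;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Timeout is 10 s, but wakeup() arrives after about 50 ms.
        System.out.println("blocked for ~" + blockedMillis(10_000L, 50L) + " ms");
    }
}
```

    In java.nio, a timeout of 0 passed to select(long) means "block indefinitely", which is why the Kafka wrapper shown earlier maps a timeout of 0 to selectNow() instead.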

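    The KafkaProducer#doSend(...) code above also shows that when a RecordBatch becomes full, Sender#wakeup(..) is called as well. To sum up: as soon as either linger.ms has elapsed or a batch has reached batch.size, the sender thread is activated to send messages.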

  • Original article: https://www.cnblogs.com/set-cookie/p/8902340.html