When the producer sends data to the broker with acks = -1 and the broker side is configured with min.insync.replicas = 2, the broker creates a DelayedProduce operation: the leader broker waits until the messages have been replicated to the other replicas, or until the request times out, before responding.
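On the client side this only requires setting acks; min.insync.replicas is a broker- or topic-level config. A minimal sketch of the producer settings that lead onto this path (the bootstrap address and the topic name "demo" are placeholders):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// acks = -1 ("all"): the leader answers only after the full ISR (bounded below by
// min.insync.replicas on the broker/topic) has replicated the batch.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "-1")

val producer = new KafkaProducer[String, String](props)
// send() completes only once the broker's DelayedProduce is satisfied (or the request
// times out / fails with a NotEnoughReplicas error when the ISR is too small).
producer.send(new ProducerRecord[String, String]("demo", "key", "value")).get()
producer.close()
```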
```scala
// kafka.server.ReplicaManager#appendRecords
/**
 * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when timeout or the required acks are satisfied;
 * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 */
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
  // requiredAcks may be -1, 0 or 1
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // append the data to the local log
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    // build the per-partition produce status from localProduceResults
    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition -> ProducePartitionStatus(
        result.info.lastOffset + 1, // required offset
        new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime,
          result.info.logStartOffset)) // response status
    }

    processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))

    // roughly speaking, a DelayedProduce is needed when acks = -1
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      // i.e. run the DelayedProduce's tryComplete first; if it cannot complete yet,
      // register it with the purgatory's timing-wheel timer
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP,
        LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
    }
    responseCallback(responseStatus)
  }
}
```
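The delayedProduceRequestRequired check boils down to three conditions: acks = -1, the request actually carries data, and at least one partition was appended locally without error (otherwise an error response can be returned immediately). Roughly, treating the exact shape as an approximation of the source:

```scala
// Approximate shape of kafka.server.ReplicaManager#delayedProduceRequestRequired:
// 1. required acks = -1
// 2. there is data to append
// 3. at least one local append succeeded (fewer failed partitions than total)
private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
  entriesPerPartition.nonEmpty &&
  localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}
```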
So when is the DelayedProduce's tryComplete triggered?
The followers keep fetching messages from the leader, and each fetch request carries the follower's LEO (log end offset). The leader updates the partition's HW (high watermark) based on the replicas' LEOs and, when it changes, triggers DelayedProduce.tryComplete.
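The relation between the followers' LEOs and the leader's HW is the key point here: the HW is essentially the smallest LEO among the in-sync replicas. A minimal sketch (not Kafka source, just illustrating the invariant):

```scala
// The leader's HW can only advance after followers report, via their fetch requests,
// that their LEO has caught up; a DelayedProduce waiting on requiredOffset completes
// once HW >= requiredOffset (and the ISR is still >= min.insync.replicas).
case class ReplicaState(brokerId: Int, logEndOffset: Long)

def highWatermark(isr: Seq[ReplicaState]): Long =
  isr.map(_.logEndOffset).min

val isr = Seq(ReplicaState(0, 105L), ReplicaState(1, 103L), ReplicaState(2, 101L))
println(highWatermark(isr)) // 101: a DelayedProduce with requiredOffset 103 is still pending
```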
```scala
// kafka.cluster.Partition#updateReplicaLogReadResult
def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {
  val replicaId = replica.brokerId
  // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
  val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
  replica.updateLogReadResult(logReadResult)
  val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
  // check if the LW of the partition has incremented
  // since the replica's logStartOffset may have incremented
  val leaderLWIncremented = newLeaderLW > oldLeaderLW
  // check if we need to expand ISR to include this replica
  // if it is not in the ISR yet
  val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)

  val result = leaderLWIncremented || leaderHWIncremented
  // some delayed operations may be unblocked after HW or LW changed
  if (result)
    tryCompleteDelayedRequests()

  debug(s"Recorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.")
  result
}

// kafka.cluster.Partition#tryCompleteDelayedRequests
/**
 * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
 */
private def tryCompleteDelayedRequests() {
  val requestKey = new TopicPartitionOperationKey(topicPartition)
  replicaManager.tryCompleteDelayedFetch(requestKey)
  replicaManager.tryCompleteDelayedProduce(requestKey)
  replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
}

// kafka.server.ReplicaManager#tryCompleteDelayedProduce
/**
 * Try to complete some delayed produce requests with the request key;
 * this can be triggered when:
 *
 * 1. The partition HW has changed (for acks = -1)
 * 2. A follower replica's fetch operation is received (for acks > 1)
 */
def tryCompleteDelayedProduce(key: DelayedOperationKey) {
  val completed = delayedProducePurgatory.checkAndComplete(key)
  debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
}
```
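checkAndComplete looks up every DelayedProduce watched under that (topic, partition) key and retries its tryComplete. The per-partition check can be sketched roughly as follows (a simplified sketch of DelayedProduce.tryComplete, not the verbatim source): every partition that is still pending asks the leader Partition whether enough replicas have reached its requiredOffset; once no partition is pending, forceComplete() fires and the producer's responseCallback is invoked.

```scala
// Simplified sketch, assumed shape rather than verbatim Kafka source.
override def tryComplete(): Boolean = {
  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
    if (status.acksPending) { // this partition is still waiting for its required offset
      replicaManager.getPartition(topicPartition) match {
        case Some(partition) =>
          // satisfied when HW >= requiredOffset and |ISR| >= min.insync.replicas;
          // otherwise the error may be e.g. NOT_ENOUGH_REPLICAS_AFTER_APPEND
          val (hasEnough, error) = partition.checkEnoughReplicasReachOffset(status.requiredOffset)
          if (error != Errors.NONE || hasEnough) {
            status.acksPending = false
            status.responseStatus.error = error
          }
        case None =>
          status.acksPending = false
          status.responseStatus.error = Errors.UNKNOWN_TOPIC_OR_PARTITION
      }
    }
  }
  // every partition satisfied (or failed) => complete now and run the response callback
  if (!produceMetadata.produceStatus.values.exists(_.acksPending)) forceComplete()
  else false
}
```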