  • Spark Streaming Receiving Kafka Messages, Part 3 -- How the Kafka Broker Handles Fetch Requests

    First, look at the declaration of the KafkaServer class:

    Represents the lifecycle of a single Kafka broker. Handles all functionality required to start up and shutdown a single Kafka node.

    In this class's startup method, a request-handling thread pool is instantiated:

    /* start processing requests */
    // handles all of the broker's requests
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
      kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
      brokerTopicStats, clusterId, time)
    // the request-handling thread pool
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
      config.numIoThreads)

    The source code of KafkaRequestHandlerPool is as follows:

    class KafkaRequestHandlerPool(val brokerId: Int,
                                  val requestChannel: RequestChannel,
                                  val apis: KafkaApis,
                                  time: Time,
                                  numThreads: Int) extends Logging with KafkaMetricsGroup {

      /* a meter to track the average free capacity of the request handlers */
      private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

      this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
      val runnables = new Array[KafkaRequestHandler](numThreads)
      for(i <- 0 until numThreads) { // instantiate all of the runnables
        runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
        // create and start a daemon thread for each handler
        Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
      }

      // shut down all of the threads in the pool
      def shutdown() {
        info("shutting down")
        for (handler <- runnables)
          handler.initiateShutdown()
        for (handler <- runnables)
          handler.awaitShutdown()
        info("shut down completely")
      }
    }
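
    Utils.daemonThread simply wraps a Runnable in a named daemon thread; a daemon thread does not keep the JVM alive, so the broker process can exit even while handler threads are still blocked waiting for requests. A minimal sketch of the equivalent (illustrative only, not the Kafka source):

    object DaemonThreadSketch {
      // build a named daemon thread around an arbitrary Runnable
      def daemonThread(name: String, runnable: Runnable): Thread = {
        val t = new Thread(runnable, name)
        t.setDaemon(true)
        t
      }

      def main(args: Array[String]): Unit = {
        val worker = new Runnable { def run(): Unit = println("handling a request") }
        val t = daemonThread("kafka-request-handler-0", worker)
        t.start()
        t.join()
      }
    }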

    Next, look at the source of KafkaRequestHandler:

    class KafkaRequestHandler(id: Int,
                              brokerId: Int,
                              val aggregateIdleMeter: Meter,
                              val totalHandlerThreads: Int,
                              val requestChannel: RequestChannel,
                              apis: KafkaApis,
                              time: Time) extends Runnable with Logging {
      this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
      private val latch = new CountDownLatch(1)

      def run() {
        while (true) { // this run method loops forever
          try {
            var req : RequestChannel.Request = null
            while (req == null) { // keep polling until a request arrives
              // We use a single meter for aggregate idle percentage for the thread pool.
              // Since meter is calculated as total_recorded_value / time_window and
              // time_window is independent of the number of threads, each recorded idle
              // time should be discounted by # threads.
              val startSelectTime = time.nanoseconds
              req = requestChannel.receiveRequest(300)
              val endTime = time.nanoseconds
              if (req != null)
                req.requestDequeueTimeNanos = endTime
              val idleTime = endTime - startSelectTime
              aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
            }

            if (req eq RequestChannel.AllDone) {
              debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
              latch.countDown()
              return
            }
            trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
            apis.handle(req) // dispatch the request
          } catch {
            case e: FatalExitError =>
              latch.countDown()
              Exit.exit(e.statusCode)
            case e: Throwable => error("Exception when handling request", e)
          }
        }
      }

      def initiateShutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)

      def awaitShutdown(): Unit = latch.await()

    }
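
    The shutdown handshake above is the classic poison-pill pattern: initiateShutdown() enqueues the RequestChannel.AllDone sentinel, the run loop returns when it dequeues that sentinel, and awaitShutdown() blocks on the CountDownLatch. A self-contained sketch of the same protocol, with a LinkedBlockingQueue standing in for RequestChannel and all names invented for illustration:

    import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

    object HandlerShutdownSketch {
      case class Request(payload: String)
      val AllDone = Request("ALL_DONE") // poison-pill sentinel, compared by reference (eq)

      class Handler(queue: LinkedBlockingQueue[Request]) extends Runnable {
        private val latch = new CountDownLatch(1)
        def run(): Unit = {
          while (true) {
            var req: Request = null
            while (req == null) // poll with a timeout, like receiveRequest(300)
              req = queue.poll(300, TimeUnit.MILLISECONDS)
            if (req eq AllDone) { latch.countDown(); return }
            println(s"handled ${req.payload}")
          }
        }
        def initiateShutdown(): Unit = queue.put(AllDone)
        def awaitShutdown(): Unit = latch.await()
      }

      def main(args: Array[String]): Unit = {
        val queue = new LinkedBlockingQueue[Request]()
        val handler = new Handler(queue)
        new Thread(handler, "handler-0").start()
        queue.put(Request("fetch"))
        handler.initiateShutdown() // enqueue the sentinel...
        handler.awaitShutdown()    // ...and wait for the handler to see it
      }
    }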

    The key method is kafka.server.KafkaApis#handle:

    /**
     * Top-level method that handles all requests and multiplexes to the right api
     */
    def handle(request: RequestChannel.Request) {
      try {
        trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
          format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
        ApiKeys.forId(request.requestId) match {
          case ApiKeys.PRODUCE => handleProduceRequest(request)
          case ApiKeys.FETCH => handleFetchRequest(request) // the fetch-message request is handled here
          case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
          case ApiKeys.METADATA => handleTopicMetadataRequest(request)
          case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
          case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
          case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
          case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
          case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
          case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
          case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
          case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
          case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
          case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
          case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
          case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
          case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
          case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
          case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
          case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
          case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
          case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
          case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
          case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
          case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
          case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
          case ApiKeys.END_TXN => handleEndTxnRequest(request)
          case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
          case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
          case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
          case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
          case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
          case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
          case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
        }
      } catch {
        case e: FatalExitError => throw e
        case e: Throwable => handleError(request, e)
      } finally {
        request.apiLocalCompleteTimeNanos = time.nanoseconds
      }
    }
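
    The dispatch is nothing more than an exhaustive pattern match on the API key. A toy sketch of the same shape (names invented, not the Kafka source), where a sealed trait lets the compiler warn about any key without a handler:

    object DispatchSketch {
      sealed trait ApiKey            // sealed: the compiler knows all subtypes
      case object Produce extends ApiKey
      case object Fetch   extends ApiKey

      // an exhaustive match; adding a new ApiKey without a case triggers a warning
      def handle(key: ApiKey): String = key match {
        case Produce => "handleProduceRequest"
        case Fetch   => "handleFetchRequest"
      }

      def main(args: Array[String]): Unit =
        println(handle(Fetch)) // -> handleFetchRequest
    }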

    Next, look at handleFetchRequest; its core is the call into the replica manager:

    // call the replica manager to fetch messages from the local replica
    replicaManager.fetchMessages(
      fetchRequest.maxWait.toLong, // 0 in this case
      fetchRequest.replicaId,
      fetchRequest.minBytes,
      fetchRequest.maxBytes,
      versionId <= 2,
      authorizedRequestInfo,
      replicationQuota(fetchRequest),
      processResponseCallback,
      fetchRequest.isolationLevel)

    The source of fetchMessages is as follows:

    /**
     * Fetch messages from the leader replica, and wait until enough data can be fetched and return;
     * the callback function will be triggered either when timeout or required fetch info is satisfied
     */
    def fetchMessages(timeout: Long,
                      replicaId: Int,
                      fetchMinBytes: Int,
                      fetchMaxBytes: Int,
                      hardMaxBytesLimit: Boolean,
                      fetchInfos: Seq[(TopicPartition, PartitionData)],
                      quota: ReplicaQuota = UnboundedQuota,
                      responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                      isolationLevel: IsolationLevel) {
      val isFromFollower = replicaId >= 0
      val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
      val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)

      // read from local logs
      val logReadResults = readFromLocalLog(
        replicaId = replicaId,
        fetchOnlyFromLeader = fetchOnlyFromLeader,
        readOnlyCommitted = fetchOnlyCommitted,
        fetchMaxBytes = fetchMaxBytes,
        hardMaxBytesLimit = hardMaxBytesLimit,
        readPartitionInfo = fetchInfos,
        quota = quota,
        isolationLevel = isolationLevel)

      // if the fetch comes from the follower,
      // update its corresponding log end offset
      if(Request.isValidBrokerId(replicaId))
        updateFollowerLogReadResults(replicaId, logReadResults)

      // check if this fetch request can be satisfied right away
      val logReadResultValues = logReadResults.map { case (_, v) => v }
      val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
      val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>
        errorIncurred || (readResult.error != Errors.NONE))

      // respond immediately if 1) fetch request does not want to wait
      //                        2) fetch request does not require any data
      //                        3) has enough data to respond
      //                        4) some error happens while reading data
      if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
        val fetchPartitionData = logReadResults.map { case (tp, result) =>
          tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
            result.lastStableOffset, result.info.abortedTransactions)
        }
        responseCallback(fetchPartitionData)
      } else { // otherwise, park the request as a DelayedFetch
        // construct the fetch results from the read results
        val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
          val fetchInfo = fetchInfos.collectFirst {
            case (tp, v) if tp == topicPartition => v
          }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
          (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
        }
        val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
          fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
        val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, isolationLevel, responseCallback)

        // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
        val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }

        // try to complete the request immediately, otherwise put it into the purgatory;
        // this is because while the delayed fetch operation is being created, new requests
        // may arrive and hence make this operation completable.
        delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
      }
    }
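
    The four "respond immediately" conditions can be read directly off the if condition above; anything else is parked in the delayed-fetch purgatory. A small sketch (simplified, assumed signature) that makes the decision explicit:

    object FetchDecisionSketch {
      def canRespondImmediately(timeoutMs: Long,
                                partitionCount: Int,
                                bytesReadable: Long,
                                fetchMinBytes: Int,
                                errorReadingData: Boolean): Boolean =
        timeoutMs <= 0 ||                 // 1. the fetch does not want to wait
        partitionCount == 0 ||            // 2. the fetch requests no data at all
        bytesReadable >= fetchMinBytes || // 3. enough data is already available
        errorReadingData                  // 4. an error occurred while reading

      def main(args: Array[String]): Unit = {
        // maxWait = 0 (the case noted above) always returns at once
        println(canRespondImmediately(0, 1, 0, 1, errorReadingData = false))   // true
        // maxWait = 500 ms with too little readable data waits in the purgatory
        println(canRespondImmediately(500, 1, 0, 1, errorReadingData = false)) // false
      }
    }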

    Continue tracing into the source of readFromLocalLog:

    /**
     * Read from multiple topic partitions at the given offset up to maxSize bytes
     */
    // Reads data from multiple topic partitions, up to maxSize bytes (1 MB per partition by default)
    // isolation level: READ_COMMITTED or READ_UNCOMMITTED
    def readFromLocalLog(replicaId: Int,
                         fetchOnlyFromLeader: Boolean,
                         readOnlyCommitted: Boolean,
                         fetchMaxBytes: Int,
                         hardMaxBytesLimit: Boolean,
                         readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                         quota: ReplicaQuota,
                         isolationLevel: IsolationLevel): Seq[(TopicPartition, LogReadResult)] = {

      def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
        val offset = fetchInfo.fetchOffset
        val partitionFetchSize = fetchInfo.maxBytes
        val followerLogStartOffset = fetchInfo.logStartOffset

        brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
        brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()

        try {
          trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
            s"remaining response limit $limitBytes" +
            (if (minOneMessage) s", ignoring response/partition size limits" else ""))

          // decide whether to only fetch from leader
          val localReplica = if (fetchOnlyFromLeader)
            getLeaderReplicaIfLocal(tp)
          else
            getReplicaOrException(tp)

          val initialHighWatermark = localReplica.highWatermark.messageOffset
          val lastStableOffset = if (isolationLevel == IsolationLevel.READ_COMMITTED)
            Some(localReplica.lastStableOffset.messageOffset)
          else
            None

          // decide whether to only fetch committed data (i.e. messages below high watermark)
          val maxOffsetOpt = if (readOnlyCommitted)
            Some(lastStableOffset.getOrElse(initialHighWatermark))
          else
            None

          /* Read the LogOffsetMetadata prior to performing the read from the log.
           * We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.
           * Using the log end offset after performing the read can lead to a race condition
           * where data gets appended to the log immediately after the replica has consumed from it
           * This can cause a replica to always be out of sync.
           */
          val initialLogEndOffset = localReplica.logEndOffset.messageOffset
          val initialLogStartOffset = localReplica.logStartOffset
          val fetchTimeMs = time.milliseconds
          val logReadInfo = localReplica.log match {
            case Some(log) =>
              val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)

              // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
              val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel)

              // If the partition is being throttled, simply return an empty set.
              if (shouldLeaderThrottle(quota, tp, replicaId))
                FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
              // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
              // progress in such cases and don't need to report a `RecordTooLargeException`
              else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete)
                FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
              else fetch

            case None =>
              error(s"Leader for partition $tp does not have a local log")
              FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
          }

          LogReadResult(info = logReadInfo,
                        highWatermark = initialHighWatermark,
                        leaderLogStartOffset = initialLogStartOffset,
                        leaderLogEndOffset = initialLogEndOffset,
                        followerLogStartOffset = followerLogStartOffset,
                        fetchTimeMs = fetchTimeMs,
                        readSize = partitionFetchSize,
                        lastStableOffset = lastStableOffset,
                        exception = None)
        } catch {
          // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
          // is supposed to indicate un-expected failure of a broker in handling a fetch request
          case e@ (_: UnknownTopicOrPartitionException |
                   _: NotLeaderForPartitionException |
                   _: ReplicaNotAvailableException |
                   _: OffsetOutOfRangeException) =>
            LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                          highWatermark = -1L,
                          leaderLogStartOffset = -1L,
                          leaderLogEndOffset = -1L,
                          followerLogStartOffset = -1L,
                          fetchTimeMs = -1L,
                          readSize = partitionFetchSize,
                          lastStableOffset = None,
                          exception = Some(e))
          case e: Throwable =>
            brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
            brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
            error(s"Error processing fetch operation on partition $tp, offset $offset", e)
            LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                          highWatermark = -1L,
                          leaderLogStartOffset = -1L,
                          leaderLogEndOffset = -1L,
                          followerLogStartOffset = -1L,
                          fetchTimeMs = -1L,
                          readSize = partitionFetchSize,
                          lastStableOffset = None,
                          exception = Some(e))
        }
      }

      // limitBytes starts from fetchMaxBytes, the overall response budget
      var limitBytes = fetchMaxBytes
      val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
      var minOneMessage = !hardMaxBytesLimit
      readPartitionInfo.foreach { case (tp, fetchInfo) =>
        val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
        val messageSetSize = readResult.info.records.sizeInBytes
        // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
        if (messageSetSize > 0)
          minOneMessage = false
        limitBytes = math.max(0, limitBytes - messageSetSize)
        result += (tp -> readResult)
      }
      result
    }
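
    Note how the loop at the bottom threads a shrinking byte budget through the partitions and clears minOneMessage after the first non-empty read, so at most one partition may exceed its per-partition limit. A tiny sketch of that bookkeeping with made-up sizes (simplified: each partition just yields min(available, budget)):

    object FetchBudgetSketch {
      def main(args: Array[String]): Unit = {
        val partitionSizes = Seq(400, 300, 500) // hypothetical bytes available per partition
        var limitBytes = 1000                   // stand-in for fetchMaxBytes
        var minOneMessage = true
        partitionSizes.foreach { available =>
          val read = math.min(available, limitBytes)   // honour the remaining budget
          if (read > 0) minOneMessage = false          // first non-empty read clears the flag
          limitBytes = math.max(0, limitBytes - read)  // shrink the budget
          println(s"read=$read remaining=$limitBytes minOneMessage=$minOneMessage")
        }
      }
    }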

    The source of Log.read is as follows:

    /**
     * Read messages from the log.
     *
     * @param startOffset The offset to begin reading at
     * @param maxLength The maximum number of bytes to read
     * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
     * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
     * @param isolationLevel The isolation level of the fetcher. The READ_UNCOMMITTED isolation level has the traditional
     *                       read semantics (e.g. consumers are limited to fetching up to the high watermark). In
     *                       READ_COMMITTED, consumers are limited to fetching up to the last stable offset. Additionally,
     *                       in READ_COMMITTED, the transaction index is consulted after fetching to collect the list
     *                       of aborted transactions in the fetch range which the consumer uses to filter the fetched
     *                       records before they are returned to the user. Note that fetches from followers always use
     *                       READ_UNCOMMITTED.
     *
     * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
     * @return The fetch data information including fetch starting offset metadata and messages read.
     */
    def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
             isolationLevel: IsolationLevel): FetchDataInfo = {
      trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

      // Because we don't use lock for reading, the synchronization is a little bit tricky.
      // We create the local variables to avoid race conditions with updates to the log.
      val currentNextOffsetMetadata = nextOffsetMetadata
      val next = currentNextOffsetMetadata.messageOffset
      if (startOffset == next) {
        val abortedTransactions =
          if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(List.empty[AbortedTransaction])
          else None
        return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false,
          abortedTransactions = abortedTransactions)
      }

      var segmentEntry = segments.floorEntry(startOffset)

      // return error on attempt to read beyond the log end offset or read below log start offset
      if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))

      // Do the read on the segment with a base offset less than the target offset
      // but if that segment doesn't contain any messages with an offset greater than that
      // continue to read from successive segments until we get some messages or we reach the end of the log
      while(segmentEntry != null) {
        val segment = segmentEntry.getValue

        // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
        // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
        // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
        // end of the active segment.
        val maxPosition = {
          if (segmentEntry == segments.lastEntry) {
            val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
            // Check the segment again in case a new segment has just rolled out.
            if (segmentEntry != segments.lastEntry)
              // New log segment has rolled out, we can read up to the file end.
              segment.size
            else
              exposedPos
          } else {
            segment.size
          }
        }
        // read the data from this segment
        val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
        if (fetchInfo == null) {
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else {
          return isolationLevel match {
            case IsolationLevel.READ_UNCOMMITTED => fetchInfo
            case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          }
        }
      }

      // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
      // this can happen when all messages with offset larger than start offsets have been deleted.
      // In this case, we will return the empty set with log end offset metadata
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
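
    Segment lookup relies on a sorted map keyed by each segment's base offset: floorEntry(startOffset) finds the one segment that could contain the offset, and higherEntry advances to the next segment when the current one yields nothing. A sketch using a ConcurrentSkipListMap (the same sorted-map type the Log keeps its segments in), with placeholder segment names:

    import java.util.concurrent.ConcurrentSkipListMap

    object SegmentLookupSketch {
      def main(args: Array[String]): Unit = {
        // base offset -> segment; a real Log maps base offsets to LogSegment objects
        val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
        segments.put(0L, "segment-0")
        segments.put(100L, "segment-100")
        segments.put(200L, "segment-200")

        // greatest base offset <= 150 is 100, so offset 150 lives in segment-100
        val entry = segments.floorEntry(150L)
        println(entry.getValue)                              // segment-100
        // if that segment returned null, Log.read moves to the successor
        println(segments.higherEntry(entry.getKey).getValue) // segment-200
      }
    }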

    The read method of LogSegment:

    /**
     * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
     * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
     *
     * @param startOffset A lower bound on the first offset to include in the message set we read
     * @param maxSize The maximum number of bytes to include in the message set we read
     * @param maxOffset An optional maximum offset for the message set we read
     * @param maxPosition The maximum position in the log segment that should be exposed for read
     * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
     *
     * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
     *         or null if the startOffset is larger than the largest offset in this log
     */
    @threadsafe
    def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
             minOneMessage: Boolean = false): FetchDataInfo = {
      if (maxSize < 0)
        throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

      val logSize = log.sizeInBytes // this may change, need to save a consistent copy
      val startOffsetAndSize = translateOffset(startOffset)

      // if the start position is already off the end of the log, return null
      if (startOffsetAndSize == null)
        return null

      // the start position
      val startPosition = startOffsetAndSize.position
      val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

      // the adjusted maximum fetch size
      val adjustedMaxSize =
        if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
        else maxSize

      // return a log segment but with zero size in the case below
      if (adjustedMaxSize == 0)
        return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

      // calculate the length of the message set to read based on whether or not they gave us a maxOffset
      val fetchSize: Int = maxOffset match {
        case None =>
          // no max offset, just read until the max position
          min((maxPosition - startPosition).toInt, adjustedMaxSize)
        case Some(offset) =>
          // there is a max offset, translate it to a file position and use that to calculate the max read size;
          // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
          // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
          // offset between new leader's high watermark and the log end offset, we want to return an empty response.
          if (offset < startOffset)
            return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
          val mapping = translateOffset(offset, startPosition)
          val endPosition =
            if (mapping == null)
              logSize // the max offset is off the end of the log, use the end of the file
            else
              mapping.position
          min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
      }

      FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize),
        firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
    }
    The source of log.read(startPosition, fetchSize), i.e. FileRecords.read, is as follows:
    /**
     * Return a slice of records from this instance, which is a view into this set starting from the given position
     * and with the given size limit.
     *
     * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
     *
     * If this message set is already sliced, the position will be taken relative to that slicing.
     *
     * @param position The start position to begin the read from
     * @param size The number of bytes after the start position to include
     * @return A sliced wrapper on this message set limited based on the given position and size
     */
    public FileRecords read(int position, int size) throws IOException {
        if (position < 0)
            throw new IllegalArgumentException("Invalid position: " + position);
        if (size < 0)
            throw new IllegalArgumentException("Invalid size: " + size);

        final int end;
        // handle integer overflow
        if (this.start + position + size < 0)
            end = sizeInBytes();
        else
            end = Math.min(this.start + position + size, sizeInBytes());
        return new FileRecords(file, channel, this.start + position, end, true);
    }
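
    The read is therefore just a bounded slice [start + position, end) of the segment file, with end clamped to the file size and guarded against int overflow (a negative sum means the addition wrapped around). A runnable sketch of the same arithmetic against a throwaway file (all names invented):

    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel
    import java.nio.file.{Files, StandardOpenOption}

    object FileSliceSketch {
      // mirrors the end computation in FileRecords.read above
      def sliceEnd(start: Int, position: Int, size: Int, sizeInBytes: Int): Int =
        if (start + position + size < 0) sizeInBytes // overflowed: clamp to file end
        else math.min(start + position + size, sizeInBytes)

      def main(args: Array[String]): Unit = {
        val path = Files.createTempFile("segment", ".log")
        Files.write(path, "0123456789".getBytes("UTF-8"))
        val channel = FileChannel.open(path, StandardOpenOption.READ)
        val (start, position, size) = (0, 3, 4)
        val end = sliceEnd(start, position, size, channel.size.toInt)
        val buf = ByteBuffer.allocate(end - (start + position))
        channel.read(buf, start + position)          // read the [3, 7) slice
        println(new String(buf.array, "UTF-8"))      // prints 3456
        channel.close()
      }
    }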

    The source of processResponseCallback (defined in kafka.server.KafkaApis#handleFetchRequest) is as follows:

    // fetch response callback invoked after any throttling
      def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
        def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
          val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
          fetchedPartitionData.asScala.foreach { case (tp, partitionData) =>
            convertedData.put(tp, convertedPartitionData(tp, partitionData))
          }
          val response = new FetchResponse(convertedData, 0)
          val responseStruct = response.toStruct(versionId)

          trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
          response.responseData.asScala.foreach { case (topicPartition, data) =>
            // record the bytes out metrics only when the response is being sent
            brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
          }

          val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs,
            request.connectionId, request.header)
          RequestChannel.Response(request, responseSend)
        }

        if (fetchRequest.isFromFollower)
          sendResponseExemptThrottle(createResponse(0))
        else
          sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs =>
            requestChannel.sendResponse(createResponse(requestThrottleMs)))
      }

      // When this callback is triggered, the remote API call has completed.
      // Record time before any byte-rate throttling.
      request.apiRemoteCompleteTimeNanos = time.nanoseconds

      if (fetchRequest.isFromFollower) {
        // We've already evaluated against the quota and are good to go. Just need to record it now.
        val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
        quotas.leader.record(responseSize)
        fetchResponseCallback(bandwidthThrottleTimeMs = 0)
      } else {
        // Fetch size used to determine throttle time is calculated before any down conversions.
        // This may be slightly different from the actual response size. But since down conversions
        // result in data being loaded into memory, it is better to do this after throttling to avoid OOM.
        val response = new FetchResponse(fetchedPartitionData, 0)
        val responseStruct = response.toStruct(versionId)
        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
          fetchResponseCallback)
      }
    }

    To conclude: a fetch request is ultimately resolved to a concrete LogSegment, and the records are read as a slice of that segment identified by a start position and a size. The maximum fetch size per partition defaults to 1 MB and is configurable.

    The broker also provides an isolation mechanism supporting read-committed and read-uncommitted semantics (the default is READ_UNCOMMITTED). If the request does not want to wait (maxWait <= 0), requests no data, has enough data, or hits a read error, the response is returned immediately; otherwise the fetch is parked in the delayed-fetch purgatory until it can be satisfied or times out.
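
    For reference, these broker-side limits surface on the consumer as ordinary configuration. A minimal consumer setup (broker address, group id, and topic choices are placeholders) pinning the per-partition cap, the total response cap, and the isolation level:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

    object FetchConfigExample {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringDeserializer")
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringDeserializer")
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576") // 1 MB per partition (the default)
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800")          // total response cap
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")    // or "read_uncommitted" (default)
        val consumer = new KafkaConsumer[String, String](props)
        consumer.close()
      }
    }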
