
    Offset range queries

    In practice we often need to query the offset range (earliest and latest offsets) of a particular partition of a topic.
    Command line:

    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx:9092 --topic xxxtopic --time -2   # earliest
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx:9092 --topic xxxtopic --time -1   # latest
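
    The tool prints one line per partition in the form topic:partition:offset; illustrative output (offset values made up):

    xxxtopic:0:1024
    xxxtopic:1:998
    xxxtopic:2:1103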
    

    The special meanings of -1 and -2 (from the client source):

    public class ListOffsetRequest extends AbstractRequest {
        public static final long EARLIEST_TIMESTAMP = -2L;
        public static final long LATEST_TIMESTAMP = -1L;
    }
    

    Client side

    KafkaConsumer.endOffsets(Collection)
    KafkaConsumer.beginningOffsets(Collection)
    Fetcher.beginningOrEndOffset(Collection, long, long)
    Fetcher.retrieveOffsetsByTimes(Map<TopicPartition, Long>, long, boolean)
    Fetcher.sendListOffsetRequests(boolean, Map<TopicPartition, Long>)

    The core of sendListOffsetRequests groups the partitions by leader node, then fans out one ListOffsetRequest per node:

            // Group the partitions by node.
            final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
                TopicPartition tp  = entry.getKey();
                PartitionInfo info = metadata.fetch().partition(tp);
                if (info == null) {
                    metadata.add(tp.topic());
                    log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
                    return RequestFuture.staleMetadata();
                } else if (info.leader() == null) {
                    log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", tp);
                    return RequestFuture.leaderNotAvailable();
                } else {
                    Node node = info.leader();
                    Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node);
                    if (topicData == null) {
                        topicData = new HashMap<>();
                        timestampsToSearchByNode.put(node, topicData);
                    }
                    topicData.put(entry.getKey(), entry.getValue());
                }
            }
    
            final RequestFuture<Map<TopicPartition, OffsetData>> listOffsetRequestsFuture = new RequestFuture<>();
            final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
            final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
            for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
                sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps)
                        .addListener(new RequestFutureListener<Map<TopicPartition, OffsetData>>() {
                            @Override
                            public void onSuccess(Map<TopicPartition, OffsetData> value) {
                                synchronized (listOffsetRequestsFuture) {
                                    fetchedTimestampOffsets.putAll(value);
                                    if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone())
                                        listOffsetRequestsFuture.complete(fetchedTimestampOffsets);
                                }
                            }
    
                            @Override
                            public void onFailure(RuntimeException e) {
                                synchronized (listOffsetRequestsFuture) {
                                    // This may cause all the requests to be retried, but should be rare.
                                    if (!listOffsetRequestsFuture.isDone())
                                        listOffsetRequestsFuture.raise(e);
                                }
                            }
                        });
            }
            return listOffsetRequestsFuture;
    

    In short: the client finds the leader for each partition and sends it a ListOffsetRequest, which locates offsets by timestamp.
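
    For reference, the same lookups are available from the Java consumer; a minimal sketch (broker address, topic, and partition are placeholders carried over from the commands above):

    import java.util.*;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetRangeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxx:9092"); // placeholder broker
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> tps =
                        Collections.singletonList(new TopicPartition("xxxtopic", 0));

                // Same as --time -2: earliest available offsets (log start offsets).
                Map<TopicPartition, Long> earliest = consumer.beginningOffsets(tps);
                // Same as --time -1: latest offsets (HW or LSO, see the broker side below).
                Map<TopicPartition, Long> latest = consumer.endOffsets(tps);

                // Arbitrary timestamp: first offset whose timestamp is >= the given one.
                Map<TopicPartition, Long> query = new HashMap<>();
                query.put(tps.get(0), System.currentTimeMillis() - 3600_000L); // one hour ago
                Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(query);

                System.out.println(earliest + " .. " + latest + ", byTime=" + byTime);
            }
        }
    }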

    Broker side

    KafkaApis.handleListOffsetRequestV1AndAbove(request: RequestChannel.Request)

    Querying the latest offset

    This value is already maintained as records are produced and replicated; the handler simply reads the bound that matches the request's isolation level:

    val lastFetchableOffset = offsetRequest.isolationLevel match {
      case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
      case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
    }
    

    This also shows the difference among LEO, LSO, and the high watermark: a READ_COMMITTED consumer can see up to the last stable offset (LSO), a READ_UNCOMMITTED consumer up to the high watermark (HW), and the log end offset (LEO) is the next offset to be written, so LSO <= HW <= LEO.
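
    The isolation level is a consumer-side setting; a minimal config sketch (placeholder broker address), under which endOffsets() reflects the LSO instead of the HW:

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxx:9092"); // placeholder broker
    // "read_committed" -> broker answers with the LSO; "read_uncommitted" (default) -> HW
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");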

    Querying the earliest offset

    kafka.log.Log.fetchOffsetsByTimestamp(targetTimestamp: Long)
    This value too is maintained ahead of time: logStartOffset advances when old segments are deleted (by retention or a DeleteRecords request), so the lookup is a direct field read:

    @threadsafe
    class Log(@volatile var dir: File,
              @volatile var config: LogConfig,
              @volatile var logStartOffset: Long,
              @volatile var recoveryPoint: Long,
              scheduler: Scheduler,
              brokerTopicStats: BrokerTopicStats,
              time: Time,
              val maxProducerIdExpirationMs: Int,
              val producerIdExpirationCheckIntervalMs: Int,
              val topicPartition: TopicPartition,
              val producerStateManager: ProducerStateManager,
              logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
    // ......
    if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
      return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
    

    Querying offsets by timestamp

    First, determine the target segment:

          val targetSeg = {
            // Get all the segments whose largest timestamp is smaller than target timestamp
            val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
            // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
            if (earlierSegs.length < segmentsCopy.length)
              Some(segmentsCopy(earlierSegs.length))
            else
              None
          }
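
    The target, then, is the first segment whose largest timestamp reaches targetTimestamp, since every earlier segment only holds smaller timestamps. The same selection as a standalone sketch (method name and sample values are made up):

    // Index of the first segment whose largest timestamp is >= the target,
    // or -1 if no segment qualifies (mirrors the takeWhile above).
    static int findTargetSegment(long[] largestTimestamps, long targetTimestamp) {
        for (int i = 0; i < largestTimestamps.length; i++) {
            if (largestTimestamps[i] >= targetTimestamp)
                return i;
        }
        return -1;
    }

    // findTargetSegment(new long[]{100, 200, 300}, 150) -> 1
    // findTargetSegment(new long[]{100, 200, 300}, 301) -> -1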
    

    Then search within that segment's indexes by time:
    LogSegment.findOffsetByTimestamp(timestamp: Long, startingOffset: Long)
    It first consults the time index, then binary-searches the offset index to get a file position:

    
    // LogSegment.scala
    val timestampOffset = timeIndex.lookup(timestamp)
    val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position
    
    // AbstractIndex.scala
    
      /**
       * Lookup lower and upper bounds for the given target.
       */
      private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
        // check if the index is empty
        if(_entries == 0)
          return (-1, -1)
    
        // check if the target offset is smaller than the least offset
        if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
          return (-1, 0)
    
        // binary search for the entry
        var lo = 0
        var hi = _entries - 1
        while(lo < hi) {
          val mid = ceil(hi/2.0 + lo/2.0).toInt
          val found = parseEntry(idx, mid)
          val compareResult = compareIndexEntry(found, target, searchEntity)
          if(compareResult > 0)
            hi = mid - 1
          else if(compareResult < 0)
            lo = mid
          else
            return (mid, mid)
        }
    
        (lo, if (lo == _entries - 1) -1 else lo + 1)
      }    
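
    The search semantics are a floor lookup: return the slot of the largest entry <= the target, plus the next slot as an upper bound; note that mid is computed with ceil so that lo always advances when the mid entry is still below the target. A standalone equivalent over a sorted long array (illustrative, not the Kafka implementation):

    // Floor binary search: index of the largest element <= target,
    // or -1 if target is smaller than every element.
    static int floorSearch(long[] sorted, long target) {
        if (sorted.length == 0 || target < sorted[0])
            return -1;
        int lo = 0, hi = sorted.length - 1;
        while (lo < hi) {
            int mid = lo + (hi - lo + 1) / 2; // round up so lo always advances
            if (sorted[mid] <= target)
                lo = mid;
            else
                hi = mid - 1;
        }
        return lo;
    }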
    
    
    Original article: https://www.cnblogs.com/simoncook/p/11809433.html