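  • Replica synchronization

    Key concepts

    • LEO: the log end offset, i.e. the offset of the last appended message plus 1. Reaching this position does not make a message visible to consumers; it only records how far the local log has been written, because the write progress of the other replicas also has to be taken into account. Both the leader and the followers maintain this value.
    • HW: short for high watermark, the position exposed to consumers of non-transactional messages (that is, in read_uncommitted mode). How this value is updated is fairly involved. Both the leader and the followers maintain this value. For how it differs from LEO, see the post linked from the original.
    • LSO: the last stable offset, which matters for transactional messages. In read_committed mode, this is the highest position a consumer can see. See the "query the latest offset" paragraph of the post "offset range查询" (offset range queries).
    • epoch: the leader's generation. Version 0.11 introduced this concept to fix the message loss and log divergence that could occur in version 0.8 while a broker was going down. For details, see huxi's post "Kafka水位(high watermark)与leader epoch的讨论" (a discussion of the Kafka high watermark and leader epoch).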

    The high watermark indicated the offset of messages that are fully replicated, while the end-of-log offset might be larger if there are newly appended records to the leader partition which are not replicated yet.
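To make the three offsets concrete, here is a minimal sketch in Python. It is a toy model, not Kafka code: the class `PartitionOffsets`, the function `max_visible_offset`, and the sample values are all made up for illustration. It only encodes what the definitions above say: consumers in read_uncommitted mode are bounded by HW, and consumers in read_committed mode are bounded by LSO.

```python
from dataclasses import dataclass


@dataclass
class PartitionOffsets:
    """Offsets of one partition (hypothetical toy model)."""
    leo: int  # log end offset: offset of the last appended message + 1
    hw: int   # high watermark: messages below it are fully replicated
    lso: int  # last stable offset: bound for read_committed consumers


def max_visible_offset(offsets: PartitionOffsets, isolation_level: str) -> int:
    """Return the exclusive upper bound a consumer may read up to."""
    if isolation_level == "read_committed":
        return offsets.lso
    if isolation_level == "read_uncommitted":
        return offsets.hw
    raise ValueError(f"unknown isolation level: {isolation_level}")


# Sample values are invented; they only respect LSO <= HW <= LEO.
offsets = PartitionOffsets(leo=40, hw=37, lso=35)
print(max_visible_offset(offsets, "read_uncommitted"))  # 37
print(max_visible_offset(offsets, "read_committed"))    # 35
```

Note that LEO never appears as a consumer bound: messages between HW and LEO exist on the leader's disk but are not yet readable.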

    Main structure of replica synchronization

    The main classes involved are: AbstractFetcherThread, ReplicaFetcherThread, PartitionFetchState, ReplicaFetcherManager, ShutdownableThread, ReplicaManager

    Follower-side logic in brief

    Thread creation

    1. handleLeaderAndIsrRequest is the entry point; handling it triggers ReplicaManager.becomeLeaderOrFollower and then makeFollowers.
    2. That in turn triggers ReplicaFetcherManager.addFetcherForPartitions.
    3. ReplicaFetcherManager.createFetcherThread creates the fetcher thread (new ReplicaFetcherThread....).

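    How some of the fields are assigned:

    • Thread name: s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
    • clientId: the same as the thread name
    • isInterruptible: hard-coded to false at creation
    • includeLogTruncation: hard-coded to true at creation, which means the truncatingLog field of a PartitionFetchState object is true when the object is first built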
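The field assignments above can be mirrored in a few lines of Python. This is only an illustration of the naming pattern and the fixed values listed in these notes; `FetcherThreadSettings` and `new_fetcher_settings` are hypothetical names, and the empty prefix in the usage line is an assumed value.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FetcherThreadSettings:
    """Initial field values of a fetcher thread (hypothetical container)."""
    name: str
    client_id: str
    is_interruptible: bool
    include_log_truncation: bool


def new_fetcher_settings(prefix: str, fetcher_id: int, source_broker_id: int) -> FetcherThreadSettings:
    # Same pattern as the Scala interpolated string quoted above.
    name = f"{prefix}ReplicaFetcherThread-{fetcher_id}-{source_broker_id}"
    return FetcherThreadSettings(
        name=name,
        client_id=name,               # clientId is the thread name
        is_interruptible=False,       # fixed value at creation
        include_log_truncation=True,  # fixed value at creation
    )


settings = new_fetcher_settings(prefix="", fetcher_id=0, source_broker_id=1)
print(settings.name)  # ReplicaFetcherThread-0-1
```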

    Follower-side execution

    The inheritance hierarchy of ReplicaFetcherThread is:
    ShutdownableThread
    |-- AbstractFetcherThread
    |---|-- ReplicaFetcherThread

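    ShutdownableThread is a looping thread, so a subclass only has to implement doWork to plug in its own logic.
    The doWork logic in AbstractFetcherThread is organized as follows:

    • maybeTruncate: handles logs that may need truncating. Where LEO is greater than HW (short for high watermark, here and below), the log is truncated back to HW.
    • Build a FetchRequest to pull data.
    • Handle the response to the FetchRequest in processFetchRequest. Note: the request does not necessarily return any message data (Record), and it does not ask only for messages; it also asks for values such as the leader's high watermark.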
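The loop structure can be sketched as follows. This is a simplified Python model, not the Scala classes themselves: the class names echo the ones above, but the method bodies are stubs that only record the call order, and `max_rounds` is a made-up parameter so the example terminates.

```python
import threading


class ShutdownableThread(threading.Thread):
    """Calls do_work() in a loop until shutdown is requested."""

    def __init__(self, name: str):
        super().__init__(name=name, daemon=True)
        self._keep_running = threading.Event()
        self._keep_running.set()

    def do_work(self) -> None:
        raise NotImplementedError

    def initiate_shutdown(self) -> None:
        self._keep_running.clear()

    def run(self) -> None:
        while self._keep_running.is_set():
            self.do_work()


class SketchFetcherThread(ShutdownableThread):
    """Stub fetcher: records the order of the three doWork steps."""

    def __init__(self, name: str, max_rounds: int):
        super().__init__(name)
        self.calls = []
        self._rounds_left = max_rounds

    def maybe_truncate(self) -> None:
        self.calls.append("maybe_truncate")

    def build_fetch_request(self) -> dict:
        self.calls.append("build_fetch_request")
        return {"fetch_offset": 0}  # placeholder request

    def process_fetch_request(self, request: dict) -> None:
        self.calls.append("process_fetch_request")

    def do_work(self) -> None:
        self.maybe_truncate()
        request = self.build_fetch_request()
        self.process_fetch_request(request)
        self._rounds_left -= 1
        if self._rounds_left == 0:
            self.initiate_shutdown()


thread = SketchFetcherThread("sketch-fetcher", max_rounds=2)
thread.start()
thread.join()
print(thread.calls)
```

The base class owns the loop and the shutdown flag, so the subclass never has to write its own `while` loop.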

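    maybeTruncate logic:

    • Filter out the partitions whose PartitionFetchState has isTruncatingLog set to true. As for when the truncatingLog field is true: as far as I can tell so far, it defaults to true when the ReplicaFetcherThread is first created, so truncation may happen at the very start. See ReplicaFetcherThread.buildLeaderEpochRequest.
    • From the partitions selected in the previous step, build an OffsetsForLeaderEpochRequest, which looks up offsets by epoch. See ReplicaFetcherThread.fetchEpochsFromLeader.
    • Use the returned result to set (correct) the offset. My understanding is that marking the correct position is enough, since later writes simply overwrite from there and nothing has to be physically deleted (to be verified). See AbstractFetcherThread.markTruncationComplete.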
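The three steps can be put together in a small Python sketch. It rests on assumptions that these notes do not confirm: the new offset is taken as the smaller of the leader's end offset for the epoch and the follower's own LEO, and the follower's LEO is moved back to that offset. The callable `fetch_epoch_end_offset` is a hypothetical stand-in for the OffsetsForLeaderEpochRequest round trip, and the partition names and numbers are sample values.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class PartitionFetchState:
    fetch_offset: int
    truncating_log: bool = True  # new fetcher threads start in this state


def maybe_truncate(
    states: Dict[str, PartitionFetchState],
    latest_epoch: Dict[str, int],
    local_leo: Dict[str, int],
    fetch_epoch_end_offset: Callable[[str, int], int],
) -> Dict[str, int]:
    """Resolve the fetch offset of every partition still marked as truncating.

    Returns the offset chosen for each partition that was processed.
    """
    truncated = {}
    for partition, state in states.items():
        if not state.truncating_log:  # step 1: keep only truncating partitions
            continue
        # Step 2: ask the leader where this epoch ends (stubbed by a callable).
        leader_end = fetch_epoch_end_offset(partition, latest_epoch[partition])
        # Step 3: correct the offset (assumed rule: never move past the local LEO).
        new_offset = min(leader_end, local_leo[partition])
        local_leo[partition] = new_offset
        state.fetch_offset = new_offset
        state.truncating_log = False  # plays the role of markTruncationComplete
        truncated[partition] = new_offset
    return truncated


states = {
    "test.vv19-0": PartitionFetchState(fetch_offset=40),
    "other-0": PartitionFetchState(fetch_offset=10, truncating_log=False),
}
local_leo = {"test.vv19-0": 40, "other-0": 10}
result = maybe_truncate(
    states,
    latest_epoch={"test.vv19-0": 25, "other-0": 3},
    local_leo=local_leo,
    fetch_epoch_end_offset=lambda partition, epoch: 37,  # canned leader answer
)
print(result)  # {'test.vv19-0': 37}
```

After the call, the first partition fetches from offset 37 and is no longer marked as truncating; the second partition is left untouched.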

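    Logic for handling the FetchRequest response:

    • Check whether the result carries an error code.
    • For a normal result, call processPartitionData and update the partition state: partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset)). See ReplicaFetcherThread.processPartitionData.
    • processPartitionData is implemented by ReplicaFetcherThread. Its main steps are:
      • Write to the replica's log: replica.log.get.appendAsFollower(records)
      • Update the replica's HW. highWatermark can be understood as the offset that every replica has confirmed; otherwise there would be no point in maintaining it, and LEO or LSO could be used directly. For how it differs from LEO, see the post linked from the original. Still to do: build a test case that kills the follower right before this line.
      • Store it in the replica object's metadata. A single fetch request cannot update the replica's highWatermark; it takes the next fetch request to complete the update for the previous one. replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)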
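A rough Python model of the follower side shows why one fetch is not enough. The rule used here, follower HW = min(follower LEO, leader HW carried in the response), is an assumption of this sketch; the notes above only name the variable followerHighWatermark. `FollowerReplica` and its methods are hypothetical names.

```python
class FollowerReplica:
    """Toy follower replica: a list stands in for the on-disk log."""

    def __init__(self):
        self.log = []
        self.high_watermark = 0

    @property
    def leo(self) -> int:
        return len(self.log)

    def process_partition_data(self, records, leader_hw: int) -> None:
        self.log.extend(records)  # stands in for appendAsFollower(records)
        # Assumed rule: the follower can never be ahead of its own log
        # or ahead of what the leader has declared replicated.
        self.high_watermark = min(self.leo, leader_hw)


replica = FollowerReplica()

# First fetch: the records arrive, but the leader's HW is still 0,
# because the leader has not yet learned that this follower has them.
replica.process_partition_data(["m0", "m1"], leader_hw=0)
print(replica.leo, replica.high_watermark)  # 2 0

# A later fetch returns no new records but carries the advanced leader HW.
replica.process_partition_data([], leader_hw=2)
print(replica.leo, replica.high_watermark)  # 2 2
```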

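    How the leader and the follower update HW and LEO

    Leader-side execution

    Call stack for handling a fetch request. Fetch requests from consumer clients and replication fetch requests from followers go through the same code path, which branches in places depending on whether the request comes from a follower.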

    ReplicaManager.kafka$server$ReplicaManager$$read$1(TopicPartition, FetchRequest$PartitionData, int, boolean, int, boolean, boolean, boolean, ReplicaQuota, IsolationLevel) line: 856	
    ReplicaManager$$anonfun$readFromLocalLog$1.apply(Tuple2<TopicPartition,PartitionData>) line: 962	
    ReplicaManager$$anonfun$readFromLocalLog$1.apply(Object) line: 961	
    ResizableArray$class.foreach(ResizableArray, Function1) line: 59	
    ArrayBuffer<A>.foreach(Function1<A,U>) line: 48	
    ReplicaManager.readFromLocalLog(int, boolean, boolean, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, IsolationLevel) line: 961	
    ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 790	
    ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803	
    KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597	
    KafkaApis.handle(RequestChannel$Request) line: 101	
    

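    Offsets involved and how they are updated:
    fetchOffset: the starting offset of the fetch, taken from the fetch request body
    highWatermark: update logic

    What kafka.server.ReplicaManager.readFromLocalLog(...).read(...) does:
    Mark the metrics
    Get the replica object localReplica. It is looked up by replica id (i.e. broker id); a fetch request carries the replica id in its body.
    val initialHighWatermark = localReplica.highWatermark.messageOffset

    Updating the remote replicas' LEO that the leader maintains
    This is a little roundabout: the leader also keeps the LEO of each remote replica, because the leader's high watermark is advanced from those remote LEOs. The leader's high watermark is set to the minimum of the LEOs of all remote replicas.
    The call stack is: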

    Replica.logEndOffset_$eq(LogOffsetMetadata) line: 98	
    Replica.updateLogReadResult(LogReadResult) line: 83	
    Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 276	
    ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314	
    ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308	
    TraversableLike$$anonfun$map$1.apply(A) line: 234	
    TraversableLike$$anonfun$map$1.apply(Object) line: 234	
    ResizableArray$class.foreach(ResizableArray, Function1) line: 59	
    ArrayBuffer<A>.foreach(Function1<A,U>) line: 48	
    TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234	
    ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104	
    ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308	
    ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799	
    ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803	
    KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597	
    

    This shows that the update happens while the leader handles a fetch request: the fetch offset carried by the request is taken as the remote replica's LEO.
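The leader-side bookkeeping can be simulated with a small Python model. Several things here are assumptions of the sketch rather than facts from the call stacks: the leader's own LEO is included in the minimum, the HW never moves backwards, and the HW put into the response is the value captured before the update (suggested by the initialHighWatermark line above, but not verified). `LeaderPartition` and `handle_follower_fetch` are hypothetical names.

```python
class LeaderPartition:
    """Toy leader partition that tracks each follower's LEO."""

    def __init__(self, follower_ids):
        self.log = []
        self.high_watermark = 0
        self.remote_leo = {fid: 0 for fid in follower_ids}

    @property
    def leo(self) -> int:
        return len(self.log)

    def append(self, records) -> None:
        self.log.extend(records)

    def handle_follower_fetch(self, replica_id: int, fetch_offset: int):
        response_hw = self.high_watermark  # captured before the update
        records = self.log[fetch_offset:]
        # The follower asks for the offset it wants next, which is its LEO.
        self.remote_leo[replica_id] = fetch_offset
        candidate = min([self.leo, *self.remote_leo.values()])
        self.high_watermark = max(self.high_watermark, candidate)
        return records, response_hw


leader = LeaderPartition(follower_ids=[1])
leader.append(["m0", "m1"])

follower_leo, follower_hw = 0, 0
trace = []
for _ in range(3):
    records, leader_hw = leader.handle_follower_fetch(1, follower_leo)
    follower_leo += len(records)
    follower_hw = min(follower_leo, leader_hw)
    trace.append((leader.high_watermark, follower_hw))

print(trace)  # [(0, 0), (2, 0), (2, 2)]
```

In this model the leader's HW only moves on the second fetch, when the follower reports offset 2, and the follower sees that value one fetch later still.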

    TODO

    Appendix

    1. Creating the replication fetch thread
      ReplicaFetcherManager.createFetcherThread(int, BrokerEndPoint) line: 30
      ReplicaFetcherManager(AbstractFetcherManager).kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(BrokerAndFetcherId, BrokerIdAndFetcherId) line: 80
      AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Tuple2<BrokerAndFetcherId,Map<TopicPartition,BrokerAndInitialOffset>>) line: 94
      AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Object) line: 85
      TraversableLike$WithFilter$$anonfun$foreach$1.apply(A) line: 733
      Map$Map1<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 116
      TraversableLike$WithFilter.foreach(Function1<A,U>) line: 732
      ReplicaFetcherManager(AbstractFetcherManager).addFetcherForPartitions(Map<TopicPartition,BrokerAndInitialOffset>) line: 85
      ReplicaManager.makeFollowers(int, int, Map<Partition,PartitionState>, int, Map<TopicPartition,Errors>) line: 1272
      ReplicaManager.becomeLeaderOrFollower(int, LeaderAndIsrRequest, Function2<Iterable,Iterable,BoxedUnit>) line: 1065
      KafkaApis.handleLeaderAndIsrRequest(RequestChannel$Request) line: 173
      KafkaApis.handle(RequestChannel$Request) line: 103
      KafkaRequestHandler.run() line: 65
      KafkaThread(Thread).run() line: 748

    2. Building the request
      ReplicaFetcherThread.buildFetchRequest(Seq<Tuple2<TopicPartition,PartitionFetchState>>) line: 234
      AbstractFetcherThread$$anonfun$2.apply() line: 104
      AbstractFetcherThread$$anonfun$2.apply() line: 103
      CoreUtils$.inLock(Lock, Function0) line: 217
      ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 103
      ReplicaFetcherThread(ShutdownableThread).run() line: 64




    Scratch notes taken while reading the code:

    [2019-09-25 18:07:13,787] INFO Handling request:RequestHeader(apiKey=OFFSET_FOR_LEADER_EPOCH, apiVersion=0, clientId=broker-0-fetcher-0, correlationId=0) -- {topics=[{topic=test.vv19,partitions=[{partition=0,leader_epoch=25}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
    
    
    
    [2019-09-25 22:13:02,501] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [37]lso.messageOffset: [37] (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 22:13:02,502] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [37]lso.messageOffset: [37] (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 22:13:03,006] INFO testEnter0005-Received response:apikey:FETCH correlationId 24; (com.code260.ss.KafkaTestUtils$)
    
    
    
    LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp() line: 62	
    LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61	
    LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61	
    CoreUtils$.inLock(Lock, Function0<T>) line: 217	
    CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225	
    LeaderEpochFileCache.assign(int, long) line: 60	
    Log$$anonfun$append$2$$anonfun$apply$9.apply(MutableRecordBatch) line: 689	
    Log$$anonfun$append$2$$anonfun$apply$9.apply(Object) line: 687	
    Iterator$class.foreach(Iterator, Function1) line: 891	
    Wrappers$JIteratorWrapper<A>(AbstractIterator<A>).foreach(Function1<A,U>) line: 1334	
    IterableLike$class.foreach(IterableLike, Function1) line: 72	
    Wrappers$JIterableWrapper<A>(AbstractIterable<A>).foreach(Function1<A,U>) line: 54	
    Log$$anonfun$append$2.apply() line: 687	
    Log$$anonfun$append$2.apply() line: 624	
    Log.maybeHandleIOException(Function0<String>, Function0<T>) line: 1669	
    Log.append(MemoryRecords, boolean, boolean, int) line: 624	
    Log.appendAsLeader(MemoryRecords, int, boolean) line: 597	
    Partition$$anonfun$13.apply() line: 500	
    Partition$$anonfun$13.apply() line: 488	
    CoreUtils$.inLock(Lock, Function0<T>) line: 217	
    CoreUtils$.inReadLock(ReadWriteLock, Function0<T>) line: 223	
    Partition.appendRecordsToLeader(MemoryRecords, boolean, int) line: 487	
    ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2<TopicPartition,MemoryRecords>) line: 724	
    ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) line: 708	
    TraversableLike$$anonfun$map$1.apply(A) line: 234	
    TraversableLike$$anonfun$map$1.apply(Object) line: 234	
    HashMap$$anonfun$foreach$1.apply(DefaultEntry<A,B>) line: 130	
    HashMap$$anonfun$foreach$1.apply(Object) line: 130	
    HashTable$class.foreachEntry(HashTable, Function1) line: 236	
    HashMap<A,B>.foreachEntry(Function1<DefaultEntry<A,B>,U>) line: 40	
    HashMap<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 130	
    TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234	
    HashMap<A,B>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104	
    ReplicaManager.appendToLocalLog(boolean, boolean, Map<TopicPartition,MemoryRecords>, short) line: 708	
    ReplicaManager.appendRecords(long, short, boolean, boolean, Map<TopicPartition,MemoryRecords>, Function1<Map<TopicPartition,PartitionResponse>,BoxedUnit>, Option<Lock>, Function1<Map<TopicPartition,RecordsProcessingStats>,BoxedUnit>) line: 458	
    KafkaApis.handleProduceRequest(RequestChannel$Request) line: 460	
    KafkaApis.handle(RequestChannel$Request) line: 100	
    KafkaRequestHandler.run() line: 65	
    KafkaThread(Thread).run() line: 748	
    
    
    
    
    
    LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp() line: 62	
    LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61	
    LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61	
    CoreUtils$.inLock(Lock, Function0<T>) line: 217	
    CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225	
    LeaderEpochFileCache.assign(int, long) line: 60	
    Log$$anonfun$append$2$$anonfun$apply$9.apply(MutableRecordBatch) line: 689	
    Log$$anonfun$append$2$$anonfun$apply$9.apply(Object) line: 687	
    Iterator$class.foreach(Iterator, Function1) line: 891	
    Wrappers$JIteratorWrapper<A>(AbstractIterator<A>).foreach(Function1<A,U>) line: 1334	
    IterableLike$class.foreach(IterableLike, Function1) line: 72	
    Wrappers$JIterableWrapper<A>(AbstractIterable<A>).foreach(Function1<A,U>) line: 54	
    Log$$anonfun$append$2.apply() line: 687	
    Log$$anonfun$append$2.apply() line: 624	
    Log.maybeHandleIOException(Function0<String>, Function0<T>) line: 1669	
    Log.append(MemoryRecords, boolean, boolean, int) line: 624	
    Log.appendAsFollower(MemoryRecords) line: 607	
    ReplicaFetcherThread.processPartitionData(TopicPartition, long, ReplicaFetcherThread$PartitionData) line: 123	
    ReplicaFetcherThread.processPartitionData(TopicPartition, long, AbstractFetcherThread$PartitionData) line: 62	
    AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(PartitionFetchState) line: 184	
    AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(Object) line: 172	
    Some<A>(Option<A>).foreach(Function1<A,U>) line: 257	
    AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Tuple2<TopicPartition,PartitionData>) line: 172	
    AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Object) line: 169	
    ResizableArray$class.foreach(ResizableArray, Function1) line: 59	
    ArrayBuffer<A>.foreach(Function1<A,U>) line: 48	
    AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp() line: 169	
    AbstractFetcherThread$$anonfun$processFetchRequest$2.apply() line: 169	
    AbstractFetcherThread$$anonfun$processFetchRequest$2.apply() line: 169	
    CoreUtils$.inLock(Lock, Function0<T>) line: 217	
    ReplicaFetcherThread(AbstractFetcherThread).processFetchRequest(AbstractFetcherThread$FetchRequest) line: 167	
    ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 113	
    ReplicaFetcherThread(ShutdownableThread).run() line: 64	
    
    
    
    
    ReplicaFetcherThread.fetchEpochsFromLeader(Map<TopicPartition,Object>) line: 332	
    ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130	
    ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 102	
    ReplicaFetcherThread(ShutdownableThread).run() line: 64	 
    
      
    
    kafka.server.ReplicaManager.lastOffsetForLeaderEpoch  
      
    
    kafka.server.ReplicaFetcherThread.maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset])  
      
    
    
    kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(topicPartition: TopicPartition)  
      
    kafka.server.ReplicaFetcherThread.maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset])  
    
    
    
    
  • Original post: https://www.cnblogs.com/simoncook/p/11809447.html