  • Message write

    RPC request logs captured while producing a message

    Leader node

    [2019-09-25 19:40:22,266] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8855) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=35,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:22,266] INFO testEnter0006-replica:0 newLogEndOffset:35 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:22,267] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:22,610] INFO Handling request:RequestHeader(apiKey=METADATA, apiVersion=5, clientId=producer-1, correlationId=34) -- {topics=[test.vv19],allow_auto_topic_creation=true} from connection 172.16.113.38:9094-172.16.113.38:60308-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
    
    
    ********************************
    [2019-09-25 19:40:22,769] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8856) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=35,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
    ********************************
    [2019-09-25 19:40:22,769] INFO testEnter0006-replica:0 newLogEndOffset:35 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:22,769] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
    
    ********************************
    [2019-09-25 19:40:22,773] INFO Handling request:RequestHeader(apiKey=PRODUCE, apiVersion=5, clientId=producer-1, correlationId=35) -- {acks=-1,timeout=30000,partitionSizes=[test.vv19-0=79]} from connection 172.16.113.38:9094-172.16.113.38:60308-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:22,834] INFO Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}, Current: {epoch:25, offset34} for Partition: test.vv19-0. Cache now contains 11 entries. (kafka.server.epoch.LeaderEpochFileCache)
    ********************************
    
    
    
    ********************************
    [2019-09-25 19:40:23,313] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765], 36 [0 : 2844]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
    ********************************
    
    
    [2019-09-25 19:40:23,400] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8857) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=36,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:23,400] INFO testEnter0006-replica:0 newLogEndOffset:36 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
    
    
    ********************************
    [2019-09-25 19:40:23,401] INFO testEnter0007-maybeIncrementLeaderHW:Set(36 [0 : 2844]) newHighWatermark:36 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
    ********************************
    
    
    [2019-09-25 19:40:23,922] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8858) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=36,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:23,923] INFO testEnter0006-replica:0 newLogEndOffset:36 oldLogEndOffsetMetadata 36; (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:23,923] INFO testEnter0007-maybeIncrementLeaderHW:Set(36 [0 : 2844]) newHighWatermark:36 oldHighWatermark 36; (com.code260.ss.KafkaTestUtils$)
    

    Call stack when the leader updates the follower LEO that it tracks (testEnter0006).
    This update also happens while the leader handles a fetch request sent by the follower.

    Replica.logEndOffset_$eq(LogOffsetMetadata) line: 98	
    Replica.updateLogReadResult(LogReadResult) line: 83	
    Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 276	
    ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314	
    ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308	
    TraversableLike$$anonfun$map$1.apply(A) line: 234	
    TraversableLike$$anonfun$map$1.apply(Object) line: 234	
    ResizableArray$class.foreach(ResizableArray, Function1) line: 59	
    ArrayBuffer<A>.foreach(Function1<A,U>) line: 48	
    TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234	
    ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104	
    ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308	
    ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799	
    ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803	
    KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597	
    

    Call stack when the leader's HW is updated (testEnter0007-maybeIncrementLeaderHW).
    The update happens while the leader handles a fetch request sent by the follower.

    Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Replica, long) line: 396	
    Partition$$anonfun$maybeExpandIsr$1.apply$mcZ$sp() line: 325	
    Partition$$anonfun$maybeExpandIsr$1.apply() line: 309	
    Partition$$anonfun$maybeExpandIsr$1.apply() line: 309	
    CoreUtils$.inLock(Lock, Function0<T>) line: 217	
    CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225	
    Partition.maybeExpandIsr(int, LogReadResult) line: 307	
    Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 283	
    ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314	
    ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308	
    TraversableLike$$anonfun$map$1.apply(A) line: 234	
    TraversableLike$$anonfun$map$1.apply(Object) line: 234	
    ResizableArray$class.foreach(ResizableArray, Function1) line: 59	
    ArrayBuffer<A>.foreach(Function1<A,U>) line: 48	
    TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234	
    ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104	
    ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308	
    ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799	
    ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803	
    KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597	
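
The two call stacks above, together with the leader log, suggest the following reading: a follower fetch at `fetch_offset` tells the leader the follower's LEO, and the HW then advances to the smallest LEO in the replica set. Below is a minimal sketch of that reading, not Kafka's actual code. `PartitionState`, `appendOnLeader` and `onFollowerFetch` are hypothetical names, leader broker id 1 is an assumption (the log only shows that the follower is replica 0), and the real `maybeIncrementLeaderHW` applies additional conditions that are left out here.

```scala
object LeaderHwSketch {
  // Hypothetical, simplified stand-in for the per-partition state kept on the leader.
  final class PartitionState(leaderId: Int, isr: Set[Int], initialOffset: Long) {
    // LEO of every in-sync replica as seen by the leader (its own included).
    private var leo: Map[Int, Long] = isr.map(id => id -> initialOffset).toMap
    private var hw: Long = initialOffset

    def highWatermark: Long = hw

    // The leader appended `count` records locally: only its own LEO moves.
    def appendOnLeader(count: Int): Unit = {
      leo = leo.updated(leaderId, leo(leaderId) + count)
      maybeIncrementHw()
    }

    // A follower fetch at `fetchOffset` reveals that follower's LEO.
    def onFollowerFetch(replicaId: Int, fetchOffset: Long): Unit = {
      leo = leo.updated(replicaId, fetchOffset)
      maybeIncrementHw()
    }

    // The HW only moves forward, to the smallest LEO among in-sync replicas.
    private def maybeIncrementHw(): Unit = {
      val candidate = isr.map(leo).min
      if (candidate > hw) hw = candidate
    }
  }

  def main(args: Array[String]): Unit = {
    // Replays the logged sequence: both replicas at 35, then one record is produced.
    val p = new PartitionState(leaderId = 1, isr = Set(0, 1), initialOffset = 35L)
    p.appendOnLeader(1)
    println(s"HW after produce: ${p.highWatermark}")
    p.onFollowerFetch(0, 36L)
    println(s"HW after follower fetch at 36: ${p.highWatermark}")
  }
}
```

This matches the log: after the PRODUCE request the LEO set is (35, 36) and the HW stays at 35; only the FETCH with fetch_offset=36 moves the HW to 36.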
    
    
    

    Follower node

    [2019-09-25 19:40:22,767] INFO testEnter0005-Received response:apikey:FETCH correlationId 8855; (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:22,767] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:22,768] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
    
    
    ********************************
    [2019-09-25 19:40:23,316] INFO testEnter0005-Received response:apikey:FETCH correlationId 8856; (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:23,369] INFO Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}, Current: {epoch:25, offset34} for Partition: test.vv19-0. Cache now contains 11 entries. (kafka.server.epoch.LeaderEpochFileCache)
    [2019-09-25 19:40:23,396] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:23,397] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
    ********************************
    
    
    
    ********************************
    [2019-09-25 19:40:23,920] INFO testEnter0005-Received response:apikey:FETCH correlationId 8857; (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:23,921] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:23,922] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
    ********************************
    
    
    
    [2019-09-25 19:40:24,426] INFO testEnter0005-Received response:apikey:FETCH correlationId 8858; (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:24,427] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
    [2019-09-25 19:40:24,428] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
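
One way to read the follower log above: each fetch response carries the leader's current HW, and the follower sets its own HW to the smaller of its LEO and that value. The sketch below replays responses 8856 and 8857 under that assumption. `FollowerState` and `onFetchResponse` are hypothetical names, and the record counts and leader HW values passed in are inferred from the logs, not read from the responses themselves.

```scala
object FollowerHwSketch {
  // Hypothetical, simplified follower-side view of one partition.
  final case class FollowerState(logEndOffset: Long, highWatermark: Long)

  // Append the fetched records, then cap the HW at the leader's HW.
  def onFetchResponse(state: FollowerState, recordCount: Int, leaderHw: Long): FollowerState = {
    val newLeo = state.logEndOffset + recordCount
    FollowerState(newLeo, math.min(newLeo, leaderHw))
  }

  def main(args: Array[String]): Unit = {
    val s0 = FollowerState(logEndOffset = 35L, highWatermark = 35L)
    // Response 8856: one new record, leader HW still 35.
    val s1 = onFetchResponse(s0, recordCount = 1, leaderHw = 35L)
    // Response 8857: no new records, leader HW has moved to 36.
    val s2 = onFetchResponse(s1, recordCount = 0, leaderHw = 36L)
    println(s1)
    println(s2)
  }
}
```

Under this reading the follower's HW trails the leader's by one fetch round trip, which is why hwm only changes from 35 to 36 while response 8857 is processed.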
    

    Request handling

    handleProduceRequest ProduceRequest
    ReplicaManager.appendRecords

    • timeout: Long // taken from the request body
    • requiredAcks: Short // taken from the request body
    • internalTopicsAllowed: Boolean // whether request.header.clientId is __admin_client
    • isFromClient: Boolean // always passed as true
    • entriesPerPartition: Map[TopicPartition, MemoryRecords]
    • responseCallback: Map[TopicPartition, ProduceResponse.PartitionResponse] => Unit
    • delayedProduceLock: Option[Lock] // not passed
    • processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit

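    Main log-writing logic:
    kafka.log.Log.append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int), line 623; worth a close read

    During a message write: how Log, LogSegment, FileRecords, MemoryRecords, File and LogOffsetMetadata relate to each other

    How is the offset of a new message generated? After every append, the offset to use for the next message is updated: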

     // increment the log end offset
    updateLogEndOffset(appendInfo.lastOffset + 1)  
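
To make the offset bookkeeping concrete, here is a minimal sketch of the rule quoted above: a batch takes consecutive offsets starting at the current log end offset, and the log end offset then becomes lastOffset + 1. `TinyLog` and this `AppendInfo` are hypothetical stand-ins, not the Kafka classes, and everything except the offset arithmetic is omitted.

```scala
object OffsetAssignmentSketch {
  // Hypothetical result type: first and last offset assigned to a batch.
  final case class AppendInfo(firstOffset: Long, lastOffset: Long)

  final class TinyLog(startOffset: Long) {
    // Offset that the next appended record will receive (the LEO).
    private var nextOffset: Long = startOffset

    def logEndOffset: Long = nextOffset

    def append(recordCount: Int): AppendInfo = {
      require(recordCount > 0, "need at least one record")
      val first = nextOffset
      val last = first + recordCount - 1
      // Mirrors updateLogEndOffset(appendInfo.lastOffset + 1).
      nextOffset = last + 1
      AppendInfo(first, last)
    }
  }

  def main(args: Array[String]): Unit = {
    val log = new TinyLog(startOffset = 35L)
    println(log.append(1))
    println(log.logEndOffset)
  }
}
```

With a log end offset of 35, appending one record assigns it offset 35 and moves the log end offset to 36, the same step seen in the logs above.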
    

    How the leader node handles the relevant offsets while processing a produce request

    KafkaApis handleProduceRequest
    ReplicaManager appendRecords
    ReplicaManager appendToLocalLog
    processingStatsCallback // KafkaApis def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit
    responseCallback // KafkaApis def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse])
    ProduceRequest clearPartitionRecords

    ReplicaManager appendToLocalLog
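    Collect produce metrics (in preparation for the write rate), both globally and per topic.
    Check whether the message is being sent to an internal topic
    Write the message to the leader: partition.appendRecordsToLeader
    val info = log.appendAsLeader
    replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
    (info, maybeIncrementLeaderHW(leaderReplica))
    Update firstOffset, lastOffset and numAppendedMessages
    Collect produce metrics (message count and message size), both globally and per topic.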

    Log.append
    Assign message offsets: val offset = new LongRef(nextOffsetMetadata.messageOffset)
    Update firstOffset: appendInfo.firstOffset = offset.value
    Update lastOffset: appendInfo.lastOffset = offset.value - 1
    If any message fails validation, collect metrics for the rejected messages
    Update the offset recorded for the leader epoch: leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset), followed by a flush of LeaderEpochCheckpoint. This happens only when the epoch changes; the update is flushed straight to disk and forced with an FD sync, fileOutputStream.getFD().sync()
    segment.append
    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata
    producerStateManager.update(producerAppendInfo)
    Handle some index entries for transactional messages
    producerStateManager.updateMapEndOffset
    updateLogEndOffset
    updateFirstUnstableOffset
    flush() as needed
    Return appendInfo
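
The leader-epoch step in the list above can be sketched as follows: an entry is added only when the batch carries a newer epoch than the latest cached one, and every change is persisted immediately. This is a simplified illustration, not the `LeaderEpochFileCache` implementation. `EpochCache`, `EpochEntry` and the `flush` callback are hypothetical; the callback stands in for writing the checkpoint file and calling `fileOutputStream.getFD().sync()`, and the real assignment condition checks more than the epoch.

```scala
object LeaderEpochCacheSketch {
  // One cache entry: the first offset written under a given leader epoch.
  final case class EpochEntry(epoch: Int, startOffset: Long)

  // `flush` is a hypothetical hook standing in for the checkpoint write plus fsync.
  final class EpochCache(flush: Seq[EpochEntry] => Unit) {
    private var entries: Vector[EpochEntry] = Vector.empty

    def latest: Option[EpochEntry] = entries.lastOption
    def size: Int = entries.size

    // Returns true when a new entry was recorded and flushed.
    def assign(epoch: Int, startOffset: Long): Boolean =
      if (latest.forall(_.epoch < epoch)) {
        entries = entries :+ EpochEntry(epoch, startOffset)
        flush(entries) // persist on every change, never on a no-op
        true
      } else false
  }

  def main(args: Array[String]): Unit = {
    val cache = new EpochCache(es => println(s"flushing ${es.size} entries"))
    cache.assign(25, 34L) // first entry
    cache.assign(25, 35L) // same epoch: ignored, no flush
    cache.assign(27, 35L) // epoch changed, as in the logged update
    println(cache.latest)
  }
}
```

Because most batches are written under an unchanged epoch, the forced sync is paid only on leadership changes rather than on every append.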

    processingStatsCallback
    Update the produceMessageConversionsRate metric (conversionCount), both per topic and globally. conversionCount counts conversions of messages from a newer format version to an older one. The conversion logic is in org.apache.kafka.common.record.AbstractRecords.downConvert(Iterable<? extends RecordBatch>, byte, long, Time), and the call originates in org.apache.kafka.clients.producer.internals.Sender.sendProduceRequest(long, int, short, int, List)

    responseCallback
    Check for errors and log them if there are any; handle throttling.

    Partition.maybeIncrementLeaderHW analysis

  • Original post: https://www.cnblogs.com/simoncook/p/11809443.html