zoukankan      html  css  js  c++  java
  • uReplicator实现分析

    MirrorMakerWorker分析

    是整个同步机制的主入口,主要组织的逻辑有:

    • 配置数据的传入与处理,ConsumerConfig对象的构建
    • 度量对象的准备,定时上报的度量数据收集线程的定义与启动
    • CompactConsumerFetcherManager实例的创建与startConnections
    • 根据fetchNum创建KafkaConnector实例,KafkaConnector实例中会关联CompactConsumerFetcherManager实例
    • 添加Helix Controller
    • 添加优雅关闭的钩子
    • 构造producer的config producerProps
    • 根据维护KafkaConnector实例的connectorMap来创建MirrorMakerThread实例并启动
    • 通过shutdownLatch: CountDownLatch来等待关闭退出main方法

    与Helix关联的地方

    实现HelixWorkerOnlineOfflineStateModelFactory和OnlineOfflineStateModel,OnlineOfflineStateModel可以理解成是一个监听器。实例在上下线切换时可以监听到。

    helixZkManager = HelixManagerFactory.getZKHelixManager(helixClusterName, instanceId, InstanceType.PARTICIPANT, zkServer)
    val stateMachineEngine: StateMachineEngine = helixZkManager.getStateMachineEngine()
    // register the MirrorMaker worker
    val stateModelFactory = new HelixWorkerOnlineOfflineStateModelFactory(instanceId, fetchNum, connectorMap)
    stateMachineEngine.registerStateModelFactory("OnlineOffline", stateModelFactory)
    helixZkManager.connect()
    helixAdmin = helixZkManager.getClusterManagmentTool
    
    class HelixWorkerOnlineOfflineStateModelFactory(final val instanceId: String, final val fetchNum: Int,
                                                    final val connectorMap: ConcurrentHashMap[String, KafkaConnector]) extends StateModelFactory[StateModel] {
    
      override def createNewStateModel(partitionName: String) = new OnlineOfflineStateModel(instanceId, connectorMap)
    
    
      // register mm instance
      class OnlineOfflineStateModel(final val instanceId: String, final val connectors: ConcurrentHashMap[String, KafkaConnector]) extends StateModel {
    
        def onBecomeOnlineFromOffline(message: Message, context: NotificationContext) = {
          // add topic partition on the instance
          connectorMap.get(getFetcherId(message.getResourceName, message.getPartitionName.toInt)).addTopicPartition(message.getResourceName, message.getPartitionName.toInt)
        }
    
        def onBecomeOfflineFromOnline(message: Message, context: NotificationContext) = {
          // delete topic partition on the instance
          connectorMap.get(getFetcherId(message.getResourceName, message.getPartitionName.toInt)).deleteTopicPartition(message.getResourceName, message.getPartitionName.toInt)
        }
    
        def onBecomeDroppedFromOffline(message: Message, context: NotificationContext) = {
          // do nothing
        }
    
        private def getFetcherId(topic: String, partitionId: Int): String = {
          "" + Utils.abs(31 * topic.hashCode() + partitionId) % fetchNum
        }
      }
    
    }
    
    

    run方法逻辑

    • 通过KafkaConnector拿到KafkaStream,通过KafkaStream拿到ConsumerIterator
    • 在没有关闭时,一直迭代ConsumerIterator
    • 拿到迭代器中的数据,就是取到的消息(为什么迭代器中能一直有消息,因为这样反推iter-->KafkaStream-->KafkaConnector+Queue-->PartitionTopicInfo-->fetcherManager.partitionAddMap-->fetcherManager.partitionInfoMap-->fetcherManager.createFetcherThread-->CompactConsumerFetcherThread.partitionInfoMap-->CompactConsumerFetcherThread.processPartitionData-->CompactConsumerFetcherThread.doWork-->ShutdownableThread.run//spin)
    • 经过MirrorMakerMessageHandler处理消息形成ProducerRecord数组实例,主要是分区对齐
    • 用producer发到目标集群
    • 用maybeFlushAndCommitOffsets方法flush并提交offset
    • 真正commit offset的动作由自行实现的KafkaConnector完成,记录在ZK上,提交是定时提交

    CompactConsumerFetcherThread分析

    概述

    CompactConsumerFetcherThread是继承自Kafka提供的ShutdownableThreadShutdownableThread内部会在isRunning标志位ok的情况下以spin的形式一直调用doWork方法。

      override def run(): Unit = {
        info("Starting ")
        try{
          while(isRunning.get()){
            doWork()
          }
        } catch{
          case e: Throwable =>
            if(isRunning.get())
              error("Error due to ", e)
        }
        shutdownLatch.countDown()
        info("Stopped ")
      }
    

    doWork方法分析

    • 锁定partitionMapLock
    • 锁定updateMapLock
    • 将partitionAddMap中的数据放到partitionMap,然后清空partitionAddMap
    • 将partitionDeleteMap中的数据从partitionMap中移除并移除fetcherLagStats中对应的stat,然后清空partitionDeleteMap
    • 迭代partitionMap将需要拉取的topic、partition、fetchoffset、fetchsize等信息加入fetchRequestBuilder
    • 用fetchRequestBuilder构造出FetchRequest实例
    • 如果fetchRequest.requestInfo.isEmpty是空的,那么等待fetchBackOffMs
    • 对于两次拉取间隔是否过大做日志输出(DUMP_INTERVAL_MS = 5 * 60 * 1000)
    • processFetchRequest 处理拉的请求

    processFetchRequest方法分析

    当doWork方法准备好了FetchRequest实例就要靠processFetchRequest方法来拉数据给partitionInfoMap中的PartitionTopicInfo实例中的队列了。简单过程如下:

    • 迭代响应中的每条数据,按每个分区维度处理
    • 拿到消息
    • 根据拿到的消息算出下一次的new offset,并更新到partitionMap中
    • 更新度量信息,计算堆积
    • 将取到的消息在PartitionTopicInfo实例中放入队列。 PartitionTopicInfo实例的队列来自Connect中的构造KafkaStream实例时传递的同一个队列。 这样能打通连接器和stream
  • 相关阅读:
    内置函数
    Day19 列表生成器、迭代器&生成器
    Day18 高阶函数
    Day17 装饰器
    searchBar 隐藏
    iOS UITableView UIScrollView 的支持触摸事件
    手势冲突UIPanGestureRecognizer 和UIPinchGestureRecognizer
    navigationController 之间的切换
    searchDisplayController 时引起的数组越界
    在适配iPhone 6 Plus屏幕的时候,模拟器上两边有很细的白边如何解决
  • 原文地址:https://www.cnblogs.com/simoncook/p/10953177.html
Copyright © 2011-2022 走看看