zoukankan      html  css  js  c++  java
  • kafka 创建 topic

    执行 windows 脚本

    kafka-topics.bat --create --zookeeper localhost:2181/kafka-zhang --replication-factor 1 --partitions 1 --topic zhang

    命令行程序,获取 broker 元数据,计算副本分配方案,然后把 topic 的信息写入 zk

    // kafka.zk.AdminZkClient#createTopic
      def createTopic(topic: String,
                      partitions: Int,
                      replicationFactor: Int,
                      topicConfig: Properties = new Properties,
                      rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
        // Three steps: look up broker metadata for the requested rack-awareness
        // mode, compute a replica placement for every partition, then persist
        // the assignment (plus topic config) under the topic's ZooKeeper path.
        val liveBrokers = getBrokerMetadatas(rackAwareMode)
        val assignment = AdminUtils.assignReplicasToBrokers(liveBrokers, partitions, replicationFactor)
        createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, topicConfig)
      }

    KafkaController 监听 zk 节点的变化,并产生 TopicChange 事件 

    // Controller event fired when the set of topics registered in ZooKeeper
    // changes. Diffs the ZK topic list against the controller's cached list,
    // refreshes cached replica assignments, and triggers partition creation
    // for newly added topics.
    case object TopicChange extends ControllerEvent {
      override def state: ControllerState = ControllerState.TopicChange
    
      override def process(): Unit = {
        // Only the active controller reacts to topic changes.
        if (!isActive) return
        val topics = zkClient.getAllTopicsInCluster.toSet
        // Topics in ZK but not in the cache are new; the reverse are deleted.
        val newTopics = topics -- controllerContext.allTopics
        val deletedTopics = controllerContext.allTopics -- topics
        controllerContext.allTopics = topics
    
        // Watch each new topic so later partition-count changes are observed.
        registerPartitionModificationsHandlers(newTopics.toSeq)
        val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
        // Evict assignments belonging to deleted topics, then merge in the
        // assignments read from ZK for the new topics.
        controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
          !deletedTopics.contains(p._1.topic))
        controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
        info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
          s"[$addedPartitionReplicaAssignment]")
        // Kick off the NewPartition -> OnlinePartition flow for the new topics.
        if (addedPartitionReplicaAssignment.nonEmpty)
          onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
      }
    }

    在 KafkaController.TopicChange#process 中触发 KafkaController#onNewPartitionCreation

    // kafka.controller.KafkaController#onNewPartitionCreation  
      // Walks newly created partitions and their replicas through the state
      // machines: partitions to NewPartition, replicas to NewReplica, then
      // partitions to OnlinePartition (with a leader-election strategy
      // supplied) and finally replicas to OnlineReplica. The call order is
      // deliberate and preserved exactly.
      private def onNewPartitionCreation(newPartitions: Set[TopicPartition]) {
        info(s"New partition creation callback for ${newPartitions.mkString(",")}")
        val partitionSeq = newPartitions.toSeq
        val electionStrategy = Option(OfflinePartitionLeaderElectionStrategy)
        partitionStateMachine.handleStateChanges(partitionSeq, NewPartition)
        replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
        partitionStateMachine.handleStateChanges(partitionSeq, OnlinePartition, electionStrategy)
        replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
      }

    partition 状态由 NewPartition 变成 OnlinePartition,会初始化 leader 和 isr 信息,并写入 zk 中,然后发送 LeaderAndIsrRequest 请求通知各 broker。

    // kafka.controller.PartitionStateMachine#initializeLeaderAndIsrForPartitions  
      // Initializes leader/ISR state in ZooKeeper for partitions moving from
      // NewPartition to OnlinePartition, and returns the partitions that were
      // initialized successfully. For each partition with at least one live
      // replica: the first live assigned replica becomes leader and all live
      // replicas form the initial ISR; the state is written to ZK and a
      // LeaderAndIsr request is queued for the brokers in the ISR.
      private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
        val successfulInitializations = mutable.Buffer.empty[TopicPartition]
        // Pair every partition with its assigned replicas, then keep only the
        // replicas whose broker is currently online.
        val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
        val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
            val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
            partition -> liveReplicasForPartition
        }
        // Split into partitions with no live replica (cannot elect a leader)
        // and those that can be brought online.
        val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
    
        // Log a failed state change for every partition that has no live
        // replica; these are excluded from the ZK write below.
        partitionsWithoutLiveReplicas.foreach { case (partition, replicas) =>
          val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " +
            s"partition $partition from New to Online, assigned replicas are " +
            s"[${replicas.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
            "replica is alive."
          logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg))
        }
        // Leader = first live replica; ISR = all live replicas; tagged with
        // the current controller epoch.
        val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
          val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
          val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
          partition -> leaderIsrAndControllerEpoch
        }.toMap
        // Write the partition state znodes in one batch; on any exception,
        // log a failed state change per partition and treat the batch as
        // producing no responses (nothing is initialized).
        val createResponses = try {
          zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
        } catch {
          case e: Exception =>
            partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
            Seq.empty
        }
        // Per-partition result handling: on OK, cache leadership info, queue a
        // LeaderAndIsr request (isNew = true) for the ISR brokers, and record
        // success; otherwise log the failed transition.
        createResponses.foreach { createResponse =>
          val code = createResponse.resultCode
          val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]
          val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
          if (code == Code.OK) {
            controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
            controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,
              partition, leaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(partition), isNew = true)
            successfulInitializations += partition
          } else {
            logFailedStateChange(partition, NewPartition, OnlinePartition, code)
          }
        }
        successfulInitializations
      }

    最后把 replica 状态由 NewReplica 变成 OnlineReplica,至此,topic 创建完成。

  • 相关阅读:
    吴裕雄--天生自然 python语言数据分析:开普勒系外行星搜索结果分析
    吴裕雄--天生自然 R语言数据分析:火箭发射的地点、日期/时间和结果分析
    吴裕雄--天生自然 PYTHON数据分析:基于Keras的CNN分析太空深处寻找系外行星数据
    吴裕雄--天生自然 python数据分析:基于Keras使用CNN神经网络处理手写数据集
    吴裕雄--天生自然 PYTHON数据分析:钦奈水资源管理分析
    吴裕雄--天生自然 PYTHON数据分析:医疗数据分析
    独家解密:阿里大规模数据中心性能分析
    日志服务Python消费组实战(三):实时跨域监测多日志库数据
    日志服务Python消费组实战(二):实时分发数据
    日志服务与SIEM(如Splunk)集成方案实战
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12864532.html
Copyright © 2011-2022 走看看