  • How Kafka consumer groups are created and deleted

    In Kafka 0.10.0.0, consumers and consumer groups no longer register nodes in ZooKeeper. So in what form does a consumer group exist?

    1 Entry point

    Start with the kafka-consumer-groups.sh script that ships with Kafka. It simply invokes kafka.admin.ConsumerGroupCommand:

    exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"
    

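    Now look at ConsumerGroupCommand. The code shows that deleting a consumer group is not supported for the new consumer: only ZkConsumerGroupService implements delete. In practice, a group is deleted once it has no consumers left.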

    def main(args: Array[String]) {
        // ...
        val consumerGroupService = {
          if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts) // the new consumer goes through KafkaConsumerGroupService
          else new ZkConsumerGroupService(opts)
        }
    
        try {
          if (opts.options.has(opts.listOpt))
            consumerGroupService.list() // traced below to see in what form groups exist
          else if (opts.options.has(opts.describeOpt))
            consumerGroupService.describe()
          else if (opts.options.has(opts.deleteOpt)) {
            consumerGroupService match {
              case service: ZkConsumerGroupService => service.delete()
              case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService")
            }
          }
        } 
        // ...
      }
    

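    We use KafkaConsumerGroupService#list, which returns all consumer groups, as the example. Following the code, it ends up calling AdminClient#listAllGroups. The code shows that listing all groups means querying every broker, and that listing the groups on a single broker means sending it an ApiKeys.LIST_GROUPS request.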

    def listAllGroups(): Map[Node, List[GroupOverview]] = {
        findAllBrokers.map {
          case broker =>
            broker -> { // every broker has to be queried
              try {
                listGroups(broker)
              } catch {
                case e: Exception =>
                  debug(s"Failed to find groups from broker ${broker}", e)
                  List[GroupOverview]()
              }
            }
        }.toMap
    }
    
    def listGroups(node: Node): List[GroupOverview] = { // ask the given broker for the consumer groups it manages
        val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
        val response = new ListGroupsResponse(responseBody)
        Errors.forCode(response.errorCode()).maybeThrow()
        response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
      }
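The fan-out in listAllGroups can be illustrated with a small standalone sketch. It does not use Kafka's AdminClient: `Broker`, `GroupOverview`, and `fakeListGroups` are hypothetical stand-ins, and the only behavior borrowed from the excerpt above is that a broker whose request fails contributes an empty list instead of aborting the whole listing.

```scala
object ListAllGroupsSketch {
  // Hypothetical stand-ins for Kafka's Node and GroupOverview types.
  final case class Broker(id: Int)
  final case class GroupOverview(groupId: String, protocolType: String)

  // Query every broker; a failing broker yields an empty list,
  // mirroring the try/catch in AdminClient#listAllGroups.
  def listAllGroups(brokers: Seq[Broker],
                    listGroups: Broker => List[GroupOverview]): Map[Broker, List[GroupOverview]] =
    brokers.map { broker =>
      val groups =
        try {
          listGroups(broker)
        } catch {
          case _: Exception => List.empty[GroupOverview]
        }
      broker -> groups
    }.toMap

  // Flatten the per-broker view into one sorted list of group ids.
  def allGroupIds(perBroker: Map[Broker, List[GroupOverview]]): List[String] =
    perBroker.values.flatten.map(_.groupId).toList.sorted

  // Fake per-broker lookup used for illustration: broker 1 is unreachable.
  def fakeListGroups(broker: Broker): List[GroupOverview] = broker.id match {
    case 0 => List(GroupOverview("orders", "consumer"))
    case 1 => throw new RuntimeException("broker unreachable")
    case _ => List(GroupOverview("billing", "consumer"), GroupOverview("audit", "consumer"))
  }
}
```

Because each broker only reports the groups it coordinates, the complete list is the union of the per-broker answers, which is why a single unreachable broker makes the result incomplete rather than wrong.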
    

    On the broker side, the request is handled by handleListGroupsRequest in KafkaApis.scala:

    def handleListGroupsRequest(request: RequestChannel.Request) {
        // ... 
        
          val (error, groups) = coordinator.handleListGroups() // the key step: fetch the list of consumer groups
          val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } 
          new ListGroupsResponse(error.code, allGroups.asJava)
        }
        requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
      }
    

    Following coordinator.handleListGroups all the way down, the groups on a broker are ultimately obtained from GroupMetadataManager#currentGroups. So consumer groups are tied to GroupMetadataManager.

    def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
    

    2 How groups are represented

    GroupMetadata represents a consumer group and MemberMetadata represents a consumer. A summary diagram first:
    [Figure: group]

    GroupMetadataManager has a groupsCache field that holds the consumer groups managed by this broker:

    private val groupsCache = new Pool[String, GroupMetadata]
    

    The fields of GroupMetadata:

    private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {
    
      private val members = new mutable.HashMap[String, MemberMetadata] // the consumers (members) of this group
      private var state: GroupState = Stable
      var generationId = 0 // generationId is used for rebalancing
      var leaderId: String = null
      var protocol: String = null
      // ... 
    }
    
    // MemberMetadata represents a single consumer
    private[coordinator] class MemberMetadata(val memberId: String,
                                              val groupId: String,
                                              val clientId: String,
                                              val clientHost: String,
                                              val sessionTimeoutMs: Int,
                                              var supportedProtocols: List[(String, Array[Byte])]) {
    
      var assignment: Array[Byte] = Array.empty[Byte] // the partitions assigned to this consumer
      var awaitingJoinCallback: JoinGroupResult => Unit = null
      var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
      var latestHeartbeat: Long = -1
      var isLeaving: Boolean = false
      // ...
    }
    

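    This is the form in which a consumer group and its consumers exist: as objects in an in-memory cache on the broker, not as nodes registered in ZooKeeper or some other external store.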

    3 Creating a consumer group

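    A consumer group is never created on its own; it is created when a consumer first sends a join_group request for it. Creating the group is simple: a GroupMetadata representing it is added to GroupMetadataManager#groupsCache.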

    GroupCoordinator#handleJoinGroup

    def handleJoinGroup(groupId: String,
                          memberId: String,
                          clientId: String,
                          clientHost: String,
                          sessionTimeoutMs: Int,
                          protocolType: String,
                          protocols: List[(String, Array[Byte])],
                          responseCallback: JoinCallback) {
        // ...
        } else {
          var group = groupManager.getGroup(groupId)
          if (group == null) {
            if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
            } else {
              group = groupManager.addGroup(new GroupMetadata(groupId, protocolType)) // the key step: if the group does not exist yet, add it
              doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
            }
          } else {
            doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
          }
        }
      }
    

    GroupMetadataManager#addGroup

    def addGroup(group: GroupMetadata): GroupMetadata = {
        val currentGroup = groupsCache.putIfNotExists(group.groupId, group) // add the GroupMetadata that represents the group
        if (currentGroup != null) {
          currentGroup
        } else {
          group
        }
      }
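The create-on-first-join behavior can be reproduced in a few lines. This is only a sketch under stated assumptions: `Group` is a hypothetical, trimmed-down stand-in for GroupMetadata, a `ConcurrentHashMap` plays the role of `Pool[String, GroupMetadata]`, `UnknownMemberId` is assumed to be the empty string, and the membership checks done inside doJoinGroup are left out.

```scala
import java.util.concurrent.ConcurrentHashMap

object GroupCacheSketch {
  // Hypothetical, trimmed-down stand-in for GroupMetadata.
  final class Group(val groupId: String, val protocolType: String)

  // Assumed value of JoinGroupRequest.UNKNOWN_MEMBER_ID: the consumer has no member id yet.
  val UnknownMemberId = ""

  // Stand-in for Pool[String, GroupMetadata].
  private val groupsCache = new ConcurrentHashMap[String, Group]()

  // Returns null when the group is absent, like the lookup in the excerpt.
  def getGroup(groupId: String): Group = groupsCache.get(groupId)

  // Same contract as addGroup: if another caller registered the group first,
  // return that instance instead of the one passed in.
  def addGroup(group: Group): Group = {
    val current = groupsCache.putIfAbsent(group.groupId, group)
    if (current != null) current else group
  }

  // Simplified handleJoinGroup: an unknown group is created only for a consumer
  // without a member id; a stale member id for an unknown group is rejected.
  def handleJoin(groupId: String, memberId: String, protocolType: String): Either[String, Group] = {
    val existing = getGroup(groupId)
    if (existing != null) Right(existing)
    else if (memberId != UnknownMemberId) Left("UNKNOWN_MEMBER_ID")
    else Right(addGroup(new Group(groupId, protocolType)))
  }

  def size: Int = groupsCache.size()
}
```

The put-if-absent step matters because two consumers of the same new group can send their first join concurrently; both must end up in the same group object.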
    

    4 Deleting a consumer group

    Section 1 showed that ConsumerGroupCommand does not support deleting a consumer group manually. So how does a group get deleted? A group is deleted once it has no consumers left.

    4.1 What deletion does

    First, look at GroupMetadataManager#removeGroup to see which actions deleting a group involves:

    def removeGroup(group: GroupMetadata) {
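        if (groupsCache.remove(group.groupId, group)) { // remove the group from the cache
            // then write a tombstone to the group's partition of the __consumer_offsets topic so that log
            // compaction can drop the group's record; __consumer_offsets is never deleted, only compacted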
    
          val groupPartition = partitionFor(group.groupId) // partition for this group; by default abs(hashCode) % 50
          val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
    
          // then append the tombstone to that partition so that compaction can remove the record
          val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
            timestamp = timestamp, magicValue = magicValue)
    
          val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
          partitionOpt.foreach { partition =>
            val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
    
            trace("Marking group %s as deleted.".format(group.groupId))
    
            try {
              partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
            } catch {
              case t: Throwable =>
                error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
              // ignore and continue
            }
          }
        }
      }
    

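    So deleting a consumer group consists of two actions:

    1. Remove the group from the cache, i.e. the Pool[String, GroupMetadata].
    2. Write a tombstone to the group's partition in __consumer_offsets. The group's existing records are not deleted directly; they are left for log compaction to clean up.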
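The partition mapping noted in the removeGroup comment, abs(hashCode) % 50, can be tried out in isolation. The sketch below is not Kafka's code: the `abs` helper and the default of 50 partitions are assumptions modeled on that comment, with `Int.MinValue` mapped to 0 because it has no positive counterpart.

```scala
object GroupPartitionSketch {
  // Assumed default partition count of the __consumer_offsets topic.
  val DefaultOffsetsTopicPartitions = 50

  // Absolute value that never returns a negative number:
  // math.abs(Int.MinValue) is still negative, so that case is mapped to 0.
  def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Every record of a group (including its tombstone) lands in this partition.
  def partitionFor(groupId: String, numPartitions: Int = DefaultOffsetsTopicPartitions): Int =
    abs(groupId.hashCode) % numPartitions
}
```

For example, the one-character group id "a" has hash code 97, so `GroupPartitionSketch.partitionFor("a")` is 47. Since the mapping depends only on the group id and the partition count, the tombstone is guaranteed to be appended to the same partition as the group's earlier records, which is what lets compaction match them by key.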

    4.2 What triggers deletion

    The only caller of GroupMetadataManager#removeGroup is GroupCoordinator#onCompleteJoin, and the only caller of GroupCoordinator#onCompleteJoin is DelayedJoin.

    GroupCoordinator#onCompleteJoin

    def onCompleteJoin(group: GroupMetadata) {
        // ...
            if (group.isEmpty) {
              group.transitionTo(Dead) // mark the group Dead first, then remove it
              groupManager.removeGroup(group)
              info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
            }
          }
          // ...
    }
    

    DelayedJoin

    private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
                                                group: GroupMetadata,
                                                sessionTimeout: Long)
      extends DelayedOperation(sessionTimeout) {
    
      override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
      override def onExpiration() = coordinator.onExpireJoin()
      override def onComplete() = coordinator.onCompleteJoin(group)
    }
    

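    Does that mean a group is deleted during a joinGroup operation? Not really. It is deleted when a heartbeat times out: once the last consumer's heartbeat expires, in other words once the group has no consumers left, the group is deleted. Start from DelayedHeartbeat: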

    private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                                group: GroupMetadata,
                                                member: MemberMetadata,
                                                heartbeatDeadline: Long,
                                                sessionTimeout: Long)
      extends DelayedOperation(sessionTimeout) {
      override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
      override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) // note this call
      override def onComplete() = coordinator.onCompleteHeartbeat()
    }
    
    def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
        group synchronized {
          if (!shouldKeepMemberAlive(member, heartbeatDeadline))
            onMemberFailure(group, member) // note this call
        }
      }
    
    private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
        trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
        group.remove(member.memberId)
        group.currentState match {
          case Dead =>
          case Stable | AwaitingSync => maybePrepareRebalance(group) // e.g. when the only consumer of a Stable group times out, maybePrepareRebalance is called
          case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
        }
    }
    
    private def maybePrepareRebalance(group: GroupMetadata) {
        group synchronized {
          if (group.canRebalance)
            prepareRebalance(group) // note this call
        }
    }
    
    private def prepareRebalance(group: GroupMetadata) {
        
        if (group.is(AwaitingSync))
          resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
    
        group.transitionTo(PreparingRebalance)
        info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
    
        val rebalanceTimeout = group.rebalanceTimeout
        val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout) // this is where the DelayedJoin is finally created
        val groupKey = GroupKey(group.groupId)
        joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
      }
    

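    To sum up: after a heartbeat times out, a rebalance is started through GroupCoordinator#prepareRebalance, which registers a DelayedJoin. When that DelayedJoin completes, onCompleteJoin removes the group if it has no members left.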
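The chain from heartbeat expiry to group removal can be condensed into a toy model. Everything in it is a simplification made for illustration: `Coordinator` and `Group` are hypothetical classes, the purgatory and DelayedJoin are replaced by a direct call to `onCompleteJoin`, the AwaitingSync state is skipped, and the tombstone write is modeled as an entry in a list.

```scala
import scala.collection.mutable

object GroupLifecycleSketch {
  sealed trait State
  case object Stable extends State
  case object PreparingRebalance extends State
  case object Dead extends State

  // Hypothetical, trimmed-down stand-in for GroupMetadata.
  final class Group(val groupId: String) {
    val members = mutable.Set.empty[String]
    var state: State = Stable
  }

  final class Coordinator {
    val groupsCache = mutable.Map.empty[String, Group]
    val tombstones = mutable.ListBuffer.empty[String] // stands in for writes to __consumer_offsets

    // The first join creates the group; later joins add members to it.
    def join(groupId: String, memberId: String): Unit = {
      val group = groupsCache.getOrElseUpdate(groupId, new Group(groupId))
      group.members += memberId
      group.state = Stable
    }

    // Heartbeat expired: drop the member, then start a rebalance.
    def onExpireHeartbeat(groupId: String, memberId: String): Unit =
      groupsCache.get(groupId).foreach { group =>
        group.members -= memberId
        if (group.state == Stable) {
          group.state = PreparingRebalance
          onCompleteJoin(group) // the real coordinator defers this through a DelayedJoin
        }
      }

    // Completing the rebalance is where an empty group gets removed.
    private def onCompleteJoin(group: Group): Unit =
      if (group.members.isEmpty) {
        group.state = Dead
        groupsCache -= group.groupId
        tombstones += group.groupId
      } else {
        group.state = Stable
      }
  }
}
```

In this model a group with two members survives the first expiry and disappears on the second, which matches the behavior described above: removal is a side effect of a rebalance that finds no members, not a separate delete operation.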

    5 Summary

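    1. A consumer group lives only in an in-memory Pool[String, GroupMetadata] on the broker; it is not registered in ZooKeeper.
    2. A group is created only when the first consumer's join request arrives. Creating it means adding a GroupMetadata for the group to the Pool[String, GroupMetadata].
    3. A group cannot be deleted manually. It is deleted when its last consumer goes away: the heartbeat times out, which triggers a rebalance that removes the group.
    4. Deleting a group involves two actions: removing it from the Pool[String, GroupMetadata] and writing a tombstone to __consumer_offsets.
    5. __consumer_offsets is only compacted, never deleted.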
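Why a tombstone is enough to get rid of a group's record in a compacted topic can be shown with a minimal model of compaction. This is a sketch of the idea only, not Kafka's log cleaner: `Record` is a hypothetical type, a `None` value models the null payload of a tombstone, and retention timing is ignored.

```scala
object CompactionSketch {
  // A record in a compacted topic; value = None models a tombstone (null value).
  final case class Record(key: String, value: Option[String])

  // Replay the log in order: a later record replaces an earlier one with the
  // same key, and a tombstone removes the key altogether.
  def compact(log: Seq[Record]): Map[String, String] =
    log.foldLeft(Map.empty[String, String]) { (latest, record) =>
      record.value match {
        case Some(v) => latest + (record.key -> v)
        case None    => latest - record.key
      }
    }
}
```

With a log holding two updates for g1 and one update followed by a tombstone for g2, only the latest g1 value is left after compaction. Nothing is deleted at the moment the tombstone is written; the older records disappear when the compaction pass runs.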
  • Original article: https://www.cnblogs.com/set-cookie/p/9758472.html