  • [Kafka Source Code] The chain reaction after a broker is elected controller


    Today we look at what a broker does once it has been elected controller. Here is the entry-point method:

    def onControllerFailover() {
    	if (isRunning) {
    		info("Broker %d starting become controller state transition".format(config.brokerId))
    		//read controller epoch from zk
    		readControllerEpochFromZookeeper()
    		// increment the controller epoch
    		incrementControllerEpoch(zkUtils.zkClient)
    		// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
    		registerReassignedPartitionsListener()
    		registerIsrChangeNotificationListener()
    		registerPreferredReplicaElectionListener()
    		partitionStateMachine.registerListeners()
    		replicaStateMachine.registerListeners()
    		initializeControllerContext()
    		replicaStateMachine.startup()
    		partitionStateMachine.startup()
    		// register the partition change listeners for all existing topics on failover
    		controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    		info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    		brokerState.newState(RunningAsController)
    		maybeTriggerPartitionReassignment()
    		maybeTriggerPreferredReplicaElection()
    		/* send partition leadership info to all live brokers */
    		sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    		if (config.autoLeaderRebalanceEnable) {
    			info("starting the partition rebalance scheduler")
    			autoRebalanceScheduler.startup()
    			autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
    				5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    		}
    		deleteTopicManager.start()
    	}
    	else
    		info("Controller has been shut down, aborting startup/failover")
    }
    

    That single entry point touches many listeners and other components, so let's go through them one by one.

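    1. Controller epoch

    The controller first reads the previous epoch from the ZooKeeper node /controller_epoch, increments it by one, and persists the new value back to ZooKeeper.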
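
    The read-then-increment step can be sketched as follows. This is a minimal illustration rather than Kafka's code: `FakeZk` and `Znode` are hypothetical in-memory stand-ins for the /controller_epoch node, and the version-checked write is an assumption about how a stale controller could be kept from overwriting a newer epoch.

```scala
object ControllerEpochSketch {
  // Hypothetical stand-in for the /controller_epoch znode: a value plus a version counter.
  final case class Znode(value: Int, version: Int)

  // Hypothetical in-memory store; not a real ZooKeeper client.
  final class FakeZk(private var node: Znode) {
    def read(): Znode = node

    // The write succeeds only if the caller saw the latest version of the node.
    def conditionalUpdate(newValue: Int, expectedVersion: Int): Boolean =
      if (node.version == expectedVersion) {
        node = Znode(newValue, node.version + 1)
        true
      } else false
  }

  // Read the current epoch, add one, and persist it with a version check.
  def incrementEpoch(zk: FakeZk): Either[String, Int] = {
    val current = zk.read()
    val next = current.value + 1
    if (zk.conditionalUpdate(next, current.version)) Right(next)
    else Left("epoch node changed concurrently; another controller may have taken over")
  }
}
```

    A caller that held an outdated version would get `false` from `conditionalUpdate`, so the epoch can only move forward from the value that was actually read.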

    2. Registering listeners

    This step subscribes to ZooKeeper nodes; when a watched node changes, the controller reacts.

    2.1 registerReassignedPartitionsListener

    private def registerReassignedPartitionsListener() = {
    	zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
    }
    

    The subscribed path is /admin/reassign_partitions, which represents partition reassignment. When its data changes, the following listener runs:

    /**
    	* Starts the partition reassignment process unless -
    	* 1. Partition previously existed
    	* 2. New replicas are the same as existing replicas
    	* 3. Any replica in the new set of replicas are dead
    	* If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
    	* partitions.
    	*/
    class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
    	this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
    	val zkUtils = controller.controllerContext.zkUtils
    	val controllerContext = controller.controllerContext
    
    	/**
    	* Invoked when some partitions are reassigned by the admin command
    	* @throws Exception On any error.
    	*/
    	@throws(classOf[Exception])
    	def handleDataChange(dataPath: String, data: Object) {
    		debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
    			.format(dataPath, data))
    		// parse the data stored on the zk node
    		val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString)
    		// keep only the partitions that are not already being reassigned
    		val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
    			partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
    		}
    		partitionsToBeReassigned.foreach { partitionToBeReassigned =>
    			inLock(controllerContext.controllerLock) {
    				// if the topic is queued for deletion, skip it and remove the partition from the reassignment list
    				if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
    					error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
    						.format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
    					controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
    				} else { // otherwise start the reassignment
    					val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
    					controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
    				}
    			}
    		}
    	}
    
    	/**
    		* Invoked when the reassignment path is deleted. Nothing to do here, so the body is empty.
    		*
    		* @throws Exception
    		* On any error.
    		*/
    	@throws(classOf[Exception])
    	def handleDataDeleted(dataPath: String) {
    	}
    }
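
    The selection logic inside handleDataChange can be sketched with plain collections. This is an illustrative sketch only: the `plan` function and its parameters are hypothetical names, and `TopicAndPartition` is redefined locally so the snippet stands alone.

```scala
object ReassignmentFilterSketch {
  final case class TopicAndPartition(topic: String, partition: Int)

  // Returns (partitions whose reassignment should start,
  //          partitions skipped because their topic is queued for deletion).
  def plan(
      requested: Map[TopicAndPartition, Seq[Int]],
      alreadyInProgress: Set[TopicAndPartition],
      topicsQueuedForDeletion: Set[String]
  ): (Map[TopicAndPartition, Seq[Int]], Set[TopicAndPartition]) = {
    // Ignore partitions that are already being reassigned.
    val fresh = requested.filterNot { case (tp, _) => alreadyInProgress.contains(tp) }
    // Split what is left by whether the topic is waiting to be deleted.
    val (skipped, toStart) =
      fresh.partition { case (tp, _) => topicsQueuedForDeletion.contains(tp.topic) }
    (toStart, skipped.keySet)
  }
}
```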
    

    下面我们具体看下重新分配的过程,也就是initiateReassignReplicasForTopicPartition里面做了什么。

    2.1.1 initiateReassignReplicasForTopicPartition

    def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,reassignedPartitionContext: ReassignedPartitionsContext) {
    	val newReplicas = reassignedPartitionContext.newReplicas
    	val topic = topicAndPartition.topic
    	val partition = topicAndPartition.partition
    	// keep only the new replicas whose brokers are alive
    	val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
    	try {
    		val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
    		assignedReplicasOpt match {
    			case Some(assignedReplicas) =>
    				if (assignedReplicas == newReplicas) {
    					throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
    						" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
    				} else {
    					if (aliveNewReplicas == newReplicas) {
    						info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
    						// first register ISR change listener
    						watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
    						controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
    						// mark topic ineligible for deletion for the partitions being reassigned
    						deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
    						onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
    					} else {
    						// some replica in RAR is not alive. Fail partition reassignment
    						throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
    							" %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
    							"Failing partition reassignment")
    					}
    				}
    			case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
    				.format(topicAndPartition))
    		}
    	} catch {
    		case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
    			// remove the partition from the admin path to unblock the admin client
    			removePartitionFromReassignedPartitions(topicAndPartition)
    	}
    }
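
    The three rejection cases in the method above (partition unknown, assignment unchanged, some new replica not alive) can be isolated as a pure function. This is a sketch under that reading of the listing; `validate`, `Proceed`, and `Rejected` are hypothetical names, not part of Kafka.

```scala
object ReassignmentValidationSketch {
  sealed trait Outcome
  case object Proceed extends Outcome
  final case class Rejected(reason: String) extends Outcome

  def validate(current: Option[Seq[Int]], newReplicas: Seq[Int], liveBrokers: Set[Int]): Outcome =
    current match {
      // The partition is not in the controller's assignment cache.
      case None => Rejected("partition does not exist")
      // The requested replica list equals the current one.
      case Some(assigned) if assigned == newReplicas =>
        Rejected("already assigned to the requested replicas")
      case Some(_) =>
        // Every replica in the new list must sit on a live broker.
        val alive = newReplicas.filter(r => liveBrokers.contains(r))
        if (alive == newReplicas) Proceed
        else Rejected(s"only ${alive.mkString(",")} of ${newReplicas.mkString(",")} are alive")
    }
}
```

    In the listing each rejection is raised as a KafkaException and the catch block then removes the partition from the admin path; the sketch returns a value instead so the three cases are easy to see side by side.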
    

    Reading watchIsrChangesForReassignedPartition, we find that the data listener it registers also ends up calling onPartitionReassignment, so let's go straight to onPartitionReassignment, which is the heart of the reassignment.

    2.1.2 onPartitionReassignment

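    This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition reassignment, it first creates the /admin/reassign_partitions path, which triggers the ZooKeeper listener. A reassignment then goes through the following steps:

    RAR = reassigned replicas (the target replica list)

    OAR = original list of replicas for the partition

    AR = current assigned replicas

    • 1. Update AR in ZooKeeper with OAR + RAR.
    • 2. Send a LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). This is done by forcing an update of the leader epoch in ZooKeeper.
    • 3. Start the new replicas in RAR - OAR by moving them to the NewReplica state.
    • 4. Wait until all replicas in RAR are in sync with the leader.
    • 5. Move all replicas in RAR to the OnlineReplica state.
    • 6. Set AR to RAR in memory.
    • 7. If the leader is not in RAR, elect a new leader from RAR; the election sends a LeaderAndIsr request. If the leader is already in RAR, the leader epoch is incremented in ZooKeeper and a LeaderAndIsr request is sent anyway. In either case the request carries AR = RAR, which, per the source comment, prevents the leader from adding any replica in RAR - OAR back into the ISR.
    • 8. Move all replicas in OAR - RAR to the OfflineReplica state. As part of that state change, the ISR in ZooKeeper is shrunk to remove OAR - RAR and a LeaderAndIsr request is sent to the leader to notify it of the shrunk ISR. After that, a StopReplica request is sent to the replicas in OAR - RAR.
    • 9. Move all replicas in OAR - RAR to the NonExistentReplica state. This sends a StopReplica request with deletion enabled, which physically deletes those replicas.
    • 10. Update AR in ZooKeeper with RAR.
    • 11. Update the /admin/reassign_partitions node in ZooKeeper to remove this partition.
    • 12. After the election, the replica and ISR information has changed, so resend the UpdateMetadata request to every broker.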

    The whole process is fairly convoluted and takes some care to follow. The source comment gives a simple walkthrough for reference:

    * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
    * may go through the following transition.
    * AR                 leader/isr
    * {1,2,3}            1/{1,2,3}           (initial state)
    * {1,2,3,4,5,6}      1/{1,2,3}           (step 2)
    * {1,2,3,4,5,6}      1/{1,2,3,4,5,6}     (step 4)
    * {1,2,3,4,5,6}      4/{1,2,3,4,5,6}     (step 7)
    * {1,2,3,4,5,6}      4/{4,5,6}           (step 8)
    * {4,5,6}            4/{4,5,6}           (step 10)
    *
    * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
    * This way, if the controller crashes before that step, we can still recover.
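
    The replica sets used in the steps above are plain set arithmetic over broker ids. A minimal sketch, with hypothetical names (`Sets`, `compute`) and Seq used so that replica order is preserved:

```scala
object ReassignmentSetsSketch {
  final case class Sets(expandedAr: Seq[Int], toAdd: Seq[Int], toRemove: Seq[Int])

  def compute(oar: Seq[Int], rar: Seq[Int]): Sets = Sets(
    expandedAr = (oar ++ rar).distinct,            // OAR + RAR, written to AR in step 1
    toAdd = rar.filterNot(r => oar.contains(r)),   // RAR - OAR, started as new replicas
    toRemove = oar.filterNot(r => rar.contains(r)) // OAR - RAR, taken offline and deleted
  )
}
```

    With OAR = {1,2,3} and RAR = {4,5,6} this gives the expanded AR {1,2,3,4,5,6} from the table; when the two lists overlap, the shared brokers appear in neither `toAdd` nor `toRemove`.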
    

    2.2 registerIsrChangeNotificationListener

    Registers a listener on the path /isr_change_notification.

    /**
    * Called when leader intimates of isr change
    *
    * @param controller
    */
    class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
    
    override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
    	import scala.collection.JavaConverters._
    
    	inLock(controller.controllerContext.controllerLock) {
    		debug("[IsrChangeNotificationListener] Fired!!!")
    		val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
    		try {
    			val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
    			if (topicAndPartitions.nonEmpty) {
    				controller.updateLeaderAndIsrCache(topicAndPartitions)
    				processUpdateNotifications(topicAndPartitions)
    			}
    		} finally {
    			// delete processed children
    			childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
    				ZkUtils.IsrChangeNotificationPath + "/" + x))
    		}
    	}
    }
    // processUpdateNotifications and getTopicAndPartition are omitted from this excerpt
    }
    

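    This listener mainly refreshes the controller's leader and ISR cache for the affected partitions (the cached entries also carry the controller epoch) and then sends UpdateMetadata requests to the brokers.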

    2.3 registerPreferredReplicaElectionListener

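    Listens for data changes on the /admin/preferred_replica_election path. The preferred replica is the first replica in a partition's assigned replica list. When an election is requested through this path, the controller tries to move leadership back to the preferred replica, which succeeds only if that replica is alive and in the ISR.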
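
    The notion of a preferred replica can be made concrete with a short sketch. The function names are hypothetical, and the eligibility rule (the preferred replica must be alive and in the ISR) reflects a general understanding of Kafka's preferred replica election rather than code quoted in this post.

```scala
object PreferredReplicaSketch {
  // The preferred replica is simply the head of the assigned replica list.
  def preferredReplica(assignedReplicas: Seq[Int]): Option[Int] =
    assignedReplicas.headOption

  // Returns the broker that should become leader, or None if nothing should change.
  def electPreferred(
      assignedReplicas: Seq[Int],
      currentLeader: Int,
      isr: Set[Int],
      liveBrokers: Set[Int]
  ): Option[Int] =
    preferredReplica(assignedReplicas).filter { p =>
      p != currentLeader && liveBrokers.contains(p) && isr.contains(p)
    }
}
```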

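    3. Partition and replica state machines

    3.1 Registering the partition state machine listeners

    First comes the partition state machine. A partition can be in one of these states:

    • NonExistentPartition: the partition was never created, or was created and then deleted. The only valid previous state is OfflinePartition.
    • NewPartition: a new partition that has no leader yet. The valid previous state is NonExistentPartition.
    • OnlinePartition: the partition is online and a leader has been elected. Valid previous states are NewPartition and OfflinePartition.
    • OfflinePartition: the partition is offline because its leader died after a successful election. Valid previous states are NewPartition and OnlinePartition.

    The listeners are registered as follows:

    // register topic and partition change listeners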
    def registerListeners() {
      registerTopicChangeListener()
      if(controller.config.deleteTopicEnable)
        registerDeleteTopicListener()
    }
    

    This watches the /brokers/topics path for child changes and, if topic deletion is enabled, also watches the /admin/delete_topics path.
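
    The four partition states listed earlier in this section, together with their valid previous states, can be written down as a small lookup. The names below are redefined locally for illustration; this is a sketch of the rules as described in this post, not the PartitionStateMachine implementation.

```scala
object PartitionStateSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // For each target state, the states a partition may come from.
  def validPrevious(target: PartitionState): Set[PartitionState] = target match {
    case NonExistentPartition => Set(OfflinePartition)
    case NewPartition         => Set(NonExistentPartition)
    case OnlinePartition      => Set(NewPartition, OfflinePartition)
    case OfflinePartition     => Set(NewPartition, OnlinePartition)
  }

  def canMove(from: PartitionState, to: PartitionState): Boolean =
    validPrevious(to).contains(from)
}
```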

    Next, let's see what each of the two listeners does.

    3.1.1 registerTopicChangeListener

    This listener handles topic changes under /brokers/topics, that is, the follow-up work when topics are added or removed.

    /**
     * This is the zookeeper listener that triggers all the state transitions for a partition
     */
    class TopicChangeListener extends IZkChildListener with Logging {
      this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "
    
    @throws(classOf[Exception])
    def handleChildChange(parentPath : String, children : java.util.List[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          try {
            val currentChildren = {
              import JavaConversions._
              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
              (children: Buffer[String]).toSet
            }
            val newTopics = currentChildren -- controllerContext.allTopics
            val deletedTopics = controllerContext.allTopics -- currentChildren
            controllerContext.allTopics = currentChildren
    
            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
            info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
              deletedTopics, addedPartitionReplicaAssignment))
            if(newTopics.size > 0)
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
          } catch {
            case e: Throwable => error("Error while handling new topic", e )
          }
        }
      }
    }
    }
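
    The core of handleChildChange is a set difference between the topics the controller already knows and the children currently in ZooKeeper, followed by a refresh of the cached assignment. A standalone sketch with hypothetical names, using (topic, partition) tuples in place of TopicAndPartition:

```scala
object TopicDiffSketch {
  final case class Diff(added: Set[String], deleted: Set[String])

  // Topics present now but unknown before are new; known topics that vanished are deleted.
  def diff(known: Set[String], current: Set[String]): Diff =
    Diff(added = current -- known, deleted = known -- current)

  // Drop assignments of deleted topics, then merge in the assignments of new topics.
  def refreshAssignment(
      assignment: Map[(String, Int), Seq[Int]],
      deleted: Set[String],
      addedAssignment: Map[(String, Int), Seq[Int]]
  ): Map[(String, Int), Seq[Int]] =
    assignment.filter { case ((topic, _), _) => !deleted.contains(topic) } ++ addedAssignment
}
```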
    

    3.1.2 registerDeleteTopicListener

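    This listens on the ZooKeeper node and puts the topics to be deleted into a deletion queue. Kafka then performs the deletion, which mainly removes the related nodes in ZooKeeper and the log files.

    3.2 Registering the replica state machine listeners

    The replica state machine has the following states:

    • NewReplica: the controller creates new replicas during partition reassignment. In this state a replica can only receive a become-follower request. The valid previous state is NonExistentReplica.
    • OnlineReplica: the state of a replica once it has started. In this state it can receive either a become-leader or a become-follower request. Valid previous states are NewReplica, OnlineReplica and OfflineReplica.
    • OfflineReplica: the state of a replica that has died, for example because its broker went down. Valid previous states are NewReplica and OnlineReplica.
    • ReplicaDeletionStarted: the state when replica deletion begins. The valid previous state is OfflineReplica.
    • ReplicaDeletionSuccessful: the replica answered the delete request without an error code. The valid previous state is ReplicaDeletionStarted.
    • ReplicaDeletionIneligible: replica deletion failed. The valid previous state is ReplicaDeletionStarted.
    • NonExistentReplica: the state after a replica has been deleted successfully. The valid previous state is ReplicaDeletionSuccessful.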
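
    The replica states above chain into paths, for example the deletion path from OnlineReplica down to NonExistentReplica. The sketch below encodes the valid previous states from the list and checks a whole path; all names are local to the example and the rules are only those stated in this post.

```scala
object ReplicaStateSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // For each target state, the states a replica may come from.
  def validPrevious(target: ReplicaState): Set[ReplicaState] = target match {
    case NewReplica                => Set(NonExistentReplica)
    case OnlineReplica             => Set(NewReplica, OnlineReplica, OfflineReplica)
    case OfflineReplica            => Set(NewReplica, OnlineReplica)
    case ReplicaDeletionStarted    => Set(OfflineReplica)
    case ReplicaDeletionSuccessful => Set(ReplicaDeletionStarted)
    case ReplicaDeletionIneligible => Set(ReplicaDeletionStarted)
    case NonExistentReplica        => Set(ReplicaDeletionSuccessful)
  }

  def canMove(from: ReplicaState, to: ReplicaState): Boolean =
    validPrevious(to).contains(from)

  // A path is valid when every consecutive pair of states is a valid transition.
  def isValidPath(states: Seq[ReplicaState]): Boolean =
    states.sliding(2).forall {
      case Seq(from, to) => canMove(from, to)
      case _             => true // empty or single-state paths have no transitions to check
    }
}
```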

    3.2.1 registerBrokerChangeListener

    Watches for child changes under /brokers/ids, that is, brokers being added or removed, and reacts accordingly.

    /**
     * This is the zookeeper listener that triggers all the state transitions for a replica
     */
    class BrokerChangeListener() extends IZkChildListener with Logging {
      this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
      def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
        info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
        inLock(controllerContext.controllerLock) {
          if (hasStarted.get) {
            ControllerStats.leaderElectionTimer.time {
              try {
                val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
                val curBrokerIds = curBrokers.map(_.id)
                val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
                val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
                val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
                val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
                controllerContext.liveBrokers = curBrokers
                val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
                val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
                val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
                info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
                newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
                deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
                if(newBrokerIds.size > 0)
                  controller.onBrokerStartup(newBrokerIdsSorted)
                if(deadBrokerIds.size > 0)
                  controller.onBrokerFailure(deadBrokerIdsSorted)
              } catch {
                case e: Throwable => error("Error while handling broker changes", e)
              }
            }
          }
        }
      }
    }
    

    3.3 Initializing the controller context

    This step loads the initial data, including topics and partitions, and then starts several managers.

    private def initializeControllerContext() {
    	// update controller cache with delete topic information
    	// live brokers
    	controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
    	// all topics
    	controllerContext.allTopics = zkUtils.getAllTopics().toSet
    	// replica assignment for every partition of every topic
    	controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
    	// leadership info (leader, ISR, controller epoch) per partition
    	controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
    	// brokers that are shutting down; empty at first
    	controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
    	// update the leader and isr cache for all existing partitions from Zookeeper
    	updateLeaderAndIsrCache()
    	// start the channel manager
    	startChannelManager()
    	initializePreferredReplicaElection()
    	initializePartitionReassignment()
    	initializeTopicDeletion()
    	info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
    	info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
    	info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
    }
    

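    The first few lines are commented and clear enough, so let's start from startChannelManager. What is the ChannelManager? It is the channel the controller uses to talk to each broker, and the manager is what manages the requests sent over it.

    It mainly handles these request types: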

    • LEADER_AND_ISR
    • STOP_REPLICA
    • UPDATE_METADATA_KEY

    Once the channel manager is up, three initialization steps follow:

    • initializePreferredReplicaElection
    • initializePartitionReassignment
    • initializeTopicDeletion
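
    To make the idea of a controller-to-broker channel concrete, here is a deliberately simplified sketch: one in-memory queue per broker, with requests enqueued by broker id. The class and request types are hypothetical, and a per-broker queue is an assumption about the design; network I/O and sender threads are left out entirely.

```scala
import scala.collection.mutable

object ChannelManagerSketch {
  // Hypothetical request types mirroring the three kinds listed above.
  sealed trait ControllerRequest
  final case class LeaderAndIsr(partition: String) extends ControllerRequest
  final case class StopReplica(partition: String, delete: Boolean) extends ControllerRequest
  case object UpdateMetadata extends ControllerRequest

  final class Channels {
    private val queues = mutable.Map.empty[Int, mutable.Queue[ControllerRequest]]

    // Open a queue for a broker that joined the cluster.
    def addBroker(id: Int): Unit = {
      queues.getOrElseUpdate(id, mutable.Queue.empty[ControllerRequest])
      ()
    }

    // Drop the queue (and any pending requests) of a broker that left.
    def removeBroker(id: Int): Unit = {
      queues.remove(id)
      ()
    }

    // Enqueue a request; returns false when the broker has no channel.
    def send(id: Int, request: ControllerRequest): Boolean = queues.get(id) match {
      case Some(queue) =>
        queue += request
        true
      case None => false
    }

    def pending(id: Int): List[ControllerRequest] =
      queues.get(id).map(_.toList).getOrElse(Nil)
  }
}
```

    The addBroker and removeBroker names echo the calls made in BrokerChangeListener, which is why the channel set has to follow broker membership.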

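    3.4 Starting the replica state machine

    This is replicaStateMachine.startup(). It initializes replica state by reading partition information from ZooKeeper and then moves all live replicas to the OnlineReplica state.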

    /**
     * Invoked on successful controller election. First registers a broker change listener since that triggers all
     * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
     * Then triggers the OnlineReplica state change for all replicas.
     */
    def startup() {
      // initialize replica state
      initializeReplicaState()
      // set started flag
      hasStarted.set(true)
      // move all Online replicas to Online
      handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
      info("Started replica state machine with initial state -> " + replicaState.toString())
    }
    

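    3.5 Starting the partition state machine

    Like the replica state machine, this initializes the state of every partition and then tries to move new or offline partitions to the OnlinePartition state.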

    /**
     * Invoked on successful controller election. First registers a topic change listener since that triggers all
     * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
     * the OnlinePartition state change for all new or offline partitions.
     */
    def startup() {
      // initialize partition state
      initializePartitionState()
      // set started flag
      hasStarted.set(true)
      // try to move partitions to online state
      triggerOnlinePartitionStateChange()
    
      info("Started partition state machine with initial state -> " + partitionState.toString())
    }
    

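    3.6 Automatic leader rebalance timer

    If auto.leader.rebalance.enable is turned on, the controller starts the partition rebalance scheduler. The leader.imbalance.check.interval.seconds setting controls how often the check runs, in seconds; as the call below shows, the first run is scheduled 5 seconds after startup.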

    if (config.autoLeaderRebalanceEnable) {
    	info("starting the partition rebalance scheduler")
    	autoRebalanceScheduler.startup()
    	autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
    		5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    }
    

    The method worth a closer look is checkAndTriggerPartitionRebalance.

    private def checkAndTriggerPartitionRebalance(): Unit = {
    	if (isActive()) {
    		trace("checking need to trigger partition rebalance")
    		// get all the active brokers
    		var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
    		inLock(controllerContext.controllerLock) {
    			preferredReplicasForTopicsByBrokers =
    				controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
    					case (topicAndPartition, assignedReplicas) => assignedReplicas.head
    				}
    		}
    		debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
    		// for each broker, check if a preferred replica election needs to be triggered
    		preferredReplicasForTopicsByBrokers.foreach {
    			case (leaderBroker, topicAndPartitionsForBroker) => {
    				var imbalanceRatio: Double = 0
    				var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
    				inLock(controllerContext.controllerLock) {
    					topicsNotInPreferredReplica =
    						topicAndPartitionsForBroker.filter {
    							case (topicPartition, replicas) => {
    								controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
    									controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
    							}
    						}
    					debug("topics not in preferred replica " + topicsNotInPreferredReplica)
    					val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
    					val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
    					imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
    					trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
    				}
    				// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
    				// that need to be on this broker
    				if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
    					topicsNotInPreferredReplica.foreach {
    						case (topicPartition, replicas) => {
    							inLock(controllerContext.controllerLock) {
    								// do this check only if the broker is live and there are no partitions being reassigned currently
    								// and preferred replica election is not in progress
    								if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
    									controllerContext.partitionsBeingReassigned.size == 0 &&
    									controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
    									!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
    									controllerContext.allTopics.contains(topicPartition.topic)) {
    									onPreferredReplicaElection(Set(topicPartition), true)
    								}
    							}
    						}
    					}
    				}
    			}
    		}
    	}
    }
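
    The imbalance check boils down to a ratio per broker: of the partitions for which the broker is the preferred replica, how many are currently led by someone else. A worked sketch with hypothetical names, using partition names as plain strings and 10 as an example threshold percentage:

```scala
object ImbalanceSketch {
  // assignment: partition -> replica list (head is the preferred replica)
  // leaders:    partition -> current leader
  def imbalanceRatios(
      assignment: Map[String, Seq[Int]],
      leaders: Map[String, Int]
  ): Map[Int, Double] =
    assignment
      .groupBy { case (_, replicas) => replicas.head }
      .map { case (broker, partitions) =>
        // Count partitions with a known leader that is not the preferred broker.
        val notLed = partitions.count { case (p, _) => leaders.get(p).exists(_ != broker) }
        (broker, notLed.toDouble / partitions.size)
      }

  // Mirrors the comparison in the listing: ratio > percentage / 100.
  def needsRebalance(ratio: Double, thresholdPercent: Int): Boolean =
    ratio > thresholdPercent.toDouble / 100
}
```

    For example, if broker 1 is preferred for three partitions but leads only one of them, its ratio is 2/3, well above a 10 percent threshold, so a preferred replica election would be triggered for the two partitions it does not lead.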
    

    3.7 Starting topic deletion

    deleteTopicManager.start() starts the topic deletion thread, but only when topic deletion is enabled (delete.topic.enable=true).

  • Original article: https://www.cnblogs.com/f-zhao/p/7826965.html