在我的博文《HBase——HMaster启动之一》、《HBase——HMaster启动之二》中已经详细介绍过HMaster在启动过程中调用的各种方法。下面,单就HRegionServer在启动过程中与HMaster的交互做一下深入分析。
首先,让我们来到HRegionServer.run,由于其也是间接实现了Runnable接口。因此,在这里,就从他的run方法开始分析。对于前面的preRegistrationInitialization方法我在这里并不打算讲,因为在我的博文《HBase——HMaster启动之一》中已经详细描述过。这里,就后面的两个方法reportForDuty、handleReportForDutyResponse进行详细描述。
让我们来到reportForDuty方法中。在下图中我已经将其中的重点框选出来。其中的第一个方法是HRegionServer作为与HMaster交互的重点。我在本节将详细讲述。至于第二个方法,相信我在仔细描述第一个方法之后,大家就会感觉清晰明了。首先来到createRegionServerStatusStub方法。由于是长图,所以没有办法编辑,还请大家谅解。
首先,来到masterAddressTracker.getMasterAddress方法。这个MasterAddressTracker在HRegionServer构造时创建并启动,他所监听的ZK上的路径是/hasse/master。refresh为true,这里强制去ZK上查询HMaster的信息。然后调用了rpcClient.createBlockingRpcChannel。这里的rpcClient是在preRegistrationInitialization方法中构造的。
由于在rpcClient.createBlockingRpcChannel方法中仅仅返回了新构造的BlockingRpcChannelImplementation,因此,让我们来到BlockingRpcChannelImplementation的构造方法。这里,我特别标明了BlockingRpcChannel,这个类的完整名称是org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel。这是由于hbase中将protobuf加入了其第三方包中。其真正类名应该是com.google.protobuf.BlockingRpcChannel。不论如何,他都是PB中的一个接口。这里的BlockingRpcChannelImplementation重写了接口中的callBlockingMethod方法,也就是说,在实际调用的时候,真正调用了rpcClient.callBlockingMethod方法。
接着,我们回到HRegionServer.createRegionServerStatusStub方法中。由上面的那张长图可以看到,由rpcClient.createBlockingRpcChannel返回的对象传入了RegionServerStatusService.newBlockingStub、LockService.newBlockingStub中,构造了这两个service的本地stub。这是PB的基础用法,与HDFS中对于PB的使用方式不用。这里封装的并不是很好。接着,将这两个service赋给HRegionServer的成员变量。




然后,我们就可以回到reportForDuty方法的第二个重点方法this.rssStub.regionServerStartup。这里的rssStub就是刚刚构建的service。其在具体方法调用的时候,正是通过BlockingRpcChannelImplementation中复写的方法来实现的。
也就是说在实际调用rssStub.regionServerStartup,调用到了AbstractRpcClient.callBlockingMethod。关于调用的详细流程,我在博文《hbase之RPC调用流程简介》中有简单介绍,如果大家感觉不是很详细,可以私信我的163邮箱15935152719@163.com。我然后再写一篇博文来详述。这里我们假定这个请求已经发送到了服务端。且服务端的MasterRpcServices.regionServerStartup已经开始调用。
接下来,我们到达HMaster端的MasterRpcServices.regionServerStartup。如下图所示,这里主要调用了ServerManager.regionServerStartup。
让我们继续深入ServerManager.regionServerStartup,由于其只是调用了ServerManager.checkAndRecordNewServer方法,并没有其它重点内容,我在这里就不贴图了。而是来到ServerManager.checkAndRecordNewServer。如下图所示。findServerWithSameHostnamePortWithLock主要是查看当前已注册的RegionServer中是否有同名的ServerName,如果有,则返回ServerName,如果没有,则返回null。然后调用recordNewServerWithLock方法,将其ServerName放入成员变量onlineServers中,也就是说,该RegionServer已经实现了HMaster端的注册。接着调用已注册listeners的serverAdded方法,这些listeners都实现了ServerListener。主要有三个实现ServerListener接口的类:AssignmentManager、RSProcedureDispatcher、DrainingServerTracker。其中DrainingServerTracker是在其start方法中构造的匿名类。
至此,HMaster端实现与HRegionServer的第一次通信。接着,HRegionServer开始调用HRegionServe.handleReportForDutyResponse方法。如下图所示。一开始的for循环是为了将HMaster端返回的部分信息添加到当前的conf成员变量中。紧接着createMyEphemeralNode方法将当前RegionServer的节点信息写入ZK中,路径为/hbase/rs/~。然后调用ZNodeClearer.writeMyEphemeralNodeOnDisk将信息本地化。再然后调用了setupWALAndReplication、startReplicationService两个方法。由于setupWALAndReplication方法比较重要,我将在后面贴图详细介绍。
首先,我们来到setupWALAndReplication方法。在setupWALAndReplication方法中,主要做了两件事情,其一是构造WALFactory,另外一个就是调用createNewReplicationInstance。



在构造WALFactory过程中,主要通过反射了AsyncFSWALProvider并将其封装到SyncReplicationWALProvider中,然后调用其init方法。
在createNewReplicationInstance方法中主要通过反射方法创建了Replication,并调用了其initialize方法。并将新构建的Replication赋给了HRegionServer.replicationSourceHandler与HRegionServer.replicationSinkHandler。由于这里的Replication比较重要,我将在下面详细介绍。
接下来,让我们来到Replication.initialize方法,如下图所示。方法很长,但重点只是构造了ReplicationSourceManager。而在ReplicationSourceManager的构造方法也只是将入参封装,并构建了一个线程池。
接下来,来到handleReportForDutyResponse中第二个重要也是最重要的方法startServices。这个方法很长,几乎所有的RegionServer的服务都是直接或间接通过这个方法启动的。
首先,我们来看initializeThreads。



这里首先构造了MemStoreFlusher。在构造MemStoreFlusher时,值得我们注意的是,实例化了FlushHandler。接下来构实例化了CompactSplit。在构造CompactSplit的过程中,有值得我们关注的地方。下面我们跳过这张图,来到CompactSplit的构造方法。
来到CompactSplit的构造方法以及他在构造方法中主要调用的两个方法。在他的createCompactionExecutors方法中构造了一个StealJobQueue对象。这个类继承自PriorityBlockingQueue,并且内部有一个BlockingQueue类型的stealFromQueue成员变量。然后构建了两个线程池,分别以StealJobQueue、StealJobQueue.stealFromQueue为线程池的工作队列。

接下来构建了一个PressureAwareCompactionThroughputController,并调用了其setup方法。由于其实现了Configurable接口,所以在通过ReflectionUtils.newInstance实例化的时候,调用了其setConf方法。在调用其setup时,实例化并调用了一个ScheduledChore对象,该对象在其复写的chore方法中调用了PressureAwareCompactionThroughputController.tune。
介绍完CompactSplit的实例化后,让我们继续回到HRegionServer.initializeThreads方法中。在方法内,接着构造了CompactionChecker、PeriodicMemStoreFlusher这两个ScheduledChore。接下来构造了Leases(看到这里,你是不是很容易想到HDFS的租约系统LeaseManager)。他扩展了Thread而不是Chore,因为当有事情要做时,睡眠时间可以被中断,而不是Chore睡眠时间是不变的。

紧接着,分别调用MovedRegionsCleaner.create、nonceManager.createCleanupScheduledChore创建了两个ScheduledChore,在二者的chore方法中调用的方法分别是:regionServer.cleanMovedRegions、ServerNonceManager.cleanUpOldNonces。
然后构造了RegionServerRpcQuotaManager与RegionServerSpaceQuotaManager,最后调用了registerConfigurationObservers,将相关的ConfigurationObserver注册到ConfigurationManager中。
介绍完initializeThreads方法后,让我们回到startServices方法。
在startServices方法中接着构建了SecureBulkLoadManager并调用了其start方法。在其start方法中在hadoop上创建了/hbase.rootdir/staging目录。
接着构建了LogRoller,他的作用是定期运行以确定是否应该滚动WAL。同样,他继承了Thread而不是Chore,因为当有事情要做时,睡眠时间可以被中断,而不是Chore睡眠时间是不变的。
然后方法FlushThroughputControllerFactory.create的调用构造了NoLimitThroughputController。
接着构建了RemoteProcedureResultReporter与CompactedHFilesDischarger(继承自ScheduledChore)。
然后呢,就像HMaster在startServiceThreads方法中一样,创建了一大堆有不同用途的线程池,由ExecutorService管理(注:这里的ExecutorService是org.apache.hadoop.hbase.executor.ExecutorService)。
紧接着,调用了walRoller、cacheFlusher、procedureResultReporter线程的start方法。这里简单介绍一下cacheFlusher.start。其中cacheFlusher是我们在initializeThreads方法中构造的,他的类型是MemStoreFlusher,其成员变量flushHandlers在构造期间已经完成初始化。在这里完成了赋值与线程的启动。
再然后,启动了this.leases线程。构造并启动了SplitLogWorker。
最后,分别调用了startHeapMemoryManager、initializeMemStoreChunkCreator。关于initializeMemStoreChunkCreator方法的调用,我在HMaster启动那一节已经介绍过,这里就略过了。需要注意的是在构建ChunkCreator时创建MemStoreChunkPool都注册到了startHeapMemoryManager方法创建的hMemManager,也就是交由hMemManager管理。所以,这里只是介绍startHeapMemoryManager。如下图所示,调用HeapMemoryManager.create创建了HeapMemoryManager。然后调用HeapMemoryManager.start方法创建了HeapMemoryTunerChore并将其加入到ChoreService中调用。值得注意的是,在构建HeapMemoryTunerChore时,其内部的成员变量heapMemTuner类型为DefaultHeapMemoryTuner,并且,这个类实现了Configurable接口。当然他也是通过ReflectionUtils.newInstance方法创建的。
到此为止,我们已经介绍完了上面所有贴图。下面,让我们回到HRegionServer.run方法的后半部分。在这里,已经接近结束了。我只是简单介绍一下。

大家可能对这里的rspmHost很迷惑,这个是什么时候出现的。确实,我之前并没有介绍过,在这里补充一下。在HRegionServer.initializeZooKeeper方法的最后,构建了RegionServerProcedureManagerHost,并将其赋给了成员变量rspmHost。接着,便调用了rspmHost.loadProcedures、rspmHost.initialize。实现了他的初始调用。
来到最后的tryRegionServerReport,HRegionServer正是通过这个方法与HMaster保持心跳。
本节到这里就结束了,大家有什么不懂的地方可以私信我的邮箱15935152719@163.com。如果感觉不错,希望留下你的赞。你的肯定是小编继续前进的动力。
