- The number of NMs shown on the YARN UI never reaches the 10k scale;
- Apps submitted by clients stay stuck in the ACCEPTED state and never get any resources allocated.
- patch:https://issues.apache.org/jira/browse/YARN-9173
- patch:https://issues.apache.org/jira/browse/YARN-8833
- patch:https://issues.apache.org/jira/browse/YARN-7560
```
2020-07-27 19:35:23,793 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Size of scheduler event-queue is 3000
2020-07-27 19:35:23,824 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Size of scheduler event-queue is 4000
2020-07-27 19:35:23,853 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Size of scheduler event-queue is 5000
2020-07-27 19:35:23,881 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Size of scheduler event-queue is 6000
2020-07-27 19:35:23,910 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Size of scheduler event-queue is 7000
```
Where do these log lines come from? A look at the source shows they are printed in the handle() method of the SchedulerEventDispatcher inner class of ResourceManager. What does this method do? During RM initialization, a dispatcher is registered for each event type (RMNode events, RMAppEvent events, RMAppAttemptEvent events, and so on); when a matching event request arrives (for example the NODE_ADDED or NODE_UPDATE events for node reports), it is forwarded to this handle() method.
```java
@Override
public void handle(SchedulerEvent event) {
  try {
    int qSize = eventQueue.size();
    ClusterMetrics.getMetrics().setRMSchedulerEventQueueSize(qSize);
    if (qSize != 0 && qSize % 1000 == 0
        && lastEventQueueSizeLogged != qSize) {
      lastEventQueueSizeLogged = qSize;
      LOG.info("Size of scheduler event-queue is " + qSize);
    }
    int remCapacity = eventQueue.remainingCapacity();
    if (remCapacity < 1000) {
      LOG.info("Very low remaining capacity on scheduler event queue: "
          + remCapacity);
    }
    this.eventQueue.put(event);
  } catch (InterruptedException e) {
    LOG.info("Interrupted. Trying to exit gracefully.");
  }
}
```
As the code shows, every incoming event request is appended to the eventQueue blocking queue. So where are events taken back out of the queue, and how are they processed? To answer these two questions, we added some logging to the consuming side, shown below. During initialization the RM creates the SchedulerEventDispatcher object together with a dedicated EventProcessor thread; that thread runs a while loop that keeps taking events off the blocking queue eventQueue and hands each one to the scheduler via scheduler.handle().
```java
// Initialize the eventProcessor thread
public SchedulerEventDispatcher(ResourceScheduler scheduler) {
  super(SchedulerEventDispatcher.class.getName());
  this.scheduler = scheduler;
  this.eventProcessor = new Thread(new EventProcessor());
  this.eventProcessor.setName("ResourceManager Event Processor");
}

// Start the eventProcessor thread
protected void serviceStart() throws Exception {
  this.eventProcessor.start();
  super.serviceStart();
}

// The eventProcessor thread's actual logic
private final class EventProcessor implements Runnable {
  @Override
  public void run() {
    SchedulerEvent event;
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      try {
        event = eventQueue.take();
      } catch (InterruptedException e) {
        LOG.error("Returning, interrupted : " + e);
        return; // TODO: Kill RM.
      }
      try {
        // Log the event just taken from eventQueue
        LOG.info("[-----]ResourceManager scheduler event start. scheduler event class="
            + event.getClass() + ". scheduler event type=" + event.getType()
            + ". " + "current eventQueue.size=" + eventQueue.size());
        scheduler.handle(event);
        // Log whether handle() finished normally
        LOG.info("[-----]ResourceManager scheduler event end");
      } catch (Throwable t) {
        LOG.info("[-----]take event throwable");
        // An error occurred, but we are shutting down anyway.
        // If it was an InterruptedException, the very act of
        // shutdown could have caused it and is probably harmless.
        if (stopped) {
          LOG.warn("Exception during shutdown: ", t);
          break;
        }
        LOG.fatal("Error in handling event type " + event.getType()
            + " to the scheduler", t);
        if (shouldExitOnError
            && !ShutdownHookManager.get().isShutdownInProgress()) {
          LOG.info("Exiting, bbye..");
          System.exit(-1);
        }
      }
    }
  }
}
```
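This is the classic single-consumer producer/consumer pattern: one thread drains a BlockingQueue and handles events strictly one at a time. The following standalone sketch (hypothetical names, not Hadoop code) models the pattern and makes the failure mode easy to see: if process() ever blocks forever on one event, take() is never reached again, and everything arriving afterwards just accumulates in the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal standalone model of the dispatcher pattern above (not Hadoop code).
public class DispatcherModel {
  private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();

  // Producer side: mirrors SchedulerEventDispatcher.handle()
  void handle(String event) throws InterruptedException {
    eventQueue.put(event);
  }

  // Consumer side: mirrors the EventProcessor thread's run() loop
  void startProcessor() {
    Thread t = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          String event = eventQueue.take();
          process(event); // if this call never returns, no further event is ever consumed
        } catch (InterruptedException e) {
          return;
        }
      }
    }, "Event Processor");
    t.setDaemon(true);
    t.start();
  }

  private void process(String event) {
    System.out.println("handled " + event);
  }

  public static void main(String[] args) throws Exception {
    DispatcherModel d = new DispatcherModel();
    d.startProcessor();
    d.handle("NODE_UPDATE");
    Thread.sleep(100); // give the processor a moment before the demo exits
  }
}
```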
With the event-processing flow roughly mapped out, and with our extra logging in place, we found the cause of the RM hang (node registrations failing, apps stuck in the submitted state): the last event taken from the blocking queue never finished being handled. Because a single thread both takes and handles events, every event arriving afterwards simply piled up in eventQueue, and the RM stopped serving requests. The logs:
```
# Normal flow of taking events from eventQueue (the last successful take):
2020-07-28 17:54:00,113 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]ResourceManager scheduler event start. scheduler event class=class org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent. scheduler event type=NODE_UPDATE. current eventQueue.size=0   (scheduler event starts)
2020-07-28 17:54:00,113 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler: [-----]nodeupdate enter lihua-slave-182-86-226-190.lemon.lan
2020-07-28 17:54:00,113 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler: [-----]nodeupdate end lihua-slave-182-86-226-190.lemon.lan
2020-07-28 17:54:00,113 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]ResourceManager scheduler event end   (scheduler event ends)

# Abnormal logs: the queue starts backing up and events are no longer processed
2020-07-28 17:54:00,114 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]eventQueue.size=0
2020-07-28 17:54:00,114 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]eventQueue put eventType=NODE_UPDATE
2020-07-28 17:54:00,114 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]ResourceManager scheduler event start. scheduler event class=class org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent. scheduler event type=NODE_UPDATE. current eventQueue.size=0   (the event taken here never finishes; no matching "event end" follows)
2020-07-28 17:54:00,114 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]eventQueue.size=0
2020-07-28 17:54:00,114 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]eventQueue put eventType=NODE_UPDATE
2020-07-28 17:54:00,115 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]eventQueue.size=1
......
2020-07-28 17:54:03,008 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]eventQueue.size=6003
2020-07-28 17:54:03,009 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: [-----]eventQueue put eventType=NODE_UPDATE
```
Why did the RM never finish handling the event it had taken? To find out, we dumped the RM process's thread stacks (e.g., with jstack).
"ResourceManager Event Processor" #33 prio=5 os_prio=0 tid=0x00007eff2a034000 nid=0x165cc waiting for monitor entry [0x00007efee61fe000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.nodeUpdate(FairScheduler.java:859) - waiting to lock <0x00000004c008ea20> (a org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.handle(FairScheduler.java:1093) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.handle(FairScheduler.java:122) at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher$EventProcessor.run(ResourceManager.java:713) at java.lang.Thread.run(Thread.java:748) "FairSchedulerUpdateThread" #36 daemon prio=5 os_prio=0 tid=0x00007eff2a030800 nid=0x165ca runnable [0x00007efee6400000] java.lang.Thread.State: RUNNABLE at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl.newRecordInstance(RecordFactoryPBImpl.java:70) at org.apache.hadoop.yarn.util.Records.newRecord(Records.java:36) at org.apache.hadoop.yarn.util.resource.Resources.createResource(Resources.java:102) at org.apache.hadoop.yarn.util.resource.Resources.componentwiseMax(Resources.java:306) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue.getMaxShare(FSQueue.java:128) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares.computeShare(ComputeFairShares.java:201) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares.resourceUsedWithWeightToResourceRatio(ComputeFairShares.java:187) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares.computeSharesInternal(ComputeFairShares.java:148) (计算Fair Share时进入死循环) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares.computeShares(ComputeFairShares.java:51) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy.computeShares(FairSharePolicy.java:161) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue.updateInternal(FSParentQueue.java:90) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue.update(FSQueue.java:285) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.update(FairScheduler.java:356) - locked <0x00000004c008ea20> (a org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler) at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler$UpdateThread.run(FairScheduler.java:292)
The stack traces differ slightly from dump to dump, but the pattern is always the same: the event-processor thread is BLOCKED waiting for a lock on the FairScheduler object, that lock is held by FairSchedulerUpdateThread, and the holder is always stuck in the same call chain, inside ComputeFairShares#computeSharesInternal(). What does that code do? It keeps doubling a ratio rMax until the total resources the Schedulable objects would take at that ratio just exceed the cluster's currently allocatable resources, totalResource. And this is exactly where the bug lives: the return value of resourceUsedWithWeightToResourceRatio() overflowed the maximum of a 32-bit int and came back negative, so the while condition could never become false and the loop spun forever.
```java
private static void computeSharesInternal(
    Collection<? extends Schedulable> allSchedulables,
    Resource totalResources, ResourceType type, boolean isSteadyShare) {
  .....
  double rMax = 1.0;
  while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
      < totalResource) {
    rMax *= 2.0;
  }
  ......
}

private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
    Collection<? extends Schedulable> schedulables, ResourceType type) {
  int resourcesTaken = 0;
  for (Schedulable sched : schedulables) {
    int share = computeShare(sched, w2rRatio, type);
    resourcesTaken += share;
  }
  return resourcesTaken;
}
```
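To make the failure mode concrete, here is a minimal, self-contained sketch (hypothetical code, not the Hadoop source) of the doubling loop with two schedulables whose shares are capped at Integer.MAX_VALUE, the default maxShare of a queue and the same 2.147483647E9 cap that shows up in the logs further down. Once both shares hit the cap, their int sum wraps to a negative value, the loop condition stays true, and the real loop would never exit.

```java
// Hypothetical, minimal model of the doubling loop above -- not the real code.
public class FairShareOverflowDemo {

  // Simplified stand-in for computeShare(): weight * ratio, capped at
  // Integer.MAX_VALUE (the default maxShare).
  static int computeShare(double weight, double w2rRatio) {
    double share = weight * w2rRatio;
    return (int) Math.min(share, Integer.MAX_VALUE);
  }

  // Simplified stand-in for resourceUsedWithWeightToResourceRatio():
  // the int sum can silently wrap around.
  static int resourceUsed(double w2rRatio, int numSchedulables) {
    int taken = 0;
    for (int i = 0; i < numSchedulables; i++) {
      taken += computeShare(1.0, w2rRatio);
    }
    return taken;
  }

  public static void main(String[] args) {
    int totalResource = 2_000_000_000; // pretend cluster capacity, a large positive int
    double rMax = 1.0;
    for (int round = 0; round < 40; round++) { // guard so the demo itself terminates
      int sum = resourceUsed(rMax, 2);         // two schedulables
      System.out.println("rMax=" + rMax + " sum=" + sum);
      if (sum >= totalResource) {              // same exit test as the real while loop
        System.out.println("loop would exit normally");
        return;
      }
      rMax *= 2.0;
    }
    // With both shares capped at Integer.MAX_VALUE the sum is -2 forever,
    // so the real while loop would never terminate.
    System.out.println("sum wrapped negative -> the original loop spins forever");
  }
}
```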
Printing the return value of resourceUsedWithWeightToResourceRatio() confirmed this: after looping for a while, it returns a negative number.
```
2020-07-29 17:53:34,530 INFO org.mortbay.log: [------]resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)= -2
2020-07-29 17:53:34,530 INFO org.mortbay.log: [-----]computeShare before share':= Infinity; sche.getWeight= 1.0; w2rRatio= Infinity
2020-07-29 17:53:34,530 INFO org.mortbay.log: [-----]computeShare after share':= 2.147483647E9
```
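The -2 in the log is no coincidence: once the weight-to-resource ratio is driven to Infinity, each share is capped at Integer.MAX_VALUE (the 2.147483647E9 above), and adding two such shares in 32-bit arithmetic wraps to exactly -2, as this trivial snippet shows:

```java
public class IntWrapDemo {
  public static void main(String[] args) {
    int share = Integer.MAX_VALUE;  // a share capped at the default maxShare
    int sum = share + share;        // 32-bit addition overflows and wraps around
    System.out.println(sum);        // prints -2, matching the log above
  }
}
```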
We therefore changed the variable types in resourceUsedWithWeightToResourceRatio() from int to long, which resolved the problem.
```java
private static long resourceUsedWithWeightToResourceRatio(double w2rRatio,
    Collection<? extends Schedulable> schedulables, ResourceType type) {
  long resourcesTaken = 0;
  for (Schedulable sched : schedulables) {
    long share = computeShare(sched, w2rRatio, type);
    resourcesTaken = safeAdd(resourcesTaken, share);
    if (resourcesTaken == Long.MAX_VALUE) {
      break;
    }
  }
  return resourcesTaken;
}
```
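The safeAdd() helper referenced above is not shown in the snippet; a clamping implementation along these lines (a sketch consistent with the break-on-Long.MAX_VALUE check above, not necessarily the exact upstream code) saturates at Long.MAX_VALUE instead of wrapping:

```java
// Sketch of a saturating add (assumption: not necessarily the exact upstream helper).
final class SafeMath {
  static long safeAdd(long a, long b) {
    try {
      return Math.addExact(a, b);  // throws ArithmeticException on long overflow
    } catch (ArithmeticException ex) {
      return Long.MAX_VALUE;       // clamp instead of wrapping negative
    }
  }
}
```

With the sum clamped to a positive ceiling, the comparison against totalResource can no longer go negative, so the doubling loop in computeSharesInternal() is guaranteed to terminate.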