  • [Original] Dashu's Troubleshooting Notes (7): Job Progress Stuck in a Spark Application

     Spark 2.1.1

    Recently our Spark applications have often been running for a very long time. One such job looks like this:

    Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
    16 | treeReduce at CRFWithLBFGS.scala:160 | 2018/12/03 12:39:50 | 2.3 h | 0/5 | 196/4723

    The stage currently running in that job is:

    Stage Id | Description | Submitted | Duration | Tasks: Succeeded/Total | Input | Output | Shuffle Read | Shuffle Write
    60 | treeReduce at CRFWithLBFGS.scala:160 | 2018/12/03 12:39:57 | 2.3 h | 196/200 | 4.5 GB | | | 1455.1 MB

    Four tasks in this stage stayed in the RUNNING state the whole time. Their metrics are abnormal (Input Size / Records and Shuffle Write Size / Records are both 0.0 B / 0), and all four tasks are on the same executor:

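    Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records
    33 | 8938 | 0 | RUNNING | PROCESS_LOCAL | 12 / $executor_server_ip | 2018/12/03 12:39:57 | 2.3 h | | 0.0 B / 0 | | 0.0 B / 0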

    The executor hosting the problematic tasks also shows abnormal metrics (Total Tasks is 0). The executor is:

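    Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Killed Tasks | Succeeded Tasks | Input Size / Records | Shuffle Write Size / Records
    12 | $executor_server_ip:36755 | 0 ms | 0 | 0 | 0 | 0 | 0.0 B / 0 | 0.0 B / 0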

    At this point the Driver thread's stack is:

    "Driver" #26 prio=5 os_prio=0 tid=0x00007f163a116000 nid=0x5192 waiting on condition [0x00007f15bb9a0000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000001a8c4f9e0> (a scala.concurrent.impl.Promise$CompletionLatch)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
            at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
            at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
            at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1988)
            at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
            at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
            at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
            at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)
            at org.apache.spark.rdd.RDD$$anonfun$treeReduce$1.apply(RDD.scala:1059)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
            at org.apache.spark.rdd.RDD.treeReduce(RDD.scala:1037)
            at breeze.optimize.CachedDiffFunction.calculate(CachedDiffFunction.scala:23)
            at breeze.optimize.LineSearch$$anon$1.calculate(LineSearch.scala:41)
            at breeze.optimize.LineSearch$$anon$1.calculate(LineSearch.scala:30)
            at breeze.optimize.StrongWolfeLineSearch.breeze$optimize$StrongWolfeLineSearch$$phi$1(StrongWolfe.scala:69)
            at breeze.optimize.StrongWolfeLineSearch$$anonfun$minimize$1.apply$mcVI$sp(StrongWolfe.scala:142)
            at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
            at breeze.optimize.StrongWolfeLineSearch.minimize(StrongWolfe.scala:141)
            at breeze.optimize.LBFGS.determineStepSize(LBFGS.scala:78)
            at breeze.optimize.LBFGS.determineStepSize(LBFGS.scala:40)
            at breeze.optimize.FirstOrderMinimizer$$anonfun$infiniteIterations$1.apply(FirstOrderMinimizer.scala:64)
            at breeze.optimize.FirstOrderMinimizer$$anonfun$infiniteIterations$1.apply(FirstOrderMinimizer.scala:62)
            at scala.collection.Iterator$$anon$7.next(Iterator.scala:129)
            at breeze.util.IteratorImplicits$RichIterator$$anon$2.next(Implicits.scala:71)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
            at scala.collection.immutable.Range.foreach(Range.scala:160)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
            at app.package.AppClass.main(AppClass.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)

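    So the driver is inside runJob and is waiting for the executors to return results.

    In the thread dump of the problematic executor, one suspicious thread stays RUNNABLE for a long time: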

    "shuffle-client-5-4" #94 daemon prio=5 os_prio=0 tid=0x00007fbae0e42800 nid=0x2a3a runnable [0x00007fbae4760000]
       java.lang.Thread.State: RUNNABLE
            at io.netty.util.Recycler$Stack.scavengeSome(Recycler.java:476)
            at io.netty.util.Recycler$Stack.scavenge(Recycler.java:454)
            at io.netty.util.Recycler$Stack.pop(Recycler.java:435)
            at io.netty.util.Recycler.get(Recycler.java:144)
            at io.netty.buffer.PooledUnsafeDirectByteBuf.newInstance(PooledUnsafeDirectByteBuf.java:39)
            at io.netty.buffer.PoolArena$DirectArena.newByteBuf(PoolArena.java:727)
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:140)
            at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
            at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
            at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
            at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
            at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
            at java.lang.Thread.run(Thread.java:745)
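
    A thread like this stands out because it is RUNNABLE inside the same class every time a dump is taken. As a minimal sketch of that check (not part of the original investigation; the class name StuckThreadFinder and its methods are hypothetical), the following uses the JDK's ThreadMXBean to list threads of the current JVM that are RUNNABLE with a given class on their stack. On a real executor the same information comes from jstack or the Spark UI thread dump, taken several times and compared.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class StuckThreadFinder {
    /** Names of other threads in this JVM that are RUNNABLE and have a frame in className. */
    public static List<String> runnableIn(String className) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long self = Thread.currentThread().getId();
        List<String> names = new ArrayList<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            // Skip the calling thread and anything that is parked, blocked or waiting.
            if (info == null || info.getThreadId() == self
                    || info.getThreadState() != Thread.State.RUNNABLE) {
                continue;
            }
            for (StackTraceElement frame : info.getStackTrace()) {
                if (frame.getClassName().equals(className)) {
                    names.add(info.getThreadName());
                    break;
                }
            }
        }
        return names;
    }

    public static volatile boolean stop = false;

    /** Busy loop standing in for a thread stuck in an endless loop. */
    public static void spin() {
        while (!stop) { }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(StuckThreadFinder::spin, "busy-loop-demo");
        t.setDaemon(true);
        t.start();
        Thread.sleep(200); // give the demo thread time to enter spin()
        System.out.println(runnableIn(StuckThreadFinder.class.getName()));
        stop = true;
    }
}
```

    For the dump above, the class to look for would be io.netty.util.Recycler$Stack. A single hit proves nothing; the signal is the same thread showing up in the same place across dumps taken minutes apart.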

    P.S. The host of the problematic executor had plenty of memory available at the time, and the process state was normal:

    -bash-4.2$ free -m
                  total        used        free      shared  buff/cache   available
    Mem:         257676       29251        5274         517      223150      226669
    Swap:             0           0           0

    I suspected an infinite loop at this point. Spark 2.1.1 uses netty 4.0.42, so I looked at the netty code:

    io.netty.util.Recycler

        boolean scavengeSome() {
            WeakOrderQueue cursor = this.cursor;
            if (cursor == null) {
                cursor = head;
                if (cursor == null) {
                    return false;
                }
            }

            boolean success = false;
            WeakOrderQueue prev = this.prev;
            do {
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }

                WeakOrderQueue next = cursor.next;
                if (cursor.owner.get() == null) {
                    // If the thread associated with the queue is gone, unlink it, after
                    // performing a volatile read to confirm there is no data left to collect.
                    // We never unlink the first queue, as we don't want to synchronize on updating the head.
                    if (cursor.hasFinalData()) {
                        for (;;) {
                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }
                    if (prev != null) {
                        prev.next = next;
                    }
                } else {
                    prev = cursor;
                }

                cursor = next;

            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }
    
     

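    The problem is that prev is not cleared when cursor is re-initialized to head. A stale prev left over from the previous pass can then be used in prev.next = next, which can link the queue list into a cycle, so the do/while loop never reaches cursor == null:

                if (cursor == null) {
                    cursor = head;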
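
    To see how a stale prev can produce a loop, here is a small self-contained model of the walk. It is a sketch under simplifying assumptions, not netty code: Queue and Stack are stand-ins for WeakOrderQueue and Recycler.Stack, transfer is assumed to always fail (no queue holds data), a step cap is added so the demo can report a loop instead of hanging, and resetPrevOnRestart models clearing prev when the walk restarts from head.

```java
public class RecyclerCycleDemo {
    /** Stand-in for WeakOrderQueue: only the fields the walk needs. */
    static final class Queue {
        boolean ownerAlive = true; // models cursor.owner.get() != null
        Queue next;
    }

    /** Stand-in for Recycler.Stack with the same cursor/prev bookkeeping. */
    static final class Stack {
        Queue head, cursor, prev;
        final boolean resetPrevOnRestart;

        Stack(boolean resetPrevOnRestart) {
            this.resetPrevOnRestart = resetPrevOnRestart;
        }

        /** Returns the number of queues visited, or -1 if maxSteps was exceeded. */
        int scavengeSome(int maxSteps) {
            Queue cursor = this.cursor;
            Queue prev = this.prev;
            if (cursor == null) {
                if (resetPrevOnRestart) {
                    prev = null; // restart from head with no predecessor
                }
                cursor = head;
                if (cursor == null) {
                    return 0;
                }
            }
            int steps = 0;
            do {
                if (++steps > maxSteps) {
                    return -1; // the walk is going around in a cycle
                }
                // transfer() is assumed to fail for every queue in this model.
                Queue next = cursor.next;
                if (!cursor.ownerAlive) {
                    if (prev != null) {
                        prev.next = next; // unlink the dead queue
                    }
                } else {
                    prev = cursor;
                }
                cursor = next;
            } while (cursor != null);
            this.prev = prev;
            this.cursor = cursor;
            return steps;
        }
    }

    /** Builds head -> A -> B, walks once, kills A's owner, walks again. */
    static boolean terminates(boolean resetPrevOnRestart) {
        Stack stack = new Stack(resetPrevOnRestart);
        Queue a = new Queue();
        Queue b = new Queue();
        a.next = b;
        stack.head = a;
        stack.scavengeSome(1000); // ends with cursor == null and prev == B
        a.ownerAlive = false;     // the thread that owned the head queue exits
        return stack.scavengeSome(1000) >= 0;
    }

    public static void main(String[] args) {
        System.out.println("stale prev terminates: " + terminates(false));
        System.out.println("reset prev terminates: " + terminates(true));
    }
}
```

    With the stale prev, the second walk starts at A with prev still pointing at B, so unlinking A executes B.next = B and the walk spins on B forever; the model reports false. With prev reset, the head queue is left in place, the walk reaches the end of the list, and the model reports true.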

    This bug was fixed in netty 4.0.43. Upgrading the netty bundled with Spark 2.1.1 to 4.0.43 or later resolves the problem.

    The upstream issue is at: https://github.com/netty/netty/issues/6153
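
    When auditing a cluster it can help to check netty version strings mechanically, for example the version embedded in a jar name such as netty-all-4.0.42.Final.jar. The helper below is a hypothetical sketch (NettyVersionCheck is not a netty or Spark API). It only answers the question for the 4.0.x line discussed here and refuses other lines, because this post says nothing about them.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NettyVersionCheck {
    private static final Pattern LEADING_NUMBERS = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)");

    /** True if a 4.0.x version string such as "4.0.43.Final" is 4.0.43 or later. */
    public static boolean hasRecyclerFix(String version) {
        Matcher m = LEADING_NUMBERS.matcher(version);
        if (!m.lookingAt()) {
            throw new IllegalArgumentException("Not a major.minor.patch version: " + version);
        }
        int major = Integer.parseInt(m.group(1));
        int minor = Integer.parseInt(m.group(2));
        int patch = Integer.parseInt(m.group(3));
        if (major != 4 || minor != 0) {
            // Only the 4.0.x line is covered by the fix version quoted above.
            throw new IllegalArgumentException("Only netty 4.0.x is handled: " + version);
        }
        return patch >= 43;
    }

    public static void main(String[] args) {
        System.out.println(hasRecyclerFix("4.0.42.Final")); // version shipped with Spark 2.1.1
        System.out.println(hasRecyclerFix("4.0.43.Final"));
    }
}
```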

  • Original post: https://www.cnblogs.com/barneywill/p/10060079.html