zoukankan      html  css  js  c++  java
  • spark Lost executor on YARN

    执行脚本出现:

    15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on myhost1.com: remote Rpc client disassociated
    15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on myhost2.com: remote Rpc client disassociated

    出现此的原因就是yarn资源不够导致的。加大任务执行的资源

    ./spark-submit --class com.xyz.MySpark --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M" --driver-java-options -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client --executor-memory 2G --executor-cores 8 --num-executors 12  /home/myuser/myspark-1.0.jar
    改成

    ./spark-submit --class com.xyz.MySpark --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=1024M" --driver-java-options -XX:MaxPermSize=1024m --driver-memory 4g --master yarn-client --executor-memory 2G --executor-cores 8 --num-executors 15  /home/myuser/myspark-1.0.jar


    Spark job运行参数优化


    一般Spark Job很多问题都是来源于系统资源不够用,通过监控日志等判断是内存资源占用过高等导致的问题,因此尝试通过配置参数的方法来解决。 

    1、--conf spark.akka.frameSize=100
    此参数控制Spark中通信消息的最大容量(如task的输出结果),默认为10M,当处理大数据时,task的输出可能会大于这个值,需要根据实际数据设置一个更高的值。 

    2、--conf spark.yarn.executor.memoryOverhead=4096
    executor堆外内存设置,如果程序使用了大量的堆外内存,就该增大此配置。



    1. Lost executor on YARN ALS iterations

    debasish83 Q:

    During the 4th ALS iteration, I am noticing that one of the executor gets 
    disconnected: 
    
    14/08/19 23:40:00 ERROR network.ConnectionManager: Corresponding 
    SendingConnectionManagerId not found 
    
    14/08/19 23:40:00 INFO cluster.YarnClientSchedulerBackend: Executor 5 
    disconnected, so removing it 
    
    14/08/19 23:40:00 ERROR cluster.YarnClientClusterScheduler: Lost executor 5 
    on tblpmidn42adv-hdp.tdc.vzwcorp.com: remote Akka client disassociated 
    
    14/08/19 23:40:00 INFO scheduler.DAGScheduler: Executor lost: 5 (epoch 12) 
    Any idea if this is a bug related to akka on YARN ? 
    
    I am using master 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    ps:ALS(alternating leastsquares):交替最小二乘法

    Xiangrui Meng A: 
    我们知道Container被YARN killed是因为它使用了比它要求的更多的内存,但是这个问题的根源还没找到。

    We know that the 
    container got killed by YARN because it used much more memory that it requested. 
    But we haven't figured out the root cause yet. 
    • 1
    • 2
    • 3
    • 1
    • 2
    • 3

    debasish83 Q: 
    我可以用YARN1.0或1.1reproduce,所以这应该是和YARN版本相关的问题。 
    至少对我来说,现在可以的是使用standalone模式。 
    Sandy Ryza A: 
    解决方法是增加 Spark.yarn.executor.memoryOverhead直到这个错误消失。该配置控制JVM堆大小与从YARN得到的内存大小之间的缓冲(JVM可以占用内存超出他们的堆大小)。你还应该确保,在YARN NodeManager(节点管理器)配置中,yarn.nodemanager.vmem-check-enabled设置为false

    The fix is to raise spark.yarn.executor.memoryOverhead until this goes away.  
    This controls the buffer between the JVM heap size and the amount of 
    memory requested from YARN (JVMs can take up memory beyond their heap size).
    You should also make sure that, in the YARN NodeManager 
    configuration, yarn.nodemanager.vmem-check-enabled is set to false. 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 1
    • 2
    • 3
    • 4
    • 5

    debasish83 Q: 
    我在spark-defaults.confspark.yarn.executor.memoryOverhead 1024,但是没有在webUI->environment中看到spark properties 中的environment variable.它需要在spark-env.sh中设置吗 
    Sandy Ryza A: 
    当前情况就是增加 
    spark.yarn.executor.memoryOverhead直到job停止失败。我们确实有计划尝试自动缩放此基础上的内存量 
    要求,但它仍然只是一个启发。 
    debasish83 Q: 
    如果我使用40个execotor,内存16GB,对100M(1亿)x10M(1千万)的大矩阵,即数十亿评分,典型的spark.yarn.executor.memoryOverhead是什么呢? 
    Sandy Ryza A: 
    我预计2GB就够了,更不要说16GB(unless ALS is using a bunch of off-heap memory?)你之前提到说在县城中property没有在environment选项中显示,你确定它生效了吗?如果生效了它会在environment中出现的。

    2. Getting error in Spark: Executor lost

    Q: 
    一个master和2个slaves, 每个的RAM是32GB,读取一个大小为18million records的csv文件(第一行是列名)

    ./spark-submit --master yarn --deploy-mode client --executor-memory 10g <path/to/.py file>
    
    rdd = sc.textFile("<path/to/file>")
    h = rdd.first()
    header_rdd = rdd.map(lambda 1: h in l)
    data_rdd = rdd.substract(header_rdd)
    data_rdd.first()
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    错误信息:

    15/10/12 13:52:03 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
     ApplicationMaster has disassociated: 192.168.1.114:51058
    15/10/12 13:52:03 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
     ApplicationMaster has disassociated: 192.168.1.114:51058
    15/10/12 13:52:03 WARN remote.ReliableDeliverySupervisor: 
     Association with remote system [akka.tcp://sparkYarnAM@192.168.1.114:51058] has failed,
     address is now gated for [5000] ms. Reason: [Disassociated]
    15/10/12 13:52:03 ERROR cluster.YarnScheduler: Lost executor 1 on hslave2:
     remote Rpc client disassociated
    15/10/12 13:52:03 INFO scheduler.TaskSetManager: Re-queueing tasks for 1 from TaskSet 3.0
    15/10/12 13:52:03 WARN remote.ReliableDeliverySupervisor:
     Association with remote system [akka.tcp://sparkExecutor@hslave2:58555] has failed, 
     address is now gated for [5000] ms. Reason: [Disassociated]
    15/10/12 13:52:03 WARN scheduler.TaskSetManager:
     Lost task 6.6 in stage 3.0 (TID 208, hslave2): ExecutorLostFailure (executor 1 lost)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    该错误是在运行rdd.substract()时产生的。然后我改变了代码,删除了rdd.substract(),用rdd.filter()代替:

    rdd = sc.textFile("<path/to/file>")
    h = rdd.first()
    data_rdd = rdd.filter(lambda l: h not in l)
    • 1
    • 2
    • 3
    • 1
    • 2
    • 3

    得到相同的错误

    A: 
    这不是Spark的bug,应该适合你的Java,Yarn和Spark-config文件的配置有关。 
    你可以增加Java内存,增加akka的framesize和timeout设置等等。

    sapark-defaults.conf:
    spark.master                       yarn-cluster
    spark.yarn.historyServer.address   <your cluster url>
    spark.eventLog.enabled             true
    spark.eventLog.dir                 hdfs://<your history directory>
    spark.driver.extraJavaOptions      -Xmx20480m -XX:MaxPermSize=2048m XX:ReservedCodeCacheSize=2048m
    spark.checkpointDir                hdfs://<your checkpoint directory>
    yarn.log-aggregation-enable        true
    spark.shuffle.service.enabled      true
    spark.shuffle.service.port         7337
    spark.shuffle.consolidateFiles     true
    spark.sql.parquet.binaryAsString   true
    spark.speculation                  false
    spark.yarn.maxAppAttempts          1
    spark.akka.askTimeout              1000
    spark.akka.timeout                 1000
    spark.akka.frameSize               1000
    spark.rdd.compress true
    spark.storage.memoryFraction 1
    spark.core.connection.ack.wait.timeout 600
    spark.driver.maxResultSize         0
    spark.task.maxFailures             20
    spark.shuffle.io.maxRetries        20
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    你可能还想设置需要多少partitions在你的Spark程序里,也许想要增加一些partitionBy(partitioner)语句到你的RDD中,所以你的代码也许是这样的:

    myPartitioner = new HashPartitioner(<your number of partitions>)
    
    rdd = sc.textFile("<path/to/file>").partitionBy(myPartitioner)
    h = rdd.first()
    header_rdd = rdd.map(lambda l: h in l)
    data_rdd = rdd.subtract(header_rdd)
    data_rdd.first()
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    最后,你也许需要设置你的spark-submitcommand并增加参数:executor的数量,executor memory 和driver memory

    ./spark-submit 
    --master yarn 
    --deploy-mode client 
    --num-executors 100 
    --driver-memory 20G 
    --executor-memory 10g 
    <path/to/.py file>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3. spark job运行参数优化

    一般Spark Job很多问题都是来源于系统资源不够用,通过监控日志等判断是内存资源占用过高等导致的问题,因此尝试通过配置参数的方法来解决。 
    1.

    --conf spark.akka.frameSize=100
    
    • 1
    • 2
    • 1
    • 2

    此参数控制Spark中通信消息的最大容量(如task的输出结果),默认为10M,当处理大数据时,task的输出可能会大于这个值,需要根据实际数据设置一个更高的值。 
    2.

    --conf spark.shuffle.manager=SORT
    • 1
    • 1

    Spark默认的shuffle采用Hash模式,在Hash模式下,每一次shuffle会生成M*R数量的文件(M:Map的数目,R:Reduce的数目),当Map和Reduce的数目较大时,会产生相当规模的恩建,与此同时带来了大量的内存开销。为降低系统资源,可以采用Sort模式,只产生M数量的文件,但运行时间加长。 
    3.

    --conf spark.yarn.executor.memoryOverhead=4096
    • 1
    • 1

    executor堆外内存设置,如果程序使用了大量的堆外内存,就该增大此配置。

    正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
  • 相关阅读:
    Django创建超级用户出现错误
    如何创建单例设计模式
    运行Spark-shell,解决Unable to load native-hadoop library for your platform
    在linux上安装spark详细步骤
    Spark源码编译,官网学习
    linux安装httpd,做文件服务器
    在linux上安装Scala详细步骤
    hadoop运行wordcount实例,hdfs简单操作
    hadoop-2.6.0源码编译问题汇总
    hadoop-2.6.0源码编译
  • 原文地址:https://www.cnblogs.com/candlia/p/11920332.html
Copyright © 2011-2022 走看看