zoukankan      html  css  js  c++  java
  • Spark on Yarn遇到的几个问题

    本文转自:http://www.cnblogs.com/Scott007/p/3889959.html

    1 概述

        Spark的on Yarn模式,其资源分配是交给Yarn的ResourceManager来进行管理的,但是目前的Spark版本,Application日志的查看,只能通过Yarn的yarn logs命令实现。

        在部署和运行Spark Application的过程中,如果不注意一些小的细节,也许会导致一些问题的出现。

    2 防火墙

        部署好Spark的包和配置文件,on yarn的两种模式都无法运行,在NodeManager端的日志都是说Connection Refused,连接不上Driver所在的客户端节点,但是客户端的80端口可以正常访问!同时,在日志中有类似信息出现:

    Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

    内存肯定是够的,但就是无法获取资源!检查防火墙,果然客户端只开启的对80端口的访问,其他都禁止了!如果你的程序在运行的时候也有类似连接被拒绝的情况,最好也是先检查下防火墙的配置!

    3 Spark Driver程序host的指定

        部署完Spark后,分别使用yarn-cluster模式和yarn-client模式运行Spark自带的计算pi的示例。

        Spark的一些配置文件除了一些基本属性外,均未做配置,结果运行的时候两种运行模式出现了不同的状况。yarn-cluster模式可以正常运行,yarn-client模式总是运行失败。查看ResourceManager、NodeManager端的日志,发现程序总是找不到ApplicationMaster,这就奇怪了!并且,客户端的Driver程序开启的端口,在NodeManager端访问被拒绝!非Spark的其他MR任务,能够正常执行。

    检查客户端配置文件,发现原来在客户端的/etc/hosts文件中,客户端的一个IP对应了多个Host,Driver程序会默认去取最后对应的那个Host,比如是hostB,但是在NodeManager端是配置的其他Host,hostA,所以导致程序无法访问。为了不影响其他的程序使用客户端的Host列表,这里在Spark配置文件spark-defaults.conf中使用属性spark.driver.host来指定yarn-client模式运行中和Yarn通信的DriverHost,此时yarn-client模式可以正常运行。

    上面配置完了之后,发现yarn-cluster模式又不能运行了!想想原因,肯定是上面那个配置参数搞的鬼,注释掉之后,yarn-cluster模式可以继续运行。原因是,yarn-cluster模式下,spark的入口函数是在客户端运行,但是Driver的其他功能是在ApplicationMaster中运行的,上面的那个配置相当于指定了ApplicationMaster的地址,实际上的ApplicationMaster在yarn-master模式下是由ResourceManager随机指定的。

    4 on  Yarn日志的查看

        测试环境下,通过yarn logs -applicationId xxx可以查看运行结束的Application的日志,但是搞到另一个环境下发现使用上述命令查看日志时,总是提示如下信息:

    Logs not available at /tmp/nm/remote/logs/hadoop/logs/application_xxx_xxx

    Log aggregation has not completed or is not enabled.

        去对应的NodeManger目录下,确实找不到日志文件。但是/tmp/nm/remote/logs却是在yarn-site.xml中指定了的目录,这个是对的,到底什么原因呢?难道是Yarn的日志聚集没有起作用?

        去NodeManager上查看对应Application的日志:

    复制代码
    2014-08-04 09:14:47,513 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Starting aggregate log-file for app application_xxx_xxx at /tmp/nm/remote/logs/spark/logs/application_xxx_xxx/hostB.tmp
    
    2014-08-04 09:14:47,525 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Uploading logs for container container_xxx_xxx_01_000007. Current good log dirs are /data/nm/log
    
    2014-08-04 09:14:47,526 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Uploading logs for container container_xxx_xxx_000001. Current good log dirs are /data/nm/log
    
    2014-08-04 09:14:47,526 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Deleting path : /data/nm/log/application_xxx_xxx
    
    2014-08-04 09:14:47,607 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Finished aggregate log-file for app application_xxx_xxx
    复制代码

        可见,日志聚集确实起作用了,但是为什么通过命令不能查看!猛然见看到日志中“/tmp/nm/remote/logs/spark/logs/ application_xxx_xxx/hostB.tmp”,日志的路径有问题,在使用yarn logs命令查看的时候,用的是hadoop用户,实际Spark Application的提交执行用的是spark用户,而yarn logs命令默认去找的是当前用户的路径,这就是查看不到日志的原因。切换到spark用户再查看,日志终于出来了!

    5 LZO相关问题

        如果在Spark中使用了LZO作为EventLog的的压缩算法等,就得实现安装好LZO这个东东,否则会出现类似于如下的异常:

    复制代码
    Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
    
     at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:134)
    
     at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:174)
    
     at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
    
     ... 66 more
    
    Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
    
     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1680)
    
     at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:127)
    
     ... 68 more
    复制代码

    或者

    [ERROR] [2014-08-05 10:34:41 933] com.hadoop.compression.lzo.GPLNativeCodeLoader [main] (GPLNativeCodeLoader.java:36) Could not load native gpl library
    
    java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path

        解决办法就是得安装好LZO,并且在HDFS、SPARK中配置好相关的包、文件等,具体步骤见:

    http://find.searchhub.org/document/a128707a98fe4ec6

    https://github.com/twitter/hadoop-lzo/blob/master/README.md

    http://hsiamin.com/posts/2014/05/03/enable-lzo-compression-on-hadoop-pig-and-spark/

    6 Spark Hive无法访问Mysql的问题

        生产环境下,节点之间肯定是有防火墙限制的,而且Hive的元数据库Mysql,更是对请求的IP和用户等限制的严格,如果在Spark集群中使用yarn-cluster模式进行提交Spark的Application,其运行时Driver是和ApplicationMaster运行在一起,由Yarn的ResourceManager负责分配到集群中的某个NodeManager节点上,如果在Hive-site.xml中只配置了Mysql数据库而没有配置MetaStore的话,也许会遇到连接元数据库失败的问题,此时,就得看下Hive-site.xml的配置,是否Mysql的相关权限配置正确、MetaStore服务是否可以正常连接。

    7 内存溢出问题

        在Spark中使用hql方法执行hive语句时,由于其在查询过程中调用的是Hive的获取元数据信息、SQL解析,并且使用Cglib等进行序列化反序列化,中间可能产生较多的class文件,导致JVM中的持久代使用较多,如果配置不当,可能引起类似于如下的OOM问题:

    Exception in thread "Thread-2" java.lang.OutOfMemoryError: PermGen space

        原因是实际使用时,如果用的是JDK1.6版本,Server模式的持久代默认大小是64M,Client模式的持久代默认大小是32M,而Driver端进行SQL处理时,其持久代的使用可能会达到90M,导致OOM溢出,任务失败。

    解决方法就是在Spark的conf目录中的spark-defaults.conf里,增加对Driver的JVM配置,因为Driver才负责SQL的解析和元数据获取。配置如下:

    spark.driver.extraJavaOptions -XX:PermSize=128M -XX:MaxPermSize=256M   

      但是,上述情况是在yarn-cluster模式下出现,yarn-client模式运行时倒是正常的,原来在$SPARK_HOME/bin/spark-class文件中已经设置了持久代大小:

    JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"

        当以yarn-client模式运行时,driver就运行在客户端的spark-submit进程中,其JVM参数是取的spark-class文件中的设置,所谓未出现持久代溢出现象。

        总结一下Spark中各个角色的JVM参数设置:    

    (1)Driver的JVM参数:
    -Xmx,-Xms,如果是yarn-client模式,则默认读取spark-env文件中的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值一样大小;如果是yarn-cluster模式,则读取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的JVM参数值。
    PermSize,如果是yarn-client模式,则是默认读取spark-class文件中的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;如果是yarn-cluster模式,读取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的JVM参数值。
    GC方式,如果是yarn-client模式,默认读取的是spark-class文件中的JAVA_OPTS;如果是yarn-cluster模式,则读取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的参数值。
    以上值最后均可被spark-submit工具中的--driver-java-options参数覆盖。

    (2)Executor的JVM参数:
    -Xmx,-Xms,如果是yarn-client模式,则默认读取spark-env文件中的SPARK_EXECUTOR_MEMORY值,-Xmx,-Xms值一样大小;如果是yarn-cluster模式,则读取的是spark-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。
    PermSize,两种模式都是读取的是spark-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。
    GC方式,两种模式都是读取的是spark-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。

    (3)Executor数目及所占CPU个数
    如果是yarn-client模式,Executor数目由spark-env中的SPARK_EXECUTOR_INSTANCES指定,每个实例的数目由SPARK_EXECUTOR_CORES指定;如果是yarn-cluster模式,Executor的数目由spark-submit工具的--num-executors参数指定,默认是2个实例,而每个Executor使用的CPU数目由--executor-cores指定,默认为1核。
    每个Executor运行时的信息可以通过yarn logs命令查看到,类似于如下:

    14/08/13 18:12:59 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms1024m -Xmx1024m , -XX:PermSize=256M -XX:MaxPermSize=256M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -Xloggc:/tmp/spark_gc.log, -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler, 1, sparktest2, 3, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)

        其中,akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler表示当前的Executor进程所在节点,后面的1表示Executor编号,sparktest2表示ApplicationMaster的host,接着的3表示当前Executor所占用的CPU数目。

    8 序列化异常

    在Spark上执行hive语句的时候,出现类似于如下的异常:

    复制代码
    org.apache.spark.SparkDriverExecutionException: Execution error
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:849)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet
        at org.apache.spark.sql.execution.BroadcastNestedLoopJoin$$anonfun$7.apply(joins.scala:336)
        at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:813)
        at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:810)
        at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:845)
    复制代码

      排查其前后的日志,发现大都是序列化的东西:

    14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Serialized task 8.0:3 as 20849 bytes in 0 ms
    14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Finished TID 813 in 25 ms on sparktest0 (progress: 3/200)

      而在spark-default.conf中,事先设置了序列化方式为Kryo:

    spark.serializer org.apache.spark.serializer.KryoSerializer

    根据异常信息,可见是HashSet转为BitSet类型转换失败,Kryo把松散的HashSet转换为了紧凑的BitSet,把序列化方式注释掉之后,任务可以正常执行。难道Spark的Kryo序列化做的还不到位?此问题需要进一步跟踪。

    9 Executor僵死问题

        运行一个Spark任务,发现其运行速度远远慢于执行同样SQL语句的Hive的执行,甚至出现了OOM的错误,最后卡住达几小时!并且Executor进程在疯狂GC。

        截取其一Task的OOM异常信息:

     

    可以看到这是在序列化过程中发生的OOM。根据节点信息,找到对应的Executor进程,观察其Jstack信息:

    复制代码
    Thread 36169: (state = BLOCKED)
     - java.lang.Long.valueOf(long) @bci=27, line=557 (Compiled frame)
     - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=5, line=113 (Compiled frame)
     - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=103 (Compiled frame)
     - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
     - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=158, line=338 (Compiled frame)
     - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=293 (Compiled frame)
     - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
     - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
     - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
     - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
     - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
     - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
     - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
     - org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=118 (Compiled frame)
     - org.apache.spark.serializer.DeserializationStream$$anon$1.getNext() @bci=10, line=125 (Compiled frame)
     - org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame)
     - org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext() @bci=4, line=1031 (Compiled frame)
     - scala.collection.Iterator$$anon$13.hasNext() @bci=4, line=371 (Compiled frame)
     - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=30 (Compiled frame)
     - org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame)
     - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
     - org.apache.spark.sql.execution.HashJoin$$anonfun$execute$1.apply(scala.collection.Iterator, scala.collection.Iterator) @bci=14, line=77 (Compiled frame)
     - org.apache.spark.sql.execution.HashJoin$$anonfun$execute$1.apply(java.lang.Object, java.lang.Object) @bci=9, line=71 (Interpreted frame)
     - org.apache.spark.rdd.ZippedPartitionsRDD2.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=48, line=87 (Interpreted frame)
     - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=262 (Interpreted frame)
    复制代码

    有大量的BLOCKED线程,继续观察GC信息,发现大量的FULL GC。

        分析,在插入Hive表的时候,实际上需要写HDFS,在此过程的HashJoin时,伴随着大量的Shuffle写操作,JVM的新生代不断GC,Eden Space写满了就往Survivor Space写,同时超过一定大小的数据会直接写到老生代,当新生代写满了之后,也会把老的数据搞到老生代,如果老生代空间不足了,就触发FULL GC,还是空间不够,那就OOM错误了,此时线程被Blocked,导致整个Executor处理数据的进程被卡住。

     当处理大数据的时候,如果JVM配置不当就容易引起上述问题。解决的方法就是增大Executor的使用内存,合理配置新生代和老生代的大小,可以将老生代的空间适当的调大点。

    10 小节

        问题是比较严重,Application都直接无法运行了,但是引起问题的原因都比较小,归根结底还是部署的时候环境较为复杂,不够仔细!再接再砺!以后遇到相关的问题,会再这里持续更新,方便自己,也方便遇到类似问题的朋友们!

  • 相关阅读:
    [HNOI2002]营业额统计
    HDU 1374
    HDU 3345
    HDU 2089
    Graham扫描法
    Codeforces 1144D Deduction Queries 并查集
    Codeforces 916E Jamie and Tree 线段树
    Codeforces 1167F Scalar Queries 树状数组
    Codeforces 1167E Range Deleting
    Codeforces 749E Inversions After Shuffle 树状数组 + 数学期望
  • 原文地址:https://www.cnblogs.com/saratearing/p/5810443.html
Copyright © 2011-2022 走看看