zoukankan      html  css  js  c++  java
  • Spark工作中遇到的问题

    1、java: 找不到符号

    map(o->o._2)处提示找不到符号

    SparkSession spark = SparkSession.builder().appName(appName).getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<ObjectArrayWritable> rdd = jsc.newAPIHadoopFile(dir, SequenceFileInputFormat.class, ObjectArrayWritable.class, ObjectArrayWritable.class, hdConf).map(o->o._2);

    使用sequenceFile代替newAPIHadoopFile

    或者将原先代码分为两行

    JavaPairRDD<ObjectArrayWritable,ObjectArrayWritable> pairRDD = jsc.newAPIHadoopFile(dir, SequenceFileInputFormat.class, ObjectArrayWritable.class, ObjectArrayWritable.class, hdConf);
    JavaRDD<ObjectArrayWritable> rdd = pairRDD.map(o->o._2);

     

    2、java.lang.ClassCastException: java.util.Arrays$ArrayList cannot be cast to scala.collection.Seq

    原因:在创建Row时突发奇想把java的List当做元素放入

    List<Object> list = new ArrayList<>();
    Row r = RowFactory.create(list);

    然后在shuffle后取出时报错

    Row r = ...
    List<Object> list = r.getList(0);

    看了下scala的代码发现,虽然放入时是按照java类型放入的;但是使用getList取出时是按照scala的seq序列取出,所以导致类型转换异常

    根据源码看到getAs方法,将getList替换为getAs,这次没有报ClassCastException;但是出现另外一个错误!

    Caused by: java.lang.UnsupportedOperationException

    原因是因为我获取到List后是没有具体子类实现的,所以在调用addAll时,最后使用了AbstractList的add方法

    创建新的list,将getAs的list遍历放入新的list。

     

    3、spark Container killed on request. Exit code is 143

    很大可能是由于物理内存达到限制,导致container被kill掉报错。

    粗暴简单的解决方式,增加executor内存大小

    spark.executor.memory 4g

    再增加内存后依然不行,查看application的log日志发现(yarn logs -applicationId …)

    ERROR RetryingBlockFetcher: Exception while beginning fetch of 5 outstanding blocks (after 2 retries) java.io.IOException: Failed to connect to 主机:端口

    原来是某个executor挂了,某个exetutor想要fetch数据(应该是shuffle read),但那个有数据的executor挂了,导致fetch失败

    shuffle分为shuffle write和shuffle read两部分。

    shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。

    shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

    如果shuffle read的量很大,那么将会导致一个task需要处理的数据非常大,从而导致JVM crash以及取shuffle数据失败,最后executor也丢失了,看到Failed to connect to host的错误(executor lost)或者造成长时间的gc。

     

    解决方案:

    (a) 减少shuffle数据和操作 思考是否可以使用map side join或是broadcast join来规避shuffle的产生。 将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

    (b) 控制分区数 对于SparkSQL和DataFrame的join,group by等操作 通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。 对于Rdd的join,groupBy,reduceByKey等操作 通过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

    (c)提高executor的内存 通过spark.executor.memory适当提高executor的memory值。

    (d)增加并行task的数目 通过增加并行task的数目,从而减小每个task的数据量。(spark.default.parallelism)

    (e)查看是否存在数据倾斜的问题 是否存在某个key数据特别大导致倾斜?如果存在可以单独处理或者考虑改变数据分区规则。

  • 相关阅读:
    Code Forces Gym 100886J Sockets(二分)
    CSU 1092 Barricade
    CodeChef Mahesh and his lost array
    CodeChef Gcd Queries
    CodeChef GCD2
    CodeChef Sereja and LCM(矩阵快速幂)
    CodeChef Sereja and GCD
    CodeChef Little Elephant and Balance
    CodeChef Count Substrings
    hdu 4001 To Miss Our Children Time( sort + DP )
  • 原文地址:https://www.cnblogs.com/java-meng/p/15189266.html
Copyright © 2011-2022 走看看