zoukankan      html  css  js  c++  java
  • Spark工作中遇到的问题

    1、java: 找不到符号

    map(o->o._2)处提示找不到符号

    SparkSession spark = SparkSession.builder().appName(appName).getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<ObjectArrayWritable> rdd = jsc.newAPIHadoopFile(dir, SequenceFileInputFormat.class, ObjectArrayWritable.class, ObjectArrayWritable.class, hdConf).map(o->o._2);

    使用sequenceFile代替newAPIHadoopFile

    或者将原先代码分为两行

    JavaPairRDD<ObjectArrayWritable,ObjectArrayWritable> pairRDD = jsc.newAPIHadoopFile(dir, SequenceFileInputFormat.class, ObjectArrayWritable.class, ObjectArrayWritable.class, hdConf);
    JavaRDD<ObjectArrayWritable> rdd = pairRDD.map(o->o._2);

     

    2、java.lang.ClassCastException: java.util.Arrays$ArrayList cannot be cast to scala.collection.Seq

    原因:在创建Row时突发奇想把java的List当做元素放入

    List<Object> list = new ArrayList<>();
    Row r = RowFactory.create(list);

    然后在shuffle后取出时报错

    Row r = ...
    List<Object> list = r.getList(0);

    看了下scala的代码发现,虽然放入时是按照java类型放入的;但是使用getList取出时是按照scala的seq序列取出,所以导致类型转换异常

    根据源码看到getAs方法,将getList替换为getAs,这次没有报ClassCastException;但是出现另外一个错误!

    Caused by: java.lang.UnsupportedOperationException

    原因是因为我获取到List后是没有具体子类实现的,所以在调用addAll时,最后使用了AbstractList的add方法

    创建新的list,将getAs的list遍历放入新的list。

     

    3、spark Container killed on request. Exit code is 143

    很大可能是由于物理内存达到限制,导致container被kill掉报错。

    粗暴简单的解决方式,增加executor内存大小

    spark.executor.memory 4g

    再增加内存后依然不行,查看application的log日志发现(yarn logs -applicationId …)

    ERROR RetryingBlockFetcher: Exception while beginning fetch of 5 outstanding blocks (after 2 retries) java.io.IOException: Failed to connect to 主机:端口

    原来是某个executor挂了,某个exetutor想要fetch数据(应该是shuffle read),但那个有数据的executor挂了,导致fetch失败

    shuffle分为shuffle write和shuffle read两部分。

    shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。

    shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

    如果shuffle read的量很大,那么将会导致一个task需要处理的数据非常大,从而导致JVM crash以及取shuffle数据失败,最后executor也丢失了,看到Failed to connect to host的错误(executor lost)或者造成长时间的gc。

     

    解决方案:

    (a) 减少shuffle数据和操作 思考是否可以使用map side join或是broadcast join来规避shuffle的产生。 将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

    (b) 控制分区数 对于SparkSQL和DataFrame的join,group by等操作 通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。 对于Rdd的join,groupBy,reduceByKey等操作 通过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

    (c)提高executor的内存 通过spark.executor.memory适当提高executor的memory值。

    (d)增加并行task的数目 通过增加并行task的数目,从而减小每个task的数据量。(spark.default.parallelism)

    (e)查看是否存在数据倾斜的问题 是否存在某个key数据特别大导致倾斜?如果存在可以单独处理或者考虑改变数据分区规则。

  • 相关阅读:
    Django组件——分页器
    Django与Ajax
    Python常用模块——包&跨模块代码调用
    Python常用模块——正则表达式re模块
    Python常用模块——文件复制模块shutil
    Python常用模块——hashlib加密
    git小乌龟配置
    设计模式学习(27)- MVC模式
    设计模式学习(26)- 访问者模式
    设计模式学习(25)- 模板模式
  • 原文地址:https://www.cnblogs.com/java-meng/p/15189266.html
Copyright © 2011-2022 走看看