zoukankan      html  css  js  c++  java
  • 大数据基础---大数据调优汇总

    前言

    不进行优化的代码就是耍流氓。

    总体来说大数据优化主要分为三点,一是充分利用CPU,二是节省内存,三是减少网络传输。

    一、Hive/MapReduce调优

    1.1 本地模式

    Hive默认采用集群模式进行计算,如果对于小数据量,可以设置为单台机器进行计算,这样可以大大缩减查询触发任务时间。

    用户可以通过设置hive.exec.mode.local.auto 的值为true,来让Hive在适当的时候自动启动这个优化。

    set hive.exec.mode.local.auto=true; //开启本地 mr
    //设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的方式,
    默认为 134217728,即 128M
    set hive.exec.mode.local.auto.inputbytes.max=50000000;
    //设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方
    式,默认为 4
    set hive.exec.mode.local.auto.input.files.max=10;
    

    1.2 null值过滤OR随机分配null值

    • null值过滤

      对于key值倾斜,有的时候是无效的null导致的,这个时候可以考虑过滤掉。

      hive (default)> insert overwrite table jointable 
      select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = 
      o.id;
      
    • null值随机分配

      如果null不是异常数据,那么可以采用随机分配将null值分到不同分区,解决数据倾斜。

      insert overwrite table jointable
      select n.* from nullidtable n full join ori o on 
      case when n.id is null then concat('hive', rand()) else n.id end = o.id;
      

    1.3 Count(distinct)去重统计优化

    对于大数据量去重,可以采用分组的方式进行优化。

    hive (default)> select count(id) from (select id from bigtable group by id) a;
    

    1.4 行列过滤

    对关联表进行过滤时,可以考虑在关联时就进行过滤,提高查询时间。

    hive (default)> select b.id from bigtable b
    join (select id from ori where id <= 10 ) o on b.id = o.id;
    

    1.5 数据倾斜

    小文件合并

    set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    

    复杂文件增加Map数

    增加 map 的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,调整 maxSize 最大值。让 maxSize 最大值低于 blocksize 就可以增加 map 的个数。

    设置最大切片值为100个字节

    hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;
    hive (default)> select count(*) from emp;
    

    合理设置Reduce数

    • 调整 reduce 个数方法一

      (1)每个 Reduce 处理的数据量默认是 256MB

      hive.exec.reducers.bytes.per.reducer=256000000

      (2)每个任务最大的 reduce 数,默认为 1009

      hive.exec.reducers.max=1009

      (3)计算 reducer 数的公式

    • 调整 reduce 个数方法二

      在 hadoop 的 mapred-default.xml 文件中修改,设置每个 job 的 Reduce 个数

      set mapreduce.job.reduces = 15;

    1.6 并行执行

    在共享集群中设置并发执行可以提高运行速度。

    set hive.exec.parallel=true; //打开任务并行执行
    set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为 8。
    

    当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。

    1.7 严格模式

    Hive 提供了一个严格模式,可以防止用户执行那些可能意向不到的不好的影响的查询。通过设置属性 hive.mapred.mode 值为默认是非严格模式 nonstrict 。开启严格模式需要修改 hive.mapred.mode 值为 strict,开启严格模式可以禁止 3 种类型的查询

    1).对于分区表,除非 where 语句中含有分区字段过滤条件来限制范围,否则不允许执行。

    2).对于使用了 order by 语句的查询,要求必须使用 limit 语句。

    3).限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语句,这样关系数据库的执行优化器就可以高效地将WHERE 语句转化成那个 ON 语句。

    1.8 JVM重用

    在小文件场景或者task特别多的情况下,执行时间都很短。JVM重用可以使用同一个JVM在同一个Job里面重复使用N次。N值在mapred-site.xml文件中进行配置。

    <property>
     <name>mapreduce.job.jvm.num.tasks</name>
     <value>10</value> 
     <description>How many tasks to run per jvm. If set to -1, there is
     no limit. 
     </description>
    </property>
    

    PS:value值设置-1表示重用次数不受限制。

    缺点:JVM重用会一直占用使用到的task插槽,以便进行重用,如果遇到了某个job里面的reduce task分配不均匀,导致出现某几个task占用task时间很长,其它task空闲也没法被其它job使用,只有所有的task都执行完后才会释放。

    1.9 推测执行

    对于某些耗时的任务,可以启动推测执行,这样就会把“拖后腿”的任务找出来,然后启动个备份任务执行相同的数据。最后选出执行最快的为最终结果。

    设置开启推测执行参数:Hadoop 的 mapred-site.xml 文件中进行配置:

    <property>
     <name>mapreduce.map.speculative</name>
     <value>true</value>
     <description>If true, then multiple instances of some map tasks 
     may be executed in parallel.</description>
    </property>
    <property>
     <name>mapreduce.reduce.speculative</name>
     <value>true</value>
     <description>If true, then multiple instances of some reduce tasks 
     may be executed in parallel.</description>
    </property>
    

    不过 hive 本身也提供了配置项来控制 reduce-side 的推测执行:

    <property>
     <name>hive.mapred.reduce.tasks.speculative.execution</name>
     <value>true</value>
     <description>Whether speculative execution for reducers should be turned on. 
    </description>
     </property>
    

    PS:对于时差要求很苛刻的建议关闭掉推测执行。对于执行很长的任务也不建议开启,因为会浪费很大资源。

    1.10 HDFS小文件解决方案

    1)Hadoop Archive:

    是一个高效地将小文件放入 HDFS 块中的文件存档工具,它能够将多个小文件打包成

    一个 HAR 文件,这样就减少了 namenode 的内存使用。

    2)Sequence file:

    sequence file 由一系列的二进制 key/value 组成,如果 key 为文件名,value 为文件内容,

    则可以将大批小文件合并成一个大文件。

    3)CombineFileInputFormat:

    CombineFileInputFormat 是一种新的 inputformat,用于将多个文件合并成一个单独的

    split,另外,它会考虑数据的存储位置。

    二、Spark调优

    2.1 性能监控方式

    Spark Web UI

    通过 http://master:4040我们可以获得运行中的程序信息。

    (1)stages和tasks调度情况;

    (2)RDD大小和内存使用情况;

    (3)系统环境信息;

    (4)正在执行的executor信息;

    设置历史服务器记录历史信息:

    (1)在$SPARK_HOME/conf/spark-env.sh中设置:

    export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=50 Dspark.history.fs.logDirectory=hdfs://master01:9000/directory"

    说明:spark.history.retainedApplica-tions仅显示最近50个应用。

    spark.history.fs.logDirectory:Spark History Server页面只显示该路径下的信息。

    (2)$SPARK_HOME/conf/spark-defaults.conf

    spark.eventLog.enabled true

    spark.eventLog.dir hdfs://hadoop000:8020/directory #应用在运行过程中所有的信息均记录在该属性指定的路径下

    spark.eventLog.compress true

    (3)HistoryServer 启动

    $SPARK_HOMR/bin/start-histrory-server.sh

    (4)HistoryServer 停止

    $SPARK_HOMR/bin/stop-histrory-server.sh

    --同样executor的logs也是查看的一个出处:

    Standalone 模式:$SPARK_HOME/logs

    YARN 模式:在 yarn-site.xml 文件中配置了 YARN 日志的存放位置:yarn.nodemanager.log-dirs,或使用命令获取 yarn logs -applicationId。

    其它监控工具

    Nmon

    Jmeter

    Jprofiler

    2.2 调优要点

    内存调优要点

    1.对象占内存,优化数据结构

    (1)使用对象数组以及原始类型(primitive type)数组以替代 Java 或 者 Scala 集合类(collection class)。fastutil 库为原始数据类型提供了非常方便的集合类,且兼容 Java 标准类库。

    (2)尽可能地避免采用含有指针的嵌套数据结构来保存小对象。

    (3)考虑采用数字 ID 或者枚举类型以便替代 String 类型的主键。

    (4)如果内存少于 32GB,设置 JVM 参数-XX:+UseCom-pressedOops以便将 8 字节指针修改成 4 字节。与此同时,在 Java 7 或者更高版本,设置 JVM 参数-XX:+UseC-----ompressedStrings 以便采用 8 比特来编码每一个 ASCII 字符。

    2.频繁 GC 或者 OOM

    针对这种情况,首先要确定现象是发生在 Driver 端还是在 Executor 端,然后在分别处理。

    Driver 端:通常由于计算过大的结果集被回收到 Driver 端导致,需要调大 Driver 端的内存解决,或者进一步减少结果集的数量。

    Executor 端:

    (1)以外部数据作为输入的 Stage:可以增加 partition 的数量(即 task 的数量)来减少每个 task 要处理的数据,来减少 GC 的可能性。

    (2)以 shuffle 作为输入的 Stage:解决数据倾斜问题。

    开启推测机制

    在 spark-default.conf 中添加:spark.speculation true

    推测机制与以下几个参数有关:

    1. spark.speculation.interval 100:检测周期,单位毫秒;
    2. spark.speculation.quantile 0.75:完成 task 的百分比时启动推测;
    3. spark.speculation.multiplier 1.5:比其他的慢多少倍时启动推测。

    数据倾斜优化

    • 查找数据倾斜代码

      根据shuffler确定数据倾斜代码,然后通过随机取样找到倾斜数据。

      val sampledPairs = pairs.sample(false, 0.1)
      val sampledWordCounts = sampledPairs.countByKey()
      sampledWordCounts.foreach(println(_))
      

    缓解/消除数据倾斜

    避免数据源倾斜

    比如数据源是Kafka,通常一个分区对应一个Task,所以如果分区数据不均衡,则导致spark处理不均衡。

    比如数据源是Hive,如果Hive数据不均衡,也会导致Spark数据倾斜。

    解决方案是预处理或者其它。

    调整并行度

    比如reduceByKey(1000)。如果是group by,join需要设置参数即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说有点过小。设置完后不同的key就能分到不同的task去处理。

    将join中的shuffler避免掉

    针对一个大表一个小表的join操作,使用广播变量将较小的数据进行广播,这样就可以把join改为map操作。

    两阶段聚合

    针对RDD执行ReduceByKey等聚合shuffler算子,以及Spark Sql执行GroupByKey等聚合算子,针对数据倾斜,可以先在key前面打上随机前缀,进行聚合,然后再把前缀去掉进行聚合,有效解决值分配不均匀问题。

    示例如下:

    // 第一步,给 RDD 中的每个 key 都打上一个随机前缀。
    JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
     new PairFunction<Tuple2<Long,Long>, String, Long>() {
     private static final long serialVersionUID = 1L;
     @Override
     public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
     throws Exception {
     Random random = new Random();
     int prefix = random.nextInt(10);
     return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
     }
     });
    // 第二步,对打上随机前缀的 key 进行局部聚合。
    JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
     new Function2<Long, Long, Long>() {
     private static final long serialVersionUID = 1L;
     @Override
     public Long call(Long v1, Long v2) throws Exception {
     return v1 + v2;
     }
     });
    // 第三步,去除 RDD 中每个 key 的随机前缀。
    JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
     new PairFunction<Tuple2<String,Long>, Long, Long>() {
     private static final long serialVersionUID = 1L;
     @Override
     public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
     throws Exception {
     long originalKey = Long.valueOf(tuple._1.split("_")[1]);
     return new Tuple2<Long, Long>(originalKey, tuple._2);
     }
     });
    // 第四步,对去除了随机前缀的 RDD 进行全局聚合。
    JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
     new Function2<Long, Long, Long>() {
     private static final long serialVersionUID = 1L;
     @Override
     public Long call(Long v1, Long v2) throws Exception {
     return v1 + v2;
     }
     });
    

    两阶段聚合案例

    1. 通过如下 SQL,将 id 为 9 亿到 9.08 亿共 800 万条数据的 id 转为9500048 或者 9500096,其它数据的 id 除以 100 取整。从而该数据集中,id 为 9500048 和 9500096 的数据各 400 万,其它 id 对应的数据记录数均为 100 条。这些数据存于名为 test 的表中。
    2. 对于另外一张小表 test_new,取出 50 万条数据,并将 id(递增且唯一)除以 100 取整,使得所有 id 都对应 100 条数据。
    3. 通过如下操作,实现倾斜 Key 的分散处理:
    4. 将 leftRDD 中倾斜的 key(即 9500048 与 9500096)对应的数据单独过滤出来,且加上 1 到 24 的随机前缀,并将前缀与原数据用逗号分隔(以方便之后去掉前缀)形成单独的 leftSkewRDD。
    5. 将 rightRDD 中倾斜 key 对应的数据抽取出来,并通过 flatMap 操作将该数据集中每条数据均转换为 24 条数据(每条分别加上 1 到 24 的随机前缀),形成单独的 rightSkewRDD。
    6. 将 leftSkewRDD 与 rightSkewRDD 进行 Join,并将并行度设置为 48,且 在 Join 过 程 中 将 随 机 前 缀 去 掉 , 得 到 倾 斜 数 据集的 Join 结 果skewedJoinRDD。
    7. 将 leftRDD 中不包含倾斜 Key 的 数 据 抽 取 出 来 作 为 单 独 的leftUnSkewRDD。
    8. 对 leftUnSkewRDD 与原始的 rightRDD 进行 Join,并行度也设置为 48,得到 Join 结果 unskewedJoinRDD。
    9. 通过 union 算子将 skewedJoinRDD 与 unskewedJoinRDD 进行合并,从而得到完整的 Join 结果集。

    具体实现代码如下:

    public class SparkDataSkew{
     public static void main(String[] args) {
     int parallelism = 48;
     SparkConf sparkConf = new SparkConf();
     sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
     sparkConf.set("spark.default.parallelism", parallelism + "");
     JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
     JavaPairRDD<String, String> leftRDD = 
    javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
     .mapToPair((String row) -> {
     String[] str = row.split(",");
     return new Tuple2<String, String>(str[0], str[1]);
     });
     JavaPairRDD<String, String> rightRDD = 
    javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
     .mapToPair((String row) -> {
     String[] str = row.split(",");
     return new Tuple2<String, String>(str[0], str[1]);
     });
     String[] skewedKeyArray = new String[]{"9500048", "9500096"};
     Set<String> skewedKeySet = new HashSet<String>();
     List<String> addList = new ArrayList<String>();
     for(int i = 1; i <=24; i++) {
     addList.add(i + "");
     }
     for(String key : skewedKeyArray) {
     skewedKeySet.add(key);
     }
     Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
     Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);
     JavaPairRDD<String, String> leftSkewRDD = leftRDD
     .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
     .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new 
    Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));
     JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> 
    tuple) -> skewedKeys.value().contains(tuple._1()))
     .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
     .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
     .collect(Collectors.toList())
     .iterator()
     );
     JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
     .join(rightSkewRDD, parallelism)
     .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, 
    String>(tuple._1().split(",")[1], tuple._2()._2()));
     JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> 
    tuple) -> !skewedKeys.value().contains(tuple._1()));
     JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, 
    parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, 
    String>(tuple._1(), tuple._2()._2()));
     skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, 
    String>> iterator) -> {
     AtomicInteger atomicInteger = new AtomicInteger();
     iterator.forEachRemaining((Tuple2<String, String> tuple) -> 
    atomicInteger.incrementAndGet());
     });
     javaSparkContext.stop();
     javaSparkContext.close();
     } 
    }
    

    大表随机添加 N 种随机前缀,小表扩大 N 倍

    过滤少数导致倾斜的 key

    2.3 Shuffle调优

    调优概述

    代码开发,资源分配和数据倾斜是重中之重,除此之外,Shuffler作为一个补充,也需要学习下。

    shuffler相关参数调优

    • spark.shuffle.file.buffer

      默认值:32K

      参数说明:缓冲大小,超过缓冲大小才会写入磁盘。

      调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(),从而减少 shuffle write 过程中溢写磁盘文件的次数,也就可以减少磁盘 IO 次数,进而提升性能。在实践中发现,合理调节该参数,性能会有 1%~5%的提升。

    • spark.reducer.maxSizeInFlight

      默认值:48m

      参数说明:这个 buffer 缓冲决定了每次能够拉取多少数据。

      调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

    • spark.shuffle.io.maxRetries

      默认值:3

      参数说明:拉去失败重试次数。

      调优建议:对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如 60 次),以避免由于 JVM 的 full gc 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。

    • spark.shuffle.io.retryWait

      默认值:5s

      参数说明:重试拉取数据的等待时间,默认是5s。

      调优建议:建议加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性。

    • spark.shuffle.memoryFraction

      默认值:0.2

      参数说明:分配给聚合操作的内存比例,默认是20%。

    • spark.shuffle.manager

    默认值:sort

    2.4 程序开发调优

    原则一:避免创建重复的 RDD

    对同一个数据源不要建立多个RDD。

    原则二:尽可能复用同一个 RDD

    数据有包含关系的RDD能重用的就重用。

    原则三:对多次使用的 RDD 进行持久化

    每次你对RDD执行算子操作时,都会从源头处重新计算一遍,所以一般会采取持久化方式,这样就直接从内存取了。

    对多次使用的RDD进行持久化示例:

    // 如果要对一个 RDD 进行持久化,只要对这个 RDD 调用 cache()和 persist()即可。
    // 正确的做法。
    // cache()方法表示:使用非序列化的方式将 RDD 中的数据全部尝试持久化到内存中。
    // 此时再对 rdd1 执行两次算子操作时,只有在第一次执行 map 算子时,才会将这个 rdd1 从源头处计
    算一次。
    // 第二次执行 reduce 算子时,就会直接从内存中提取数据进行计算,不会重复计算一个 rdd。
    val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
    rdd1.map(...)
    rdd1.reduce(...)
    // persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
    // 比如说,StorageLevel.MEMORY_AND_DISK_SER 表示,内存充足时优先持久化到内存中,
    //内存不充足时持久化到磁盘文件中。
    // 而且其中的_SER 后缀表示,使用序列化的方式来保存 RDD 数据,此时 RDD 中的每个 partition
    //都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中
    // 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,
    //从而发生频繁 GC。
    val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
     .persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd1.map(...)
    rdd1.reduce(...)
    

    原则四:尽量避免使用 shuffle 类算子

    开发过程中,能避免则尽可能避免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子,尽量使用 map 类的非shuffle 算子。这样的话,没有 shuffle 操作或者仅有较少 shuffle 操作的Spark 作业,可以大大减少性能开销。

    Broadcast 与 map 进行 join 代码示例:

    // 传统的 join 操作会导致 shuffle 操作。
    // 因为两个 RDD 中,相同的 key 都需要通过网络拉取到一个节点上,由一个 task 进行 join 操作。
    val rdd3 = rdd1.join(rdd2)
    // Broadcast+map 的 join 操作,不会导致 shuffle 操作。
    // 使用 Broadcast 将一个数据量较小的 RDD 作为广播变量。
    val rdd2Data = rdd2.collect()
    val rdd2DataBroadcast = sc.broadcast(rdd2Data)
    // 在 rdd1.map 算子中,可以从 rdd2DataBroadcast 中,获取 rdd2 的所有数据。
    // 然后进行遍历,如果发现 rdd2 中某条数据的 key 与 rdd1 的当前数据的 key 是相同的,
    //那么就判定可以进行 join。
    // 此时就可以根据自己需要的方式,将 rdd1 当前数据与 rdd2 中可以连接的数据,
    //拼接在一起(String 或 Tuple)。
    val rdd3 = rdd1.map(rdd2DataBroadcast...)
    // 注意,以上操作,建议仅仅在 rdd2 的数据量比较少(比如几百 M,或者一两 G)的情况下使用。
    // 因为每个 Executor 的内存中,都会驻留一份 rdd2 的全量数据。
    

    原则五:使用 map-side 预聚合的 shuffle 操作

    如果因为业务需要,一定要使用 shuffle 操作,无法用 map 类的算子来替代,那么尽量使用可以 map-side 预聚合的算子。

    使用reduceByKey,aggregateByKey代替groupByKey,因为reduceByKey和aggregateByKey会进行预聚合,groupByKey不会。

    原则六:使用高性能的算子

    使用 reduceByKey/aggregateByKey 替代 groupByKey,详情见“原则五:使用 map-side 预聚合的 shuffle 操作”。

    使用 mapPartitions 替代普通 map。

    使用 filter 之后进行 coalesce 操作。

    使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作。

    原则七:广播大变量

    有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景,那么此时就应该使用 Spark的广播(Broadcast)功能来提升性能。因为如果不使用广播变量,那么每个任务会拉取数据并创建一个副本,这样会大大增加网络开销,并占用系统内存。如果使用广播变量的话,数据就会保留一份。

    广播大变量代码示例:

    // 以下代码在算子函数中,使用了外部的变量。
    // 此时没有做任何特殊操作,每个 task 都会有一份 list1 的副本。
    val list1 = ...
    rdd1.map(list1...)
    // 以下代码将 list1 封装成了 Broadcast 类型的广播变量。
    // 在算子函数中,使用广播变量时,首先会判断当前 task 所在 Executor 内存中,是否有变量副本。
    // 如果有则直接使用;如果没有则从 Driver 或者其他 Executor 节点上远程拉取一份放到本地 Executor
    内存中。
    // 每个 Executor 内存中,就只会驻留一份广播变量副本。
    val list1 = ...
    val list1Broadcast = sc.broadcast(list1)
    rdd1.map(list1Broadcast...)
    

    原则八:使用 Kryo 优化序列化性能

    代码示例:

    // 创建 SparkConf 对象。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    // 设置序列化器为 KryoSerializer。
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
    

    原则九:优化数据结构

    Java 中,有三种类型比较耗费内存:

    1.对象。 2.集合类型,比如HashMap,LinedList等。3.字符串,每个字符串内部都有一个字符数组以及长度等额外信息。

    2.5 运行资源调优

    在spark-submit调节资源参数来提高资源利用率。

    • num-executors

      参数说明:设置spark作业总共用多少个executor来执行。

      参数调优建议:每个spark作业一般设置50~100个左右的Executor进程比较合适。太小不能充分利用资源,太大队列无法提供足够的资源。

    • executor-memory

      参数说明:设置每个Executor进程的内存。

      参数调优建议:每个 Executor 进程的内存设置 4G~8G 较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors 乘 以 executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的 1/3~1/2,避免你自己的 Spark 作业占用了队列所有的资源,导致别的同学的作业无法运行。

    • executor-cores

      参数说明:设置每个Executor进程CUP core数量。因为每个cpu core一个时间只能执行一个task,所以cpu core数量越多,执行速度越快。

      参数调优建议:Executor 的 CPU core 数量设置为 2~4 个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core 限制是多少,再依据设置的 Executor 数量,来决定每个 Executor进程可以分配到几个 CPU core。同样建议,如果是跟他人共享这个队列,那 么 num-executors * executor-cores 不 要 超 过 队 列 总 CPU core 的1/3~1/2 左右比较合适,也是避免影响其他同学的作业运行。

    • driver-memory

      参数说明:设置Driver进程的内存。

      参数调优建议:Driver 的内存通常来说不设置,或者设置 1G 左右应该就够了。

    • spark.default.parallelism

      参数说明:该参数用于设置每个 stage 的默认 task 数量。这个参数极为重要,如果不设置可能会直接影响你的 Spark 作业性能。

      参数调优建议:Spark 作业的默认 task 数量为 500~1000 个较为合适。很 多 同 学 常 犯 的 一 个 错 误 就 是 不 去 设 置 这 个 参 数 , 那 么 此 时 就 会 导 致Spark 自己根据底层 HDFS 的 block 数量来设置 task 的数量,默认是一个HDFS block 对应一个 task。通常来说,Spark 默认设置的数量是偏少的(比如就几十个 task),如果 task 数量偏少的话,就会导致你前面设置好的Executor 的参数都前功尽弃。试想一下,无论你的 Executor 进程有多少个,内存和 CPU 有多大,但是 task 只有 1 个或者 10 个,那么 90%的 Executor进程可能根本就没有 task 执行,也就是白白浪费了资源!因此 Spark 官网建议的设置原则是,设置该参数为 num-executors * executor-cores 的 2~3倍较为合适,比如 Executor 的总 CPU core 数量为 300 个,那么设置 1000个 task 是可以的,此时可以充分地利用 Spark 集群的资源。

    • spark.storage.memoryFraction

      参数说明:设置持久化数据在Executor占比,默认是0.6。

      根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

      参数调优建议:根据实际,可以适当提高,让数据写入内存。

    • spark.shuffle.memoryFraction

      参数说明:该参数用于设置 shuffle 过程中一个 task 拉取到上个 stage的 task 的输出后,进行聚合操作时能够使用的 Executor 内存的比例,默认是 0.2。

      参数调优建议:如果 Spark 作业中的 RDD 持久化操作较少,shuffle 操作较多时,建议降低持久化操作的内存占比,提高 shuffle 操作的内存占比比例,避免 shuffle 过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的 gc 导致运行缓慢,意味着 task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

      资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括 Spark 作业中的 shuffle 操作数量、RDD 持久化操作数量以及 spark web ui 中显示的作业 gc 情况),同时参考给出的原理以及调优建议,合理地设置上述参数。

      资源参数参考示例:

      以下是一份 spark-submit 命令的示例,大家可以参考一下,并根据自己的实际情况进行调节。

      ./bin/spark-submit 
      --master yarn-cluster 
      --num-executors 100 
      --executor-memory 6G 
      --executor-cores 4 
      --driver-memory 1G 
      --conf spark.default.parallelism=1000 
      --conf spark.storage.memoryFraction=0.5 
      --conf spark.shuffle.memoryFraction=0.3 
      

    三、Flink调优

    3.1 Backpressure调优

    • web.backpressure.cleanup-interval

      说明:当启动反压数据采集后,获取反压前等待时间,默认是60s。

    • web.backpressure.delay-between-samples:Stack Trace

      说明:抽样到确认反压状态之间的时延,默认为50ms。

    • web.backpressure.num-samples

      说明:设定Stack Trace抽样数以确定反压状态,默认为100。

    3.2 Checkpointing优化

    通过调整Checkpointing之间的时间间隔进行优化。

    val env=StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(milliseconds)
    

    3.3 状态数据压缩

    目前可用的压缩算法是Snappy,设置如下:

    val env=StreamExecutionEnvironment.getExecutionEnvironment
    val config = env.getConfig
    config.setUseSnapshotCompression(true)
    

    3.4 Flink内存优化

    Flink,Spark等大数据引擎都实现了自己的内存管理,有效解决JVM内存溢出问题。

    • JobManager配置

      jobmanager.heap.size:设定JobManager堆内存大小,默认为1024MB。
      
    • TaskManager配置

      TaskManager作为Flink集群中的工作节点,所有任务的计算逻辑均执行在TaskManager之上,因此对TaskManager内存配置显得尤为重要,可以通过以下参数配置对TaskManager进行优化和调整。

      taskmanager.heap.size

      说明:设定TaskManager堆内存大小,默认值为1024M,如果在Yarn的集群中,TaskManager取决于Yarn分配给TaskManager Container的内存大小,且Yarn环境下一般会减掉一部分内存用于Container的容错。

      taskmanager.jvm-exit-on-oom

      说明:设定TaskManager是否会因为JVM发生内存溢出而停止,默认为false,当TaskManager发生内存溢出时,也不会导致TaskManager停止。

      taskmanager.memory.size

      说明:设定TaskManager内存大小,默认为0,如果不设定该值将会使用taskmanager.memory.fraction作为内存分配依据。

      taskmanager.memory.fraction

      说明:设定TaskManager堆中去除Network Buffers内存后的内存分配比例。该内存主要用于TaskManager任务排序、缓存中间结果等操作。例如,如果设定为0.8,则代表TaskManager保留80%内存用于中间结果数据的缓存,剩下20%的内存用于创建用户定义函数中的数据对象存储。注意,该参数只有在taskmanager.memory.size不设定的情况下才生效。

      taskmanager.memory.off-heap

      说明:设置是否开启堆外内存供Managed Memory或者Network Buffers使用。

      taskmanager.memory.preallocate

      说明:设置是否在启动TaskManager过程中直接分配TaskManager管理内存。

      taskmanager.numberOfTaskSlots

      说明:每个TaskManager分配的slot数量。

    3.5 设定Network内存比例

    taskmanager.network.memory.fraction

    说明:JVM中用于Network Buffers的内存比例。

    taskmanager.network.memory.min

    说明:最小的Network Buffers内存大小,默认为64MB。

    taskmanager.network.memory.max

    说明:最大的Network Buffers内存大小,默认1GB。

    taskmanager.memory.segment-size

    说明:内存管理器和Network栈使用的Buffer大小,默认为32KB。

    3.6 堆内存调优

    默认Flink使用的Parallel Scavenge的垃圾回收器,可以改用G1垃圾回收器。

    启动参数:

    env.java.opts= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:+PrintGCDetails

    • -XX:MaxGCPauseMillis:设置允许的最大GC停顿时间,默认是200ms。
    • -XX:G1HeapRegionSize:每个分区的大小,默认值会根据整个堆区的大小计算出来,范围是1M~32M,取值是2的幂,计算的倾向是尽量有2048个分区数。
    • -XX:MaxTenuringThreshold=n:晋升到老年代的“年龄”阈值,默认值为15。
    • -XX:InitiatingHeapOccupancyPercent:一般会简写IHOP,默认是45%,这个占比跟并发周期的启动相关,当空间占比达到这个值时,会启动并发周期。如果经常出现FullGC,可以调低该值,今早的回收可以减少FullGC的触发,但如果过低,则并发阶段会更加频繁,降低应用的吞吐。
    • -XX:G1NewSizePercent:年轻代最小的堆空间占比,默认是5%。
    • -XX:G1MaxNewSizePercent:年轻代最大的堆空间占比,默认是60%。
    • -XX:ConcGCThreads:并发执行的线程数,默认值接近整个应用程序数的1/4。
    • -XX:-XX:G1HeapWastePercent:允许的浪费空间的占比,默认是5%。如果并发标记可回收的空间小于5%,则不会抛出MixedGC。
    • -XX:G1MixedGCCountTarget:一次全局并发标记之后,后续最多执行的MixedGC次数。默认值是8。

    四、Hadoop调优

    4.1 HDFS参数调优

    在hdfs-site.xml里面配置:

    (1)dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为8台时,此参数设置为60

    NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群 或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小。

    (2)编辑日志存储路径dfs.namenode.edits.dir设置与镜像文件存储路径dfs.namenode.name.dir尽量分开,达到最低写入延迟

    4.2 YARN参数调优yarn-site.xml

    (1)情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive

    面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。

    (2)解决办法:

    内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。

    (a)yarn.nodemanager.resource.memory-mb

    表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。

    (b)yarn.scheduler.maximum-allocation-mb

    单个任务可申请的最多物理内存量,默认是8192(MB)。

    4.3 Hadoop宕机

    (1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)

    (2)如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。

    五、Flume调优

    5.1内存优化

    1)问题描述:如果启动消费Flume抛出如下异常

    ERROR hdfs.HDFSEventSink: process failed
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    

    2)解决方案步骤:

    (1)在flume-env.sh文件中增加如下配置

    export JAVA_OPTS="-Xms2000m -Xmx2000m -Dcom.sun.management.jmxremote"
    

    同步配置到其它服务器。

    PS:Xms代表程序启动的时候从操作系统中获取的内存数量,Xmx代表程序最大可以从操作系统中获取的内存数量。

    注意:-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

    5.2FileChannel优化

    通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

    checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据。

    5.3HDFS Sink优化

    官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval(时间间隔)、hdfs.rollSize(滚动大小)、hdfs.rollCount(event策略)。

    举例:

    hdfs.rollInterval=60(单位:秒,如果设置为0则代表禁用此策略)

    hdfs.rollSize=134217728(单位:字节,如果设置为0则代表禁用此策略)

    hdfs.rollCount =0(event数量,如果设置为0则代表禁用此策略)

    上面的设置代表:

    • 如果60秒内文件达到128M时会滚动生成正式文件。

    • 如果超过60秒未达到128M也会滚动生成正式文件。

      系列传送门

  • 相关阅读:
    Connection parameters are correct , SSL not enabled
    log4j配置文件的详解
    java.lang.IllegalArgumentException: addChild: Child name '/SSHE' is not unique
    MYSQL的三种注释
    Oracle19c 单节点ASM 存储模式数据库实例搭建过程
    [专题]中立遭质疑,提价遭反对,ARM的生存难题怎么破?
    快速排序的理解
    chrome审查元素功能,web开发强大帮手
    MyEclipse Server view报错解决方法
    把Java程序打包成jar文件包并执行
  • 原文地址:https://www.cnblogs.com/shun7man/p/13308839.html
Copyright © 2011-2022 走看看