zoukankan      html  css  js  c++  java
  • spark性能调优04-算子调优

    1、使用MapPartitions代替map

      1.1 为什么要死使用MapPartitions代替map

        普通的map,每条数据都会传入function中进行计算一次;而是用MapPartitions时,function会一次接受所有partition的数据出入到function中计算一次,性能较高。

        但是如果内存不足时,使用MapPartitions,一次将所有的partition数据传入,可能会发生OOM异常

      1.2 如何使用

        有map的操作的地方,都可以使用MapPartitions进行替换

            /**
             * 使用mapPartitionsToPair代替mapToPair
             */
            JavaPairRDD<String, Row> sessionRowPairRdd =dateRangeRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() {
    
                private static final long serialVersionUID = 1L;
    
                public Iterable<Tuple2<String, Row>> call(Iterator<Row> rows) throws Exception {
                    List<Tuple2<String, Row>> list=new ArrayList<Tuple2<String, Row>>();
                    while (rows.hasNext()) {
                        Row row=rows.next();
                        list.add(new Tuple2<String, Row>(row.getString(2), row));
                    }
                    return list;
                }
            });
            
            /*JavaPairRDD<String, Row> sessionRowPairRdd = dateRangeRdd
                    .mapToPair(new PairFunction<Row, String, Row>() {
    
                        private static final long serialVersionUID = 1L;
                        // 先将数据映射为<sessionId,row>
                        public Tuple2<String, Row> call(Row row) throws Exception {
                            return new Tuple2<String, Row>(row.getString(2), row);
                        }
                    });*/    

    2、使用coalesce对过滤后的Rdd进行重新分区和压缩

      2.1 为什么使用coalesce

        默认情况下,经过过滤后的数据的分区数和原分区数是一样的,这就导致过滤后各个分区中的数据可能差距很大,在之后的操作中造成数据倾斜

        使用coalesce可以使过滤后的Rdd的分区数减少,并让每个分区中的数据趋于平等

      2.2 如何使用   

           //过滤符合要求的ClickCategoryIdRow    
        filteredSessionRdd
    .filter(new Function<Tuple2<String,Row>, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Tuple2<String, Row> tuple2) throws Exception { return (Long.valueOf(tuple2._2.getLong(6))!=null)?true:false; } }) //使用coalesce将过滤后的数据重新分区和压缩,时新的分区中的数据大致相等 .coalesce(100)

    3、使用foreachPartition替代foreach

      3.1 为什么使用foreachPartition

        默认使用的foreach,每条数据都会传入function进行计算;如果操作数据库,每条数据都会获取一个数据库连接并发送sql进行保存,消耗资源比较大,性能低。

        使用foreachPartition,会把所用partition的数据一次出入function,只需要获取一次数据库连接,性能高。

      3.2 如何使用

            /**
             * 使用foreachPartition替代foreach
             */
            sessionRdd.join(sessionRowPairRdd).foreachPartition(new VoidFunction<Iterator<Tuple2<String,Tuple2<String,Row>>>>() {
                private static final long serialVersionUID = 1L;
                public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> iterator)
                        throws Exception {
                    List<SessionDetail> sessionDetails=new ArrayList<SessionDetail>();
                    if (iterator.hasNext()) {
                        Tuple2<String, Tuple2<String, Row>> tuple2=iterator.next();
                        String sessionId=tuple2._1;
                        Row row=tuple2._2._2;
                        SessionDetail sessionDetail=new SessionDetail();
                        sessionDetail.setSessionId(sessionId);
                        sessionDetail.setTaskId((int)taskId);
                        sessionDetail.setUserId((int)row.getLong(1));
                        sessionDetails.add(sessionDetail);
                    }
                    DaoFactory.getSessionDetailDao().batchInsertSessionDao(sessionDetails);
                }
            });
            
           /* sessionRdd.join(sessionRowPairRdd).foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {
                private static final long serialVersionUID = 1L;
                public void call(Tuple2<String, Tuple2<String, Row>> tuple2) throws Exception {
                    String sessionId=tuple2._1;
                    Row row=tuple2._2._2;
                    SessionDetail sessionDetail=new SessionDetail();
                    sessionDetail.setSessionId(sessionId);
                    sessionDetail.setTaskId((int)taskId);
                    sessionDetail.setUserId((int)row.getLong(1));
                  DaoFactory.getSessionDetailDao().insertSessionDao(sessionDetail);
                }
            });*/   

     4、使用repartition进行调整并行度

      4.1 为什么要使用repartition

        spark.default.parallelism设置的并行度只能对没有Spark SQL(DataFrame)的阶段有用,对Spark SQL的并行度是无法设置的,该并行度是通过hdfs文件所在的block块决定的。

        可以通过repartition调整之后的并行度

      4.2 如何使用 

    sqlContext.sql("select * from user_visit_action where date >= '" + startDate + "' and date <= '" + endDate + "'").javaRDD()
        //使用repartition调整并行度
        .repartition(100)

     5、使用reduceByKey进行本地聚合

      5.1 reduceByKey有哪些优点

        reduceByKey相对于普通的shuffle操作(如groupByKey)的一个最大的优点,会进行map端的本地聚合,从而减少文件的输出,减少磁盘IO,网络传输,内存占比以及reduce端的聚合操作数据。

      5.2 使用场景

        只有是针对每个不同的key进行相应的操作都可以使用reduceByKey进行处理

  • 相关阅读:
    css word-wrap与word-break区别
    input输入框光标位置问题
    正则表达式(二)- 位置匹配攻略
    正则表达式(一)- 字符匹配攻略
    mac电脑重启nginx报错nginx: [error] invalid PID number "" in "/usr/local/var/run/nginx.pid"
    指定js文件不使用 eslint 语法检查
    管理github/gitlab生成多个ssh key
    前端切图两种方法整理
    梳理:移动端Viewport的知识
    切图 — Photoshop(转载)
  • 原文地址:https://www.cnblogs.com/lifeone/p/6439130.html
Copyright © 2011-2022 走看看