zoukankan      html  css  js  c++  java
  • spark双重key聚合

    1. 双重 key 聚合解决数据倾斜：当个别 key 的数据量过大时，会导致数据倾斜。解决办法是在 key 前加随机前缀，先按加了前缀的 key 进行一次预聚合（局部聚合），把热点 key 的数据分散到多个任务中；然后去掉随机前缀，再按原始 key 对预聚合结果进行一次全局聚合（例如对局部的 count 结果再做 sum）。

    // Register the custom SQL functions used by the area/road flow queries:
    //   concat_String_string  - scalar UDF returning a string; called in the SQL as
    //                           concat_String_string(area_name, road_id, ':')
    //   random_prefix         - scalar UDF (RandomPrefixUDF) returning "<random n>_<key>"
    //   remove_random_prefix  - scalar UDF (RemoveRandomPrefixUDF); presumably strips the
    //                           "<n>_" prefix again — verify against its implementation
    //   group_concat_distinct - aggregate function (GroupConcatDistinctUDAF)
    // NOTE(review): random_prefix yields a different value per call; consider registering it as
    // nondeterministic (UserDefinedFunction.asNondeterministic(), Spark 2.3+) so the optimizer
    // does not treat it as a deterministic expression.
            spark.udf().register("concat_String_string", new ConcatStringStringUDF(), DataTypes.StringType);
            spark.udf().register("random_prefix", new RandomPrefixUDF(), DataTypes.StringType);
            spark.udf().register("remove_random_prefix", new RemoveRandomPrefixUDF(), DataTypes.StringType);
    
            spark.udf().register("group_concat_distinct",new GroupConcatDistinctUDAF());
    package com.bjsxt.spark.areaRoadFlow;
    
    import java.util.Random;
    import java.util.concurrent.ThreadLocalRandom;
    
    import org.apache.spark.sql.api.java.UDF2;
    
    /**
     * Spark SQL UDF that "salts" a key with a random numeric prefix.
     *
     * <p>Returns {@code "<n>_<key>"}, where {@code n} is drawn uniformly from
     * {@code [0, ranNum)}. A hot (skewed) key is thereby spread over {@code ranNum}
     * distinct salted keys for a first, partial aggregation. The prefix is then removed
     * (presumably by the {@code remove_random_prefix} UDF) before the final aggregation.
     */
    public class RandomPrefixUDF implements UDF2<String, Integer, String>{

        private static final long serialVersionUID = 1L;

        /**
         * Prefixes {@code area_name_road_id} with a random bucket number.
         *
         * @param area_name_road_id the key to salt (concatenated verbatim, so {@code null}
         *                          becomes {@code "null"})
         * @param ranNum            number of salt buckets; must be non-null and positive
         * @return {@code "<n>_" + area_name_road_id} with {@code 0 <= n < ranNum}
         * @throws IllegalArgumentException if {@code ranNum} is null or not positive
         */
        @Override
        public String call(String area_name_road_id, Integer ranNum) throws Exception {
            if (ranNum == null || ranNum <= 0) {
                throw new IllegalArgumentException(
                        "ranNum must be a positive number of salt buckets, got " + ranNum);
            }
            // ThreadLocalRandom avoids allocating a new Random for every row and needs no
            // shared state between the threads that evaluate this UDF.
            int prefix = ThreadLocalRandom.current().nextInt(ranNum);
            return prefix + "_" + area_name_road_id;
        }

    }
    /**
     * Aggregates {@code tmp_car_flow_basic} per (area_name, road_id) and registers the result
     * as the temporary view {@code tmp_area_road_flow_count}.
     *
     * <p>Per the original schema notes, tmp_car_flow_basic has the string columns area_id,
     * area_name, road_id, monitor_id and car. The registered view has the columns area_name,
     * road_id, car_count (row count per road) and monitor_infos (output of the
     * group_concat_distinct UDAF over monitor_id, e.g. {@code 0001=20|0002=30}).
     *
     * @param spark session in which the UDFs are registered and tmp_car_flow_basic is visible
     */
    private static void generateTempAreaRoadFlowTable(SparkSession spark) {
        String sql =
                "SELECT "
                    + "area_name,"
                    + "road_id,"
                    + "count(*) car_count,"
                    // group_concat_distinct: traffic count under each monitor (checkpoint) of the road
                    + "group_concat_distinct(monitor_id) monitor_infos " // e.g. 0001=20|0002=30
                + "FROM tmp_car_flow_basic "
                + "GROUP BY area_name,road_id";

        // If a few roads carry so many rows that this GROUP BY is skewed, see
        // buildSaltedAreaRoadFlowSql for the two-stage (salted key) alternative.
        Dataset<Row> df = spark.sql(sql);

        // registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it.
        df.createOrReplaceTempView("tmp_area_road_flow_count");
    }

    /**
     * Builds a two-stage ("double key") variant of the per-road aggregation for skewed data.
     *
     * <p>Stage 1 salts each key as {@code random_prefix(concat_String_string(area_name, road_id, ':'), saltBuckets)}
     * and aggregates per salted key, so a hot road is split over up to {@code saltBuckets}
     * groups. Stage 2 strips the salt with remove_random_prefix and re-aggregates. Stage 2 sums
     * the partial counts; it does not count rows again.
     *
     * <p>Not currently used by {@link #generateTempAreaRoadFlowTable}, for two reasons:
     * <ul>
     *   <li>Its output has a single combined key column {@code area_name_road_id} instead of
     *       separate area_name/road_id columns.</li>
     *   <li>NOTE(review): stage 2 applies group_concat_distinct to strings that stage 1 already
     *       concatenated. Unless that UDAF merges {@code id=count} entries, counts for the same
     *       monitor coming from different salt buckets are not summed. Confirm against
     *       GroupConcatDistinctUDAF.</li>
     * </ul>
     *
     * @param saltBuckets number of random prefixes per key (the original used 10); must be positive
     * @return the Spark SQL text
     * @throws IllegalArgumentException if {@code saltBuckets} is not positive
     */
    private static String buildSaltedAreaRoadFlowSql(int saltBuckets) {
        if (saltBuckets <= 0) {
            throw new IllegalArgumentException("saltBuckets must be positive, got " + saltBuckets);
        }
        return "SELECT "
                    + "area_name_road_id,"
                    + "sum(car_count) car_count,"
                    + "group_concat_distinct(monitor_infos) monitor_infos "
                + "FROM ("
                    // stage 2 input: strip the salt so all rows of one road share a key again
                    + "SELECT "
                        + "remove_random_prefix(prefix_area_name_road_id) area_name_road_id,"
                        + "car_count,"
                        + "monitor_infos "
                    + "FROM ("
                        // stage 1: partial aggregation on the salted key
                        + "SELECT "
                            + "prefix_area_name_road_id," // e.g. 1_鼓楼区:49
                            + "count(*) car_count,"
                            + "group_concat_distinct(monitor_id) monitor_infos "
                        + "FROM ("
                            + "SELECT "
                                + "monitor_id,"
                                + "random_prefix(concat_String_string(area_name,road_id,':'),"
                                + saltBuckets + ") prefix_area_name_road_id "
                            + "FROM tmp_car_flow_basic "
                        + ") t1 "
                        + "GROUP BY prefix_area_name_road_id "
                    + ") t2 "
                + ") t3 "
                + "GROUP BY area_name_road_id";
    }
  • 相关阅读:
    【SpringBoot】 理解SpringBoot的启动原理
    【SpringBoot】SpringBoot的基础,全面理解bean的生命周期
    【转】 Linux 命令解释(Linux基础二)
    【转】 Linux 的目录详解 (Linux基础一)
    【SpringBoot】 一种解决接口返回慢的方式
    【Jmeter基础】 Linux上运行Jmeter
    【SpingBoot】 测试如何使用SpringBoot搭建一个简单后台1
    单例模式@Singleton在测试中的运用
    性能测试基础 ---TCP通信过程的状态码与过程,以及出现错误码的分析(TIME_WAIT,CLOSE_WAIT)
    Spring5源码分析(007)——IoC篇之加载BeanDefinition总览
  • 原文地址:https://www.cnblogs.com/guoyu1/p/12305221.html
Copyright © 2011-2022 走看看