  • Spark SQL from Beginner to Practice (8): Data Skew Optimization

    1. Custom UDFs

    1) Dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.example</groupId>
        <artifactId>FlinkUdf</artifactId>
        <version>1.0-SNAPSHOT</version>
        <name>test</name>
        <!-- FIXME change it to the project's website -->
        <url>http://www.example.com</url>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.7</maven.compiler.source>
            <maven.compiler.target>1.7</maven.compiler.target>
            <flink.version>1.11.1</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
            <scala.version>2.11.0</scala.version>
            <hadoop.version>3.0.0</hadoop.version>
            <hive.version>3.0.0</hive.version>
            <hbase.version>2.3.0</hbase.version>
            <spark.version>3.0.0</spark.version>
            <jedis.version>3.0.0</jedis.version>
        </properties>
        <dependencies>
            <!-- 0. Base language -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <!-- 1. Flink modules -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>scala-library</artifactId>
                        <groupId>org.scala-lang</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 2. CLI dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 3. Alibaba fastjson (JSON) dependency -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.58</version>
                <exclusions>
                    <exclusion>
                        <artifactId>javassist</artifactId>
                        <groupId>org.javassist</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>scala-parser-combinators_2.11</artifactId>
                        <groupId>org.scala-lang.modules</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>snappy-java</artifactId>
                        <groupId>org.xerial.snappy</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 4. Kafka dependencies -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.3</version>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>kafka-clients</artifactId>
                        <groupId>org.apache.kafka</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- 5. Database and connector dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- (1) Excel (Apache POI) dependencies -->
            <dependency>
                <groupId>org.apache.poi</groupId>
                <artifactId>poi</artifactId>
                <version>3.14</version>
            </dependency>
            <dependency>
                <groupId>org.apache.poi</groupId>
                <artifactId>poi-ooxml</artifactId>
                <version>3.14</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-hbase_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.12</artifactId>
                <version>1.10.2</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.37</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.1.5</version>
                <exclusions>
                    <exclusion>
                        <artifactId>force-shading</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.9.5</version>
            </dependency>
            <dependency>
                <groupId>io.lettuce</groupId>
                <artifactId>lettuce-core</artifactId>
                <version>5.0.5.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.4.Final</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>${jedis.version}</version>
            </dependency>
            <!-- Add connector dependencies here. They must be in the default scope (compile). -->
            <!-- Example:
    
            <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
            -->
            <!-- Add logging framework, to produce console output when running in the IDE. -->
            <!-- These dependencies are excluded from the application JAR by default. -->
            <!-- Logging dependencies -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>net.sf.json-lib</groupId>
                <artifactId>json-lib</artifactId>
                <version>2.4</version>
                <classifier>jdk15</classifier>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
            <!-- 6. Offline data warehouse (Hive) dependencies -->
            <!-- (1) Hadoop dependencies -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!-- (2) Hive dependencies -->
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>${hive.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>${hive.version}</version>
            </dependency>
            <!-- (3) HBase dependencies -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <!-- 7. Spark dependencies -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.12</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
        </dependencies>
        <repositories>
            <!--    <repository>
                  <id>nexus-aliyun</id>
                  <name>Nexus aliyun</name>
                  <layout>default</layout>
                  <url>http://maven.aliyun.com/nexus/content/groups/public</url>
                  <snapshots>
                    <enabled>false</enabled>
                  </snapshots>
                  <releases>
                    <enabled>true</enabled>
                  </releases>
                </repository>-->
            <repository>
                <id>apache.snapshots</id>
                <name>Apache Development Snapshot Repository</name>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
                <releases>
                    <enabled>false</enabled>
                </releases>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
        </repositories>
    
        <build>
            <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
                <plugins>
                    <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                    <plugin>
                        <artifactId>maven-clean-plugin</artifactId>
                        <version>3.1.0</version>
                    </plugin>
                    <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                    <plugin>
                        <artifactId>maven-resources-plugin</artifactId>
                        <version>3.0.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.8.0</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-surefire-plugin</artifactId>
                        <version>2.22.1</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-jar-plugin</artifactId>
                        <version>3.0.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-install-plugin</artifactId>
                        <version>2.5.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-deploy-plugin</artifactId>
                        <version>2.8.2</version>
                    </plugin>
                    <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                    <plugin>
                        <artifactId>maven-site-plugin</artifactId>
                        <version>3.7.1</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-project-info-reports-plugin</artifactId>
                        <version>3.0.0</version>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>

    2) Adding a random prefix

    package Spark.Udf;

    import org.apache.spark.sql.api.java.UDF2;
    import java.util.Random;

    /**
     * @program: FlinkUdf
     * @description: prepend a random prefix in [0, num) to a field value
     * @author: BigCodeKing
     * @create: 2021-02-01 17:48
     **/
    public class RandomPrefixUDF implements UDF2<String, Integer, String> {
        private static final long serialVersionUID = 1L;
        private final Random random = new Random();

        @Override
        public String call(String val, Integer num) throws Exception {
            // Pick a salt in [0, num) and prepend it, e.g. "A" -> "1_A".
            int randNum = random.nextInt(num);
            return randNum + "_" + val;
        }
    }
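
    A quick way to see what this UDF produces, before wiring it into Spark SQL, is to call it directly. A minimal sketch (the check object below is hypothetical, not part of the original project):

    object RandomPrefixCheck {
      def main(args: Array[String]): Unit = {
        // Hypothetical sanity check: call the UDF directly, outside Spark.
        val udf = new Spark.Udf.RandomPrefixUDF()
        // With num = 2 the salt is 0 or 1, so this prints "0_A" or "1_A".
        println(udf.call("A", 2))
      }
    }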

    3) Removing the random prefix

    package Spark.Udf;

    import org.apache.spark.sql.api.java.UDF1;

    /**
     * @program: FlinkUdf
     * @description: strip the random prefix added by RandomPrefixUDF
     * @author: BigCodeKing
     * @create: 2021-02-01 17:51
     **/
    public class RemoveRandomPrefixUDF implements UDF1<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String call(String val) throws Exception {
            // Split on the first "_" only, so values that themselves contain "_" survive intact.
            String[] split = val.split("_", 2);
            return split[1];
        }
    }
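
    The inverse can be checked the same way (again a minimal sketch with a hypothetical object name):

    object RemovePrefixCheck {
      def main(args: Array[String]): Unit = {
        // Hypothetical sanity check: strip the salt that RandomPrefixUDF added.
        val udf = new Spark.Udf.RemoveRandomPrefixUDF()
        println(udf.call("1_A")) // prints "A"
      }
    }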

    2. Data Flow

    Flow without a random prefix

    A 1
    A 1
    A 1
    A 1
    B 1
    
    Result:
    
    A 4
    B 1

    Flow with a random prefix

    A 1
    A 1
    A 1
    A 1
    B 1
    
    -- add a random prefix
    
    0_A 1
    0_A 1
    1_A 1
    1_A 1
    0_B 1
    
    -- first GROUP BY
    
    0_A 2
    1_A 2
    0_B 1
    
    -- remove the random prefix
    
    A 2
    A 2
    B 1
    
    -- second GROUP BY
    
    A 4
    B 1
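
    The same two-stage plan can also be expressed with Spark's built-in functions instead of custom UDFs. The sketch below is an alternative formulation, not the article's UDF-based approach; it assumes a local SparkSession, and the column names mirror the example data:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object SaltedAggSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SaltedAggSketch").master("local").getOrCreate()
        import spark.implicits._

        val df = Seq("A", "A", "A", "A", "B").toDF("product").withColumn("click", lit(1))

        val result = df
          // Stage 1: salt the key with a random 0/1 prefix, then pre-aggregate.
          .withColumn("salted", concat(floor(rand() * 2).cast("string"), lit("_"), $"product"))
          .groupBy("salted").agg(sum("click").as("click"))
          // Stage 2: strip the salt and aggregate to the final per-product totals.
          .withColumn("product", substring_index($"salted", "_", -1))
          .groupBy("product").agg(sum("click").as("click"))

        result.show() // A -> 4, B -> 1
        spark.stop()
      }
    }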

    3. Spark Program

    import Spark.Udf.{RandomPrefixUDF, RemoveRandomPrefixUDF}
    import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}

    /**
      * Specify the schema directly via StructType and convert the RDD to a DataFrame.
      */
    object TestUDF {
      def main(args: Array[String]): Unit = {
        val spark =
          SparkSession.builder()
            .appName("TestUDF")
            .master("local")
            .getOrCreate()
    
        val sc = spark.sparkContext
        sc.setLogLevel("WARN")
    
        spark.udf.register("random_prefix", new RandomPrefixUDF(), DataTypes.StringType)
        spark.udf.register("remove_random_prefix", new RemoveRandomPrefixUDF(), DataTypes.StringType)
    
        val personRDD =
          sc.parallelize(List("A", "A", "A", "A", "B"), 1)
            .map(x => (x, 1))
            .map(x => Row(x._1, x._2.toInt))
    
        // Create the schema
        val schema: StructType = StructType(Seq(
          StructField("product", StringType, false),
          StructField("click", IntegerType, false)
        ))
    
        val personDF = spark.createDataFrame(personRDD, schema)
    
        // Register the DataFrame as a temp view for SQL
        personDF.createOrReplaceTempView("t_product_click")
    
        // Add a random prefix
        val sql1 =
          s"""
             |select
             |  random_prefix(product, 2) product,
             |  click
             |from
             |  t_product_click
           """.stripMargin
    
        // Group by the salted key and sum
        val sql2 =
          s"""
             |select
             |  product,
             |  sum(click) click
             |from
             |  (
             |    select
             |      random_prefix(product, 2) product,
             |      click
             |    from
             |      t_product_click
             |  ) t1
             |group by
             |  product
           """.stripMargin
    
        // Remove the random prefix
        val sql3 =
          s"""
             |select
             |  remove_random_prefix(product) product,
             |  click
             |from
             |  (
             |    select
             |      product,
             |      sum(click) click
             |    from
             |      (
             |        select
             |          random_prefix(product, 2) product,
             |          click
             |        from
             |          t_product_click
             |      ) t1
             |    group by
             |      product
             |  ) t2
             |
           """.stripMargin
    
        // Group by the real key and sum (full pipeline)
        val sql4 =
          s"""
             |select
             |  product,
             |  sum(click) click
             |from
             |  (
             |    select
             |      remove_random_prefix(product) product,
             |      click
             |    from
             |      (
             |        select
             |          product,
             |          sum(click) click
             |        from
             |          (
             |            select
             |              random_prefix(product, 2) product,
             |              click
             |            from
             |              t_product_click
             |          ) t1
             |        group by
             |          product
             |      ) t2
             |  ) t3
             |group by
             |  product
           """.stripMargin
    
        spark.sql(sql1).show()
        spark.sql(sql2).show()
        spark.sql(sql3).show()
        spark.sql(sql4).show()
        sc.stop()
      }
    }

    4. Spark SQL Query

    select
        product,
        sum(click) click
    from
        (
            select
                remove_random_prefix(product) product,
                click
            from
                (
                    select
                        product,
                        sum(click) click
                    from
                        (
                            select
                                random_prefix(product, 2) product,
                                click
                            from t_product_click
                        ) t1
                    group by product
                ) t2
        ) t3
    group by product

    Execution results:

    +-------+-----+
    |product|click|
    +-------+-----+
    |    0_A|    1|
    |    1_A|    1|
    |    0_A|    1|
    |    0_A|    1|
    |    1_B|    1|
    +-------+-----+
    
    +-------+-----+
    |product|click|
    +-------+-----+
    |    1_A|    3|
    |    1_B|    1|
    |    0_A|    1|
    +-------+-----+
    
    +-------+-----+
    |product|click|
    +-------+-----+
    |      A|    1|
    |      B|    1|
    |      A|    3|
    +-------+-----+
    
    +-------+-----+
    |product|click|
    +-------+-----+
    |      B|    1|
    |      A|    4|
    +-------+-----+

     


  • Original article: https://www.cnblogs.com/huanghanyu/p/14358163.html