1. Custom UDFs

This post implements the classic "salting" fix for skewed group-by aggregations in Spark SQL: one UDF adds a random prefix to the grouping key, a second removes it, and the single skewed aggregation is split into two balanced ones.
1) Dependencies (the full pom.xml of the project; only the Spark artifacts matter for this example)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkUdf</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>test</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.0</scala.version>
        <hadoop.version>3.0.0</hadoop.version>
        <hive.version>3.0.0</hive.version>
        <hbase.version>2.3.0</hbase.version>
        <spark.version>3.0.0</spark.version>
        <jedis.version>3.0.0</jedis.version>
    </properties>

    <dependencies>
        <!-- 0. Base language -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- 1. Flink modules -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 2. CLI dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 3. Alibaba fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
            <exclusions>
                <exclusion>
                    <artifactId>javassist</artifactId>
                    <groupId>org.javassist</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>scala-parser-combinators_2.11</artifactId>
                    <groupId>org.scala-lang.modules</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 4. Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 5. Database / connector dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Excel (POI) dependencies -->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
            <exclusions>
                <exclusion>
                    <artifactId>force-shading</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.5</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.4.Final</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- 6. Offline warehouse (Hive) dependencies -->
        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Hive -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <!-- 7. Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <repositories>
        <!--
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <layout>default</layout>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        -->
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <pluginManagement><!-- lock down plugin versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2) A UDF that adds a random prefix
package Spark.Udf;

import org.apache.spark.sql.api.java.UDF2;

import java.util.Random;

/**
 * @program: FlinkUdf
 * @description: Prepends a random prefix in [0, num) to a field value, e.g. "A" -> "1_A".
 * @author: BigCodeKing
 * @create: 2021-02-01 17:48
 **/
public class RandomPrefixUDF implements UDF2<String, Integer, String> {
    private static final long serialVersionUID = 1L;

    // Reused across calls; java.util.Random is serializable, so the UDF still ships to executors.
    private final Random random = new Random();

    @Override
    public String call(String val, Integer num) throws Exception {
        int randNum = random.nextInt(num); // uniform in [0, num)
        return randNum + "_" + val;
    }
}
3) A UDF that removes the random prefix
package Spark.Udf;

import org.apache.spark.sql.api.java.UDF1;

/**
 * @program: FlinkUdf
 * @description: Strips the random prefix added by RandomPrefixUDF, e.g. "1_A" -> "A".
 * @author: BigCodeKing
 * @create: 2021-02-01 17:51
 **/
public class RemoveRandomPrefixUDF implements UDF1<String, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val) throws Exception {
        // Cut at the first "_" only, so values that themselves contain underscores
        // survive the round trip (val.split("_")[1] would truncate them).
        return val.substring(val.indexOf("_") + 1);
    }
}
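Because both UDFs are plain serializable Java classes, they can be sanity-checked without a SparkSession. A minimal sketch (this test snippet is an addition for illustration, not part of the original project):

val addPrefix   = new RandomPrefixUDF()
val stripPrefix = new RemoveRandomPrefixUDF()

val salted = addPrefix.call("A", 2)       // "0_A" or "1_A"
assert(stripPrefix.call(salted) == "A")   // the prefix round-trips cleanly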
2. Data flow

Without a random prefix, every "A" row carries the same key, so all of them are shuffled into a single task:
A 1
A 1
A 1
A 1
B 1
Result:
A 4
B 1
With a random prefix, the heavy key "A" is first spread across several salted keys, so the initial aggregation is balanced:
A 1
A 1
A 1
A 1
B 1
-- add a random prefix
0_A 1
0_A 1
1_A 1
1_A 1
0_B 1
-- first GroupBy (on the salted keys)
0_A 2
1_A 2
0_B 1
-- remove the random prefix
A 2
A 2
B 1
-- second GroupBy (on the real keys)
A 4
B 1
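For reference, the same two-stage aggregation can be written end to end in a few lines of the RDD API. This is a sketch for illustration only, not part of the program below; it assumes an existing SparkContext named sc:

import scala.util.Random

val result = sc.parallelize(Seq("A", "A", "A", "A", "B"))
  .map(k => (s"${Random.nextInt(2)}_$k", 1))      // add a random prefix
  .reduceByKey(_ + _)                             // first aggregation, on salted keys
  .map { case (k, v) => (k.split("_", 2)(1), v) } // strip the prefix
  .reduceByKey(_ + _)                             // second aggregation, on real keys

result.collect().foreach(println)                 // prints (A,4) and (B,1)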
3. Spark program
import Spark.Udf.{RandomPrefixUDF, RemoveRandomPrefixUDF}

import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Specifies the schema directly via StructType and converts the RDD to a DataFrame.
 */
object TestUDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TestUDF")
      .master("local")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // Register the two Java UDFs under SQL-callable names
    spark.udf.register("random_prefix", new RandomPrefixUDF(), DataTypes.StringType)
    spark.udf.register("remove_random_prefix", new RemoveRandomPrefixUDF(), DataTypes.StringType)

    val personRDD = sc.parallelize(List("A", "A", "A", "A", "B"), 1)
      .map(x => (x, 1))
      .map(x => Row(x._1, x._2.toInt))

    // Build the schema
    val schema: StructType = StructType(Seq(
      StructField("product", StringType, false),
      StructField("click", IntegerType, false)
    ))
    val personDF = spark.createDataFrame(personRDD, schema)

    // Query the table via SQL
    personDF.createOrReplaceTempView("t_product_click")

    // Step 1: add a random prefix
    val sql1 =
      s"""
         |select
         |  random_prefix(product, 2) product,
         |  click
         |from
         |  t_product_click
       """.stripMargin

    // Step 2: group and sum on the salted keys
    val sql2 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      random_prefix(product, 2) product,
         |      click
         |    from
         |      t_product_click
         |  ) t1
         |group by
         |  product
       """.stripMargin

    // Step 3: remove the random prefix
    val sql3 =
      s"""
         |select
         |  remove_random_prefix(product) product,
         |  click
         |from
         |  (
         |    select
         |      product,
         |      sum(click) click
         |    from
         |      (
         |        select
         |          random_prefix(product, 2) product,
         |          click
         |        from
         |          t_product_click
         |      ) t1
         |    group by
         |      product
         |  ) t2
       """.stripMargin

    // Step 4: group and sum on the real keys
    val sql4 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      remove_random_prefix(product) product,
         |      click
         |    from
         |      (
         |        select
         |          product,
         |          sum(click) click
         |        from
         |          (
         |            select
         |              random_prefix(product, 2) product,
         |              click
         |            from
         |              t_product_click
         |          ) t1
         |        group by
         |          product
         |      ) t2
         |  ) t3
         |group by
         |  product
       """.stripMargin

    spark.sql(sql1).show()
    spark.sql(sql2).show()
    spark.sql(sql3).show()
    spark.sql(sql4).show()

    sc.stop()
  }
}
4. Spark SQL query

The four steps compose into a single query:
select
  product,
  sum(click) click
from
(
  select
    remove_random_prefix(product) product,
    click
  from
  (
    select
      product,
      sum(click) click
    from
    (
      select
        random_prefix(product, 2) product,
        click
      from
        t_product_click
    ) t1
    group by
      product
  ) t2
) t3
group by
  product
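For completeness, the same pipeline can also be expressed with the DataFrame API instead of SQL. This is a hedged sketch, not part of the original program; it assumes personDF and the UDF registrations from the program above, and uses functions.callUDF to invoke a registered UDF by name:

import org.apache.spark.sql.functions.{callUDF, col, lit, sum}

// salt -> aggregate -> unsalt -> aggregate, mirroring sql4
val salted   = personDF.select(callUDF("random_prefix", col("product"), lit(2)).as("product"), col("click"))
val firstAgg = salted.groupBy("product").agg(sum("click").as("click"))
val stripped = firstAgg.select(callUDF("remove_random_prefix", col("product")).as("product"), col("click"))
stripped.groupBy("product").agg(sum("click").as("click")).show()   // A -> 4, B -> 1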
Execution results, one table per query (sql1 through sql4). Note that random_prefix is re-evaluated for each query, so the salted keys in the intermediate tables need not be consistent across tables:
+-------+-----+
|product|click|
+-------+-----+
|    0_A|    1|
|    1_A|    1|
|    0_A|    1|
|    0_A|    1|
|    1_B|    1|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|    1_A|    3|
|    1_B|    1|
|    0_A|    1|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|      A|    1|
|      B|    1|
|      A|    3|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|      B|    1|
|      A|    4|
+-------+-----+