1. Introduction

The built-in SQL functions often fall short of day-to-day development needs, so we define our own functions (UDFs). Spark can also read from a wide range of data sources, such as JSON files and MySQL tables.

2. Custom Functions in SparkSQL

2.1 Defining the Function
// appends the string "aaa" to its argument
val fun1 = (arg: String) => { arg + "aaa" }
2.2 Registering the Function
spark.udf.register("addString", fun1)
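Once registered, addString can be called by name from any SQL statement issued through this SparkSession, as the full example in 2.3 shows. If you prefer to stay in the DataFrame API, the same Scala function can instead be wrapped with org.apache.spark.sql.functions.udf; a minimal sketch (the words Dataset and its value column are borrowed from the example in 2.3):

import org.apache.spark.sql.functions.udf

// wrap the same function as a column expression
val addString = udf(fun1)

// apply it directly to a column, no SQL registration needed
// (assumes `import spark.implicits._` for the $"" syntax)
words.select(addString($"value")).show()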
2.3 Example
package cn.edu360.spark07

import org.apache.spark.sql._

object AutoFun {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkDateSet1")
      .master("local[2]")
      .getOrCreate()
    val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
    import spark.implicits._
    val words: Dataset[String] = lines.flatMap(_.split(" "))
    // register a temp view so the data can be queried with SQL
    words.createTempView("v_wc")
    // define the function
    val fun1 = (arg: String) => { arg + "aaa" }
    // register the function as a UDF
    spark.udf.register("addString", fun1)
    val result: DataFrame = spark.sql("select addString(value), count(*) from v_wc group by value")
    result.show()
    spark.stop()
  }
}
3. Reading Data Sources in Spark
3.1 JSON
import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JsonDataSource")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    // read JSON data; the schema is inferred from the keys, so no separate header is needed
    val jsons: DataFrame = spark.read.json("/Users/zx/Desktop/json")
    val filtered: DataFrame = jsons.where($"age" <= 500)
    filtered.printSchema()
    filtered.show()
    spark.stop()
  }
}
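spark.read.json infers the schema by scanning the files first. When the structure is known up front, passing an explicit schema skips that extra pass; a minimal sketch with hypothetical field names:

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// hypothetical schema matching the JSON files above
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", LongType, nullable = true)
))

// no inference pass: Spark trusts the supplied schema
val jsonsTyped: DataFrame = spark.read.schema(schema).json("/Users/zx/Desktop/json")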
3.2 JDBC
package cn.edu360.spark07

import org.apache.spark.sql.{DataFrame, SparkSession}

object JDBCSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcDataSource")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    // load the "log" table from MySQL over JDBC
    val log: DataFrame = spark.read.format("jdbc").options(
      Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useSSL=true",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "log",
        "user" -> "root",
        "password" -> "qwe123"
      )
    ).load()
    val result: DataFrame = log.select($"id", $"name", $"age")
    result.show()
    spark.stop()
  }
}
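The same connection options work in the write direction, so a processed DataFrame can be saved back to MySQL. A minimal sketch, assuming a hypothetical target table log_copy:

// write the result back over JDBC; "overwrite" recreates the table
result.write.format("jdbc").options(
  Map(
    "url" -> "jdbc:mysql://localhost:3306/test?useSSL=true",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "log_copy", // hypothetical target table
    "user" -> "root",
    "password" -> "qwe123"
  )
).mode("overwrite").save()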
3.3 Parquet
package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/9/18.
  */
object ParquetDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ParquetDataSource")
      .master("local[*]")
      .getOrCreate()
    // read Parquet data
    val parquetLine: DataFrame = spark.read.parquet("/Users/zx/Desktop/parquet")
    // equivalent: spark.read.format("parquet").load("/Users/zx/Desktop/pq")
    parquetLine.printSchema()
    // show is an Action
    parquetLine.show()
    spark.stop()
  }
}
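Writing goes the other way just as easily, since Parquet is Spark's default save format. A minimal sketch with a hypothetical output path:

// save the DataFrame back out as Parquet
parquetLine.write.parquet("/Users/zx/Desktop/parquet-out") // hypothetical path

// equivalent explicit form
// parquetLine.write.format("parquet").save("/Users/zx/Desktop/parquet-out")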
3.4 CSV
package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/9/18.
  */
object CsvDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CsvDataSource")
      .master("local[*]")
      .getOrCreate()
    // read CSV data; the columns come in as _c0, _c1, _c2
    val csv: DataFrame = spark.read.csv("/Users/zx/Desktop/csv")
    csv.printSchema()
    // rename the default columns
    val pdf: DataFrame = csv.toDF("id", "name", "age")
    pdf.show()
    spark.stop()
  }
}
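Because the files here carry no header row, the columns arrive as _c0, _c1, _c2 and are renamed with toDF. If the CSV files do have a header, the reader can take the column names (and optionally the types) from the data itself; a minimal sketch:

// let Spark take column names from the header row and infer types
val csvWithHeader: DataFrame = spark.read
  .option("header", "true")      // first line holds the column names
  .option("inferSchema", "true") // extra pass over the data to guess types
  .csv("/Users/zx/Desktop/csv")

csvWithHeader.printSchema()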