Spark-DataFrame Creation - Reading a JSON File
jsData.js data (one JSON object per line):
{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}
import org.apache.spark.sql.SparkSession

// Create the SparkSession
val spark = SparkSession.builder().getOrCreate()
// Read the JSON file to create a DataFrame
val df = spark.read.json("D:/jsData.js")
// Show the data
df.show()
// Print the schema
df.printSchema()
// Select multiple columns, adding 1 to age
df.select(df("name"), df("age") + 1).show()
// Rename the name column to username
df.select(df("name").as("username"), df("age")).show()
// Filter rows where age is greater than 25
df.filter(df("age") > 25).show()
// Group by age and count
df.groupBy("age").count().show()
// Sort by age in descending order
df.sort(df("age").desc).show()
// Sort by multiple columns
df.sort(df("age").desc, df("name").desc).show()
+---+----------+-------+
|age| hobby| name|
+---+----------+-------+
| 23| running| json|
| 32|basketball|charles|
| 28| football| tom|
| 24| running| lili|
| 20| swimming| bob|
+---+----------+-------+
root
|-- age: long (nullable = true)
|-- hobby: string (nullable = true)
|-- name: string (nullable = true)
+-------+---------+
| name|(age + 1)|
+-------+---------+
| json| 24|
|charles| 33|
| tom| 29|
| lili| 25|
| bob| 21|
+-------+---------+
+--------+---+
|username|age|
+--------+---+
| json| 23|
| charles| 32|
| tom| 28|
| lili| 24|
| bob| 20|
+--------+---+
+---+----------+-------+
|age| hobby| name|
+---+----------+-------+
| 32|basketball|charles|
| 28| football| tom|
+---+----------+-------+
+---+-----+
|age|count|
+---+-----+
| 32| 1|
| 28| 1|
| 23| 1|
| 20| 1|
| 24| 1|
+---+-----+
+---+----------+-------+
|age| hobby| name|
+---+----------+-------+
| 32|basketball|charles|
| 28| football| tom|
| 24| running| lili|
| 23| running| json|
| 20| swimming| bob|
+---+----------+-------+
+---+----------+-------+
|age| hobby| name|
+---+----------+-------+
| 32|basketball|charles|
| 28| football| tom|
| 24| running| lili|
| 23| running| json|
| 20| swimming| bob|
+---+----------+-------+
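The same operations can also be expressed in SQL by registering the DataFrame as a temporary view. A minimal sketch, reusing the df created above; the view name people is chosen here for illustration:
// Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("people")
// SQL equivalent of df.filter(df("age") > 25).show()
spark.sql("SELECT name, age, hobby FROM people WHERE age > 25").show()
// SQL equivalent of df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(*) AS count FROM people GROUP BY age").show()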
Spark-DataFrame Creation - RDD to DataFrame
data.txt contents:
scala 4
java 2
C 8
C++ 1
python 3
PHP 5
import org.apache.spark.sql.{Row, SparkSession}

// Define a case class to wrap each row of data
case class Thing(name: String, num: Int)
// Create the SparkSession
val spark = SparkSession
  .builder()
  .appName("SparkSessionT")
  .master("local[1]")
  .getOrCreate()
// Wrap each row of data
val rowRdd = spark.sparkContext
  .textFile("D:/data.txt")
  .map(_.split(" "))
  .map(words => Thing(words(0), words(1).trim.toInt)) // wrap the fields in the case class
// Import the SparkSession's implicit conversions
import spark.implicits._
// Convert the wrapped data into a DataFrame
val rddDF = rowRdd.toDF()
// Register a temporary view
rddDF.createOrReplaceTempView("table")
// Show all the data
rddDF.show()
// Select columns and save them in a specific format
// rddDF.select("name","num").write.format("csv").save("D:/sava.csv")
// Print the schema
rddDF.printSchema()
// Stop the SparkSession
spark.stop()
+------+---+
| name|num|
+------+---+
| scala| 4|
| java| 2|
| C| 8|
| C++| 1|
|python| 3|
| PHP| 5|
+------+---+
root
|-- name: string (nullable = true)
|-- num: integer (nullable = false)
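Because Thing is a case class, the same RDD can also be turned into a strongly typed Dataset instead of a DataFrame. A minimal sketch, reusing rowRdd and the spark.implicits._ import from above (it must run before spark.stop()):
// Convert the RDD of case-class instances into a Dataset[Thing]
val rddDS = rowRdd.toDS()
// Typed operations work directly on the case-class fields
rddDS.filter(_.num > 3).show()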
Spark-DataFrame Creation - Loading into SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
// Create the SparkSession
val spark = SparkSession
  .builder()
  .appName("Dataframe")
  .master("local")
  .getOrCreate()
// Wrap each row of data
val rowRdd = spark.sparkContext
  .textFile("D:/data.txt")
  .map(_.split(" ")) // split each line into columns
  .map(words => Row(words(0), words(1))) // wrap each row as a Row
val fieldName = "name,num"
// Define the structure of the columns
val fields = fieldName.split(",")
  .map(words => StructField(words, StringType, true)) // build a StructField for each column
val columnField = StructType(fields) // build the schema from the fields
spark.createDataFrame(rowRdd, columnField) // combine the row RDD with the schema
  .createOrReplaceTempView("thing") // register the thing view
// Query the data with SQL
spark.sql("select * from thing").show()
// Stop the SparkSession
spark.stop()
+------+---+
| name|num|
+------+---+
| scala| 4|
| java| 2|
| C| 8|
| C++| 1|
|python| 3|
| PHP| 5|
+------+---+
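In the example above both columns are read as StringType. Below is a sketch of the same programmatic schema with a typed num column, assuming the same D:/data.txt file and placed before the spark.stop() call; the second field is converted to Int before being wrapped in the Row:
import org.apache.spark.sql.types.IntegerType

// Same approach, but num is declared as an integer column
val typedRowRdd = spark.sparkContext
  .textFile("D:/data.txt")
  .map(_.split(" "))
  .map(words => Row(words(0), words(1).trim.toInt))
val typedSchema = StructType(List(
  StructField("name", StringType, true),
  StructField("num", IntegerType, true)))
spark.createDataFrame(typedRowRdd, typedSchema).printSchema()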
Spark-DataFrame Data Reading and Saving
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
val conf = new SparkConf()
.setAppName("hotel")
.setMaster("local[*]")
val sc = new SparkContext(conf)
val spark = SparkSession
.builder()
.appName("SparkSessionT")
.master("local[1]")
.getOrCreate()
// Read the JSON data
val df = spark.read.format("json").load("D:/jsData.js")
// Select columns and save them as CSV
df.select("hobby", "name", "age").write
  .format("csv").save("D:/js_to_csv") // save as a csv directory
df.write.parquet("D:/json_to_parquet.parquet") // save as a parquet file
// Read the parquet file back
spark.read.parquet("D:/json_to_parquet.parquet")
  .createOrReplaceTempView("parquetTable")
spark.sql("select name,age from parquetTable").show()
spark.sql("select * from parquetTable").foreach(row => println(row(0), row(1), row(2)))
// sc can read all files under a directory and combine their contents
sc.textFile("D:/js_to_csv").foreach(println)
+-------+---+
| name|age|
+-------+---+
| json| 23|
|charles| 32|
| tom| 28|
| lili| 24|
| bob| 20|
+-------+---+
(23,running,json)
(32,basketball,charles)
(28,football,tom)
(24,running,lili)
(20,swimming,bob)
running,lili,24
swimming,bob,20
running,json,23
basketball,charles,32
football,tom,28
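The csv directory written above can also be read back into a DataFrame rather than through sc.textFile. A minimal sketch; since no header was written, the column names are assigned manually here and inferSchema is used to recover age as a number:
// Read the saved csv directory back as a DataFrame
val csvDF = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .load("D:/js_to_csv")
  .toDF("hobby", "name", "age")
csvDF.show()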
Spark-DataFrame MySQL Operations
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
val spark = SparkSession
.builder()
.appName("SparkSessionT")
.master("local[1]")
.getOrCreate()
val jdbcDF = spark.read.format("jdbc") // read from MySQL via JDBC
  .option("url", "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC") // connection URL
  .option("driver", "com.mysql.jdbc.Driver") // JDBC driver
  .option("dbtable", "apps") // table to read
  .option("user", "root") // username
  .option("password", "123456") // password
  .load() // load the table as a DataFrame
// Show all rows of the table
jdbcDF.show()
// Insert data into MySQL (the data must first be converted into a DataFrame)
val dataRdd = spark.sparkContext
  .parallelize(Array("微信 https://wx.qq.com/ CN", "京东 https://www.jd.com/ CN")) // rows to insert
  .map(_.split(" "))
  .map(words => Row(words(0), words(1), words(2))) // wrap each row as a Row
// Define the column names and types
val fields = StructType(List(StructField("app_name", StringType), StructField("url", StringType), StructField("country", StringType)))
// Combine the data and the schema to create the DataFrame
val rddDF = spark.createDataFrame(dataRdd, fields)
// Wrap the connection properties
val info = new Properties()
info.put("user", "root")
info.put("password", "123456")
info.put("driver", "com.mysql.jdbc.Driver")
// Append the rows to the apps table
rddDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC", "testdb.apps", info)
// Show all rows of the table again (the new rows appear because jdbcDF is re-evaluated)
jdbcDF.show()
+---+--------+--------------------+-------+
| id|app_name| url|country|
+---+--------+--------------------+-------+
| 1| QQ APP| http://im.qq.com/| CN|
| 2| 微博 APP| http://weibo.com/| CN|
| 3| 淘宝 APP|https://www.taoba...| CN|
+---+--------+--------------------+-------+
+---+--------+--------------------+-------+
| id|app_name| url|country|
+---+--------+--------------------+-------+
| 1| QQ APP| http://im.qq.com/| CN|
| 2| 微博 APP| http://weibo.com/| CN|
| 3| 淘宝 APP|https://www.taoba...| CN|
| 10| 微信| https://wx.qq.com/| CN|
| 11| 京东| https://www.jd.com/| CN|
+---+--------+--------------------+-------+
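Instead of pulling the whole apps table, the JDBC source can also push a query down to MySQL by passing a subquery as the dbtable option. A minimal sketch, assuming the same connection settings as above:
// Push a filtered query down to MySQL instead of reading the whole table
val cnAppsDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "(SELECT app_name, url FROM apps WHERE country = 'CN') AS cn_apps")
  .option("user", "root")
  .option("password", "123456")
  .load()
cnAppsDF.show()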