一、DataFrame创建
1、通过case class方式创建
people.txt
//定义case class,相当于表结构 case class People(var name:String,var age:Int) object TestDataFrame1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local") val sc = new SparkContext(conf) val context = new SQLContext(sc) // 将本地的数据读入 RDD, 并将 RDD 与 case class 关联 val peopleRDD = sc.textFile("E:\666\people.txt") .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt)) import context.implicits._ // 将RDD 转换成 DataFrames val df = peopleRDD.toDF //将DataFrames创建成一个临时的视图 df.createOrReplaceTempView("people") //使用SQL语句进行查询 context.sql("select * from people").show() } }
2、通过structType方式创建
object TestDataFrame2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val fileRDD = sc.textFile("E:\666\people.txt") // 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row val rowRDD: RDD[Row] = fileRDD.map(line => { val fields = line.split(",") Row(fields(0), fields(1).trim.toInt) }) // 创建 StructType 来定义结构 val structType: StructType = StructType( //字段名,字段类型,是否可以为空 StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil ) /** * rows: java.util.List[Row], * schema: StructType * */ val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType) df.createOrReplaceTempView("people") sqlContext.sql("select * from people").show() } }
3、通过json文件创建 DataFrames
object TestDataFrame3 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df: DataFrame = sqlContext.read.json("E:\666\people.") df.createOrReplaceTempView("people") sqlContext.sql("select * from people").show() } }
二、DataFrame数据读取
object TestRead { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //方式一 val df1 = sqlContext.read.json("E:\666\people.json") val df2 = sqlContext.read.parquet("E:\666\users.parquet") //方式二 val df3 = sqlContext.read.format("json").load("E:\666\people.json") val df4 = sqlContext.read.format("parquet").load("E:\666\users.parquet") //方式三,默认是parquet格式 val df5 = sqlContext.load("E:\666\users.parquet") } }
三、数据保存
object TestSave { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df1 = sqlContext.read.json("E:\666\people.json") //方式一 df1.write.json("E:\111") df1.write.parquet("E:\222") //方式二 df1.write.format("json").save("E:\333") df1.write.format("parquet").save("E:\444") //方式三 df1.write.save("E:\555") } }
数据保存模式:
df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\444")
四、DataFrame数据源
1、json
见上
2、parquet
见上
3、mysql
object TestMysql { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestMysql").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val url = "jdbc:mysql://192.168.123.102:3306/hivedb" val table = "dbs" val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","root") //需要传入Mysql的URL、表明、properties(连接数据库的用户名密码) val df = sqlContext.read.jdbc(url,table,properties) df.createOrReplaceTempView("dbs") sqlContext.sql("select * from dbs").show() } }
4、hive
在pom.xml文件中添加依赖
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.3.0</version> </dependency>
开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value> <description>JDBC connect string for a JDBC metastore</description> <!-- 如果 mysql 和 hive 在同一个服务器节点,那么请更改 hadoop02 为 localhost --> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> <description>password to use against metastore database</description> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive/warehouse</value> <description>hive default warehouse, if nessecory, change it</description> </property> </configuration>
代码:
object TestHive { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) sqlContext.sql("select * from myhive.student").show() } }
总结:
DataFrame创建方式应该按照数据源进行划分,数据源是:普通的txt文件、json/parquet文件、mysql数据库、hive数据仓库
1、普通txt文件:
(1)case class 创建
(2)structType 创建
2、json/parquet文件:
直接读取
3、mysql数据库:创建连接读取
4、hive数据仓库:创建连接读取