Creating a SparkSQL Program in IDEA
Author: Yin Zhengjie
Copyright notice: This is an original work. Reproduction without permission is prohibited and will be pursued under the law.
1. Creating a DataFrame
First add the spark-sql dependency to the project's pom.xml (the _2.11 suffix is the Scala binary version, so the project should be built with Scala 2.11.x):

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
```
```scala
package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLDemo {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo")

    // Create the SparkSQL environment object, i.e. the SparkSession
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // Read the JSON file and build a DataFrame (note the escaped backslashes in the Windows path)
    val frame: DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    // Show the data
    frame.show()

    // Release resources
    spark.close()
  }
}
```
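The original post does not show the contents of user.json. Spark's json reader expects one JSON object per line (JSON Lines) by default, so, assuming the id/name/age fields used by the User case class in section 3, the input file might look like this:

```json
{"id": 1, "name": "YinZhengjie", "age": 18}
{"id": 2, "name": "Jason Yin", "age": 28}
{"id": 3, "name": "Danny", "age": 27}
```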
2. Accessing Data with SQL Syntax
```scala
package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLDemo2 {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo2")

    // Create the SparkSQL environment object, i.e. the SparkSession
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // Read the JSON file and build a DataFrame
    val frame: DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    // Create a temporary view
    frame.createTempView("user")

    // Access the data using SQL syntax
    spark.sql("select * from user").show()

    // Release resources
    spark.close()
  }
}
```
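The post does not discuss view scope, but note that createTempView throws an AnalysisException if a view with that name already exists, and the view is visible only within the current SparkSession. A minimal sketch of the standard Spark 2.x alternatives:

```scala
// Overwrite an existing view instead of throwing
frame.createOrReplaceTempView("user")

// A global temporary view is shared across sessions of the same application
// and must be qualified with the built-in global_temp database
frame.createGlobalTempView("user_global")
spark.sql("select * from global_temp.user_global").show()

// The global view is visible from a new session of the same application as well
spark.newSession().sql("select * from global_temp.user_global").show()
```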
3. Converting Between RDD, DataFrame, and DataSet
```scala
package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * Define a case class
  */
case class User(id: Int, name: String, age: Int)

object SparkSQLDemo3 {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo3")

    // Create the SparkSQL environment object, i.e. the SparkSession
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    /**
      * Tip:
      * Before doing any conversions, import the implicit conversion rules. Note that
      * "spark" here is not a package name but the name of the SparkSession object.
      */
    import spark.implicits._

    // Create an RDD
    val listRDD: RDD[(Int, String, Int)] = spark.sparkContext.parallelize(List((1, "YinZhengjie", 18), (2, "Jason Yin", 28), (3, "Danny", 27)))

    // Convert to a DataFrame (the column names match the User fields case-insensitively under Spark's default settings)
    val df: DataFrame = listRDD.toDF("Id", "Name", "Age")

    // Convert the DataFrame to a DataSet
    val ds: Dataset[User] = df.as[User]

    // Convert the DataSet back to a DataFrame
    val df1: DataFrame = ds.toDF()

    // Convert the DataFrame to an RDD
    val rdd1: RDD[Row] = df1.rdd

    // When traversing the RDD, the data can be accessed by index
    rdd1.foreach(row => {
      println(row.getString(1))
    })

    // Manually attach a type to listRDD
    val userRDD: RDD[User] = listRDD.map {
      case (id, name, age) => User(id, name, age)
    }

    // Convert the RDD directly to a DataSet
    val ds2: Dataset[User] = userRDD.toDS()

    // Convert the DataSet directly back to an RDD
    val rdd2: RDD[User] = ds2.rdd

    // Traverse rdd2
    rdd2.foreach(println)

    // Release resources
    spark.close()
  }
}
```
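To recap the conversion graph the demo traverses: RDD to DataFrame with toDF, DataFrame to Dataset[User] with as[User], Dataset back to DataFrame with toDF(), either one down to an RDD with .rdd, and RDD[User] directly to Dataset[User] with toDS. The practical payoff of the typed Dataset is that fields are accessed by name and checked at compile time, unlike the positional row.getString(1) access above. A small sketch (hypothetical lines that would go inside main, where import spark.implicits._ is in scope):

```scala
// Typed map: the compiler knows user.name is a String
ds2.map(user => user.name.toUpperCase).show()

// Typed filter: keep only users older than 20
ds2.filter(user => user.age > 20).show()
```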