I. What Is a Dataset
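Like RDD and DataFrame, a Dataset is a distributed collection of data in Spark. It is a new interface added in Spark 1.6. A Dataset can be constructed from JVM objects and then manipulated with functional transformations such as map, flatMap, and filter.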
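The functional transformations mentioned above can be chained on a Dataset. The following is a minimal sketch, not part of the original programs: the object name DatasetOpsSketch, the helper longWordsUpper, and the sample sentences are hypothetical, and it assumes the Spark 2.3.0 dependencies used later in this article.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; names and data are for illustration only.
object DatasetOpsSketch {

  // Splits each line into words, keeps words longer than 3 characters,
  // and upper-cases them.
  def longWordsUpper(spark: SparkSession, lines: Seq[String]): Array[String] = {
    // Needed for toDS() and for the encoders used by flatMap/map
    import spark.implicits._
    val ds = lines.toDS()
    ds.flatMap(line => line.split(" "))
      .filter(word => word.length > 3)
      .map(word => word.toUpperCase)
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dataset-ops-sketch").master("local").getOrCreate()
    longWordsUpper(spark, Seq("spark sql dataset", "rdd and dataframe")).foreach(println)
    spark.stop()
  }
}
```

Each transformation returns a new Dataset; nothing is computed until an action such as collect() or show() is called.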
II. Creating a Dataset
First, add the Spark Core, Spark SQL, and Hadoop Client dependencies. The pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.leboop</groupId>
    <artifactId>mahout</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- Scala binary version (used as the artifact suffix) -->
        <scala.version>2.11</scala.version>
        <!-- Spark version -->
        <spark.version>2.3.0</spark.version>
        <!-- Hadoop version -->
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
    <dependencies>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
1. Creating a Dataset from a Seq
package com.leboop.rdd

import org.apache.spark.sql.SparkSession

/**
  * Created by LiuErBao on 2018/8/10.
  */
object DataSetDemo {

  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    // Create the entry point for Spark SQL (for RDDs the entry point is SparkContext)
    val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
    // toDS() requires importing the implicit conversions
    import spark.implicits._
    // Create a Dataset from a Seq of case class instances
    val caseClassDS = Seq(Person("Andy", 20), Person("Tom", 30)).toDS()
    caseClassDS.show(3)
    // Create a Dataset from a Seq of primitive values
    val primitiveDS = Seq(1, 2, 3).toDS()
    // Produce a new Dataset with a map operation
    val newDS = primitiveDS.map(_ + 1)
    // Print the first three rows
    newDS.show(3)
  }
}
Program output:
+----+---+
|name|age|
+----+---+
|Andy| 20|
| Tom| 30|
+----+---+
+-----+
|value|
+-----+
|    2|
|    3|
|    4|
+-----+
2. Converting an RDD to a Dataset
The itemdata.data file has the following format:
0162381440670851711,4,7.0
0162381440670851711,11,4.0
0162381440670851711,32,1.0
0162381440670851711,176,27.0
0162381440670851711,183,11.0
0162381440670851711,184,5.0
0162381440670851711,207,9.0
0162381440670851711,256,3.0
0162381440670851711,258,4.0
0162381440670851711,259,16.0
0162381440670851711,260,8.0
0162381440670851711,261,18.0
package com.leboop.rdd

import org.apache.spark.sql.SparkSession

object DataSetDemo {

  case class Data(user_id: String, item_id: String, score: Double)

  def main(args: Array[String]): Unit = {
    // Create the entry point for Spark SQL (for RDDs the entry point is SparkContext)
    val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
    // Create the RDD
    val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
    // Convert the RDD to a Dataset
    import spark.implicits._
    // Split each line into its comma-separated fields
    val splitRDD = rdd.map(x => x.split(","))
    // Map the fields onto the Data case class, then call toDS()
    val dataDS = splitRDD.map(x => Data(x(0), x(1), x(2).toDouble)).toDS()
    dataDS.show(3)
  }
}
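The program first defines a case class Data to hold the converted records. Calling toDS() requires importing the implicit conversions with import spark.implicits._.
Program output: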
+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|      4|  7.0|
|0162381440670851711|     11|  4.0|
|0162381440670851711|     32|  1.0|
+-------------------+-------+-----+
only showing top 3 rows
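The intermediate RDD is not strictly required. In Spark 2.x, spark.read.textFile returns a Dataset[String] directly, which can then be mapped onto the case class. The following is a sketch under that assumption: the object name TextFileToDatasetSketch and the helper load are hypothetical, and the file path is taken from the command line rather than hard-coded.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical example object; not part of the original program.
object TextFileToDatasetSketch {

  case class Data(user_id: String, item_id: String, score: Double)

  // Reads comma-separated lines and parses each one into a Data record.
  // Assumes every line has exactly three fields and a numeric third field.
  def load(spark: SparkSession, path: String): Dataset[Data] = {
    import spark.implicits._
    spark.read.textFile(path).map { line =>
      val fields = line.split(",")
      Data(fields(0), fields(1), fields(2).toDouble)
    }
  }

  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: TextFileToDatasetSketch <path-to-itemdata.data>")
    val spark = SparkSession.builder().appName("textfile-to-dataset-sketch").master("local").getOrCreate()
    load(spark, args(0)).show(3)
    spark.stop()
  }
}
```

A malformed line would make toDouble throw at runtime, just as in the RDD version, so real input may need validation first.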
3. Converting a DataFrame to a Dataset
package com.leboop.rdd

import org.apache.spark.sql.SparkSession

/**
  * Created by LiuErBao on 2018/8/10.
  */
object DataSetDemo {

  case class Data(user_id: String, item_id: String, score: Double)

  def main(args: Array[String]): Unit = {
    // Create the entry point for Spark SQL (for RDDs the entry point is SparkContext)
    val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
    // Create the DataFrame; with no header or schema, every column is read as a string
    val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
    // map on a DataFrame needs the implicit encoders
    import spark.implicits._
    // Convert each Row to a Data instance, which yields a Dataset[Data]
    val dataDS = dataDF.map(x => Data(x(0).toString, x(1).toString, x(2).toString.toDouble))
    dataDS.show(2)
  }
}
Program output:
+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|      4|  7.0|
|0162381440670851711|     11|  4.0|
+-------------------+-------+-----+
only showing top 2 rows
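When the DataFrame's column names and types already match the case class, the row-by-row map can be replaced by as[Data]. The sketch below assumes this by supplying an explicit schema when reading the CSV file; the object name CsvAsDatasetSketch, the helper load, and the schema value are hypothetical and not part of the original program.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Hypothetical example object; not part of the original program.
object CsvAsDatasetSketch {

  case class Data(user_id: String, item_id: String, score: Double)

  // Column names and types chosen to match the fields of Data.
  val schema: StructType = StructType(Seq(
    StructField("user_id", StringType),
    StructField("item_id", StringType),
    StructField("score", DoubleType)
  ))

  // Reads the CSV file with the explicit schema and converts it with as[Data].
  def load(spark: SparkSession, path: String): Dataset[Data] = {
    // Provides the implicit Encoder[Data] required by as[Data]
    import spark.implicits._
    spark.read.schema(schema).csv(path).as[Data]
  }

  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: CsvAsDatasetSketch <path-to-itemdata.data>")
    val spark = SparkSession.builder().appName("csv-as-dataset-sketch").master("local").getOrCreate()
    load(spark, args(0)).show(2)
    spark.stop()
  }
}
```

Declaring user_id as a string keeps the leading zero of values such as 0162381440670851711, which schema inference could otherwise turn into a number.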
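III. Querying a Dataset with Spark SQL
Steps:
(1) Create the Dataset.
(2) Create a temporary view or a global temporary view from the Dataset.
(3) Run the SQL query, either through the Dataset's sqlContext or directly through the SparkSession (spark.sql).
The program is as follows: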
package com.leboop.rdd

import org.apache.spark.sql.SparkSession

/**
  * Created by LiuErBao on 2018/8/10.
  */
object DataSetDemo {

  case class Data(user_id: String, item_id: String, score: Double)

  def main(args: Array[String]): Unit = {
    // Create the entry point for Spark SQL (for RDDs the entry point is SparkContext)
    val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
    // Create the RDD
    val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
    // Convert the RDD to a Dataset
    import spark.implicits._
    val splitRDD = rdd.map(x => x.split(","))
    val dataDS = splitRDD.map(x => Data(x(0), x(1), x(2).toDouble)).toDS()
    // Register the Dataset as a temporary view named "data"
    dataDS.createOrReplaceTempView("data")
    // Query through the Dataset's sqlContext
    dataDS.sqlContext.sql("select user_id,item_id from data").show(2)
    // Query directly through the SparkSession
    spark.sql("select user_id,score from data").show(2)
  }
}
Program output:
+-------------------+-------+
|            user_id|item_id|
+-------------------+-------+
|0162381440670851711|      4|
|0162381440670851711|     11|
+-------------------+-------+
only showing top 2 rows
+-------------------+-----+
|            user_id|score|
+-------------------+-----+
|0162381440670851711|  7.0|
|0162381440670851711|  4.0|
+-------------------+-----+
only showing top 2 rows
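Step (2) also mentions global views, while the program above only registers a temporary view. The following is a minimal sketch of the global variant, assuming Spark 2.2 or later, where createOrReplaceGlobalTempView is available. The object name GlobalTempViewSketch, the helper countViaNewSession, the view name data_global, and the in-memory sample rows are hypothetical. Global temporary views live in the global_temp database and remain visible to other sessions of the same application.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; not part of the original program.
object GlobalTempViewSketch {

  case class Data(user_id: String, item_id: String, score: Double)

  // Registers a small Dataset as a global temporary view and counts its rows
  // from a different session of the same application.
  def countViaNewSession(spark: SparkSession): Long = {
    import spark.implicits._
    val dataDS = Seq(
      Data("0162381440670851711", "4", 7.0),
      Data("0162381440670851711", "11", 4.0)
    ).toDS()
    dataDS.createOrReplaceGlobalTempView("data_global")
    // A global temporary view must be qualified with the global_temp database
    spark.newSession()
      .sql("select count(*) as n from global_temp.data_global")
      .first()
      .getLong(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("global-temp-view-sketch").master("local").getOrCreate()
    println(countViaNewSession(spark))
    spark.sql("select user_id, score from global_temp.data_global").show(2)
    spark.stop()
  }
}
```

By contrast, a view created with createOrReplaceTempView is scoped to the session that created it, so the query through spark.newSession() would not find it.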