1. Configuration file (Spark session / context setup)
package config

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Central Spark configuration shared by the jobs in this project.
 *
 * Exposes:
 *  - `confs`: the base [[SparkConf]] (local master, app name "jobs")
 *  - `spark_session`: the singleton [[SparkSession]]
 *  - `sc`: the [[SparkContext]] backing `spark_session`
 */
case object conf {

  // Run locally with as many worker threads as logical cores.
  private val master = "local[*]"

  val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")
  // Alternative master previously used during testing:
  // new SparkConf().setMaster("http://laptop-2up1s8pr:4040/").setAppName("jobs")

  // Build the session first and derive the context from it, instead of
  // constructing a standalone SparkContext and a SparkSession separately —
  // this guarantees a single context and avoids initialization-order issues.
  //
  // Spark 2.x rejects cartesian products by default; jobs here rely on them,
  // so enable cross joins up front on the builder rather than mutating the
  // runtime conf after creation.
  val spark_session: SparkSession = SparkSession.builder()
    .appName("jobs")
    .config(confs)
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()

  val sc: SparkContext = spark_session.sparkContext
  sc.setLogLevel("ERROR")
}
2. Processing script (iris CSV → numeric-label DataFrame)
package sparkDataMange

import config.conf.{sc, spark_session}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode}
import config.conf.spark_session.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

/**
 * Loads the iris CSV, replaces the species name with a numeric class label
 * (Iris-setosa -> 1, Iris-versicolor -> 2, anything else -> 3), casts every
 * column to Double, prints the result, and reports the elapsed time.
 *
 * Expected input rows (no header, columns auto-named _c0 .. _c4):
 *   5.1,3.5,1.4,0.2,Iris-setosa
 *   5.0,2.0,3.5,1.0,Iris-versicolor
 *   6.2,3.4,5.4,2.3,Iris-virginica
 */
object irisDataFrame {

  def main(args: Array[String]): Unit = {
    val startMillis: Long = System.currentTimeMillis()
    val path: String = "data/iris.data"

    val raw: DataFrame = spark_session.read.csv(path)

    // Encode the species column (_c4) as a string digit; otherwise(...)
    // covers Iris-virginica and any unexpected value with "3".
    val speciesLabel: Column =
      when(col("_c4").equalTo("Iris-setosa"), "1")
        .when(col("_c4").equalTo("Iris-versicolor"), "2")
        .otherwise("3")

    // Chain transformations on vals instead of reassigning a var:
    // substitute the label, then cast every column to Double
    // (the label strings "1"/"2"/"3" cast cleanly).
    val df: DataFrame = raw
      .withColumn("_c4", speciesLabel)
      .select(raw.columns.map(name => col(name).cast(DoubleType)): _*)

    df.show()
    df.printSchema()

    spark_session.stop()
    // 1000.toDouble forces floating-point division for fractional seconds.
    println("执行时间为:" + (System.currentTimeMillis() - startMillis) / 1000.toDouble + "s")
  }
}