1. Parquet
import org.apache.spark.sql.SparkSession

object testSparkReadParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TestSparkSession").master("local").getOrCreate()

    // Read a Parquet file; the schema is embedded in the file itself
    val df = spark.read.parquet("D:/tools/testSparkFile/users.parquet")
    df.printSchema()
    df.select("name", "favorite_color", "favorite_numbers").show()

    // save() uses Parquet as the default data source format
    df.select("name", "favorite_color").write.mode("overwrite")
      .save("D:/tools/testSparkFile/namesAndFavColors.parquet")

    val df2 = spark.read.parquet("D:/tools/testSparkFile/namesAndFavColors.parquet")
    df2.printSchema()
  }
}
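The written output can also be checked by running SQL directly on the Parquet path, without first registering a temporary view. This is a minimal sketch; the path simply reuses the output location from the example above.

// Minimal sketch: query a Parquet file directly with SQL (path taken from the example above)
val sqlDF = spark.sql("SELECT name, favorite_color FROM parquet.`D:/tools/testSparkFile/namesAndFavColors.parquet`")
sqlDF.show()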
import org.apache.spark.sql.SparkSession

object TestParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Java Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Write two DataFrames with different columns into the partition directories key=1 and key=2
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.mode("append").parquet("D:/tools/testSparkFile/test/key=1")

    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.mode("append").parquet("D:/tools/testSparkFile/test/key=2")

    // Read the partitioned table with schema merging enabled
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("D:/tools/testSparkFile/test/")
    mergedDF.select("value", "square", "key").show()
    mergedDF.printSchema()
  }
}
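Above, the key=1 and key=2 partition directories are created by hand in the output path. As an alternative, DataFrameWriter.partitionBy can lay out the partition directories automatically; a minimal sketch, assuming an extra key column and a hypothetical output directory test_partitioned:

// Minimal sketch: let Spark create the partition directories (key=...) itself
val withKeyDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i, 1)).toDF("value", "square", "key")
withKeyDF.write.mode("append").partitionBy("key").parquet("D:/tools/testSparkFile/test_partitioned")
// Reading the root path discovers the partition column automatically
spark.read.parquet("D:/tools/testSparkFile/test_partitioned").printSchema()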
2. DataFrame
import org.apache.spark.sql.SparkSession

object DFExample {
  case class Student(id: Int, name: String, phone: String, email: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DFExample").master("local").getOrCreate()
    import spark.implicits._

    // Parse a pipe-delimited text file into a DataFrame via a case class
    val studentDF = spark.sparkContext.textFile("D:/tools/testSparkFile/dfTestFile.txt")
      .map(_.split("\\|"))
      .map(line => Student(line(0).trim.toInt, line(1).trim, line(2).trim, line(3).trim, line(4).trim.toInt))
      .toDF()
    val studentDF2 = studentDF
    studentDF.show()

    // Filter out rows whose name is empty or 'NULL'
    // studentDF.filter("name != '' OR name != 'NULL'").show()  // did not work: with OR, every row matches at least one condition
    studentDF.filter("name != 'NULL'").filter("name != ''").show()

    // Find students whose name starts with 'l'
    studentDF.filter("substr(name,0,1) = 'l'").show()
    // spark.sql("show functions").show(2000)

    import org.apache.spark.sql.functions._

    // Sort by name, ascending and then descending
    studentDF.sort("name").show()
    studentDF.sort(studentDF.col("name").desc).show()

    // Rename a column
    studentDF.select(studentDF.col("name").as("student_name")).show()

    // Self-join on id
    studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id"))
      .sort(studentDF.col("id")).show()
  }
}
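As an alternative to textFile plus split, the same pipe-delimited file could also be loaded with the built-in CSV data source; a minimal sketch, assuming the file has no header row and the five columns of the Student case class:

import org.apache.spark.sql.types._

// Minimal sketch: load the pipe-delimited file with the CSV data source instead of textFile + split
val studentSchema = StructType(Seq(
  StructField("id", IntegerType), StructField("name", StringType), StructField("phone", StringType),
  StructField("email", StringType), StructField("age", IntegerType)))
val csvDF = spark.read
  .option("sep", "|")          // pipe delimiter
  .option("header", "false")   // assumption: the file has no header row
  .schema(studentSchema)
  .csv("D:/tools/testSparkFile/dfTestFile.txt")
csvDF.printSchema()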
import org.apache.spark.sql.SparkSession

object ScalaREPL {
  // One record-linkage comparison record
  case class REPL(id_1: Int, id_2: Int, cmp_fname_c1: Double, cmp_fname_c2: String, cmp_lname_c1: Double,
                  cmp_lname_c2: String, cmp_sex: Double, cmp_bd: Double, cmp_bm: Double, cmp_by: Double,
                  cmp_plz: Double, is_match: Boolean)
  case class REPL2(summary: String, cmp_fname_c1: Double, cmp_fname_c2: Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DFExample").master("local").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Parse the comma-separated linkage file, skipping empty lines
    val df = spark.sparkContext.textFile("D:/tools/testSparkFile/scalaREPL.txt")
      .filter(_.length != 0)
      .map(_.split(","))
      .map(line => REPL(line(0).trim.toInt, line(1).trim.toInt, line(2).trim.toDouble, line(3).trim,
        line(4).trim.toDouble, line(5).trim, line(6).trim.toDouble, line(7).trim.toDouble,
        line(8).trim.toDouble, line(9).trim.toDouble, line(10).trim.toDouble, line(11).trim.toBoolean))
      .toDF()
    df.show()

    // df.cache()
    // df.rdd.map(_.getAs[Boolean]("is_match")).countByValue().foreach(println)
    // df.select(col("is_match")).groupBy(col("is_match")).count().show()
    // df.groupBy(col("is_match")).count().orderBy(col("is_match").desc).show()
    // df.agg(count(col("cmp_fname_c1")), mean(col("cmp_fname_c1")), stddev(col("cmp_fname_c1")),
    //   min(col("cmp_fname_c1")), max(col("cmp_fname_c1"))).show()
    //
    // df.createOrReplaceTempView("linkage")
    // spark.sql("""
    //   select is_match, count(*) count
    //   from linkage
    //   group by is_match
    //   order by is_match desc
    // """).show()

    // Split the data into matches and non-matches
    val matches = df.where("is_match = true")
    matches.createOrReplaceTempView("match_desc")
    matches.show()
    // val matchSummary = matches.distinct()

    val misses = df.filter("is_match = false")
    misses.createOrReplaceTempView("miss_desc")
    // val missSummary = misses.distinct()

    // Long-to-wide conversion (pivot)
    /*
    val df2 = spark.sparkContext.textFile("D:/tools/testSparkFile/scalaREPL2.txt")
      .filter(_.length != 0)
      .map(_.split(","))
      .map(line => REPL2(line(0).trim, line(1).trim.toDouble, line(2).trim.toDouble))
      .toDF()
    df2.show()

    val schema = df2.schema
    val ds = df2.flatMap(row => {
      val metric = row.getString(0)
      (1 until row.size).map(i => (metric, schema(i).name, row.getDouble(i)))
    })
    val longDF = ds.toDF("metric", "field", "value")
    longDF.show()
    longDF.groupBy("field").pivot("metric", Seq("count", "mean", "stddev", "max", "min")).agg(first("value")).show()
    */
  }
}
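The commented long-to-wide code expects a summary table with a summary column plus one column per metric; one way to build such a table is Dataset.describe(), which returns count, mean, stddev, min and max as string-typed columns. A minimal sketch, assuming the matches DataFrame from above:

// Minimal sketch: describe() yields a "summary" column plus one column per selected field
val matchSummary = matches.describe("cmp_fname_c1", "cmp_lname_c1")
matchSummary.show()
matchSummary.printSchema()   // all metric columns come back as strings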
3. Dataset to DataFrame with built-in functions
import org.apache.spark.sql.SparkSession

object TestMachineLearning {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DFExample").master("local").getOrCreate()
    import spark.implicits._

    val rawUserArtistData = spark.read.textFile("D:/tools/testSparkFile/profiledata/user_artist_data.txt")
    rawUserArtistData.take(5).foreach(println)

    // Convert the Dataset to a DataFrame and use built-in aggregate functions.
    // Both IDs are converted to Int so that min/max are numeric rather than lexicographic.
    val userArtistDF = rawUserArtistData.map(line => {
      val Array(user, artist, _*) = line.split(" ")
      (user.toInt, artist.toInt)
    }).toDF("user", "artist")

    import org.apache.spark.sql.functions._
    userArtistDF.agg(max("user"), min("user"), max("artist"), min("artist")).show()
  }
}
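The same aggregation can also be expressed in SQL once the DataFrame is registered as a temporary view; a minimal sketch, where the view name user_artist is only an assumption:

// Minimal sketch: the same min/max aggregation expressed in SQL over a temporary view
userArtistDF.createOrReplaceTempView("user_artist")
spark.sql("SELECT max(user), min(user), max(artist), min(artist) FROM user_artist").show()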