  • Spark Examples

    1. Parquet

    import org.apache.spark.sql.SparkSession

    object testSparkReadParquet {
        def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().appName("TestSparkSession").master("local").getOrCreate()

            // Read a parquet file; backslashes in Windows paths must be escaped in Scala string literals
            val df = spark.read.parquet("D:\\tools\\testSparkFile\\users.parquet")

            df.printSchema()

            df.select("name", "favorite_color", "favorite_numbers").show()

            // Write a projection back out as parquet, overwriting any previous run
            df.select("name", "favorite_color").write.mode("overwrite").save("D:\\tools\\testSparkFile\\namesAndFavColors.parquet")

            // Read the file just written to verify the round trip
            val df2 = spark.read.parquet("D:\\tools\\testSparkFile\\namesAndFavColors.parquet")
            df2.printSchema()
        }
    }
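
    Parquet files can also be queried with SQL directly, without loading them into a DataFrame first. A minimal sketch, reusing the spark session and the same assumed local test file from above:

            // Run SQL straight against the parquet file via the parquet.`path` syntax
            val sqlDF = spark.sql("SELECT name, favorite_color FROM parquet.`D:\\tools\\testSparkFile\\users.parquet`")
            sqlDF.show()
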
    object TestParquet {
        def main(args: Array[String]): Unit = {
            val spark = SparkSession
                      .builder()
                      .appName("Java Spark SQL basic example")
                      .config("spark.some.config.option", "some-value")
                      .master("local")
                      .getOrCreate()
            import spark.implicits._

            // Write two DataFrames into key=1 / key=2 subdirectories, so that
            // "key" is discovered as a partition column on read
            val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
            squaresDF.write.mode("append").parquet("D:\\tools\\testSparkFile\\test\\key=1")

            val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
            cubesDF.write.mode("append").parquet("D:\\tools\\testSparkFile\\test\\key=2")

            // Read the partitioned table; mergeSchema unions the two file schemas
            // (value, square, cube) plus the discovered partition column key
            val mergedDF = spark.read.option("mergeSchema", "true").parquet("D:\\tools\\testSparkFile\\test")
            mergedDF.select("value", "square", "key").show()
            mergedDF.printSchema()
        }
    }
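
    One detail worth knowing about the layout above: when a single partition directory is read directly, the partition column drops out of the schema unless basePath is set. A sketch under the same assumed local layout:

            // Reading one partition directly; basePath keeps "key" in the schema
            val onePart = spark.read
                .option("basePath", "D:\\tools\\testSparkFile\\test")
                .parquet("D:\\tools\\testSparkFile\\test\\key=1")
            onePart.printSchema()  // value, square, key
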

    2. DataFrame

    import org.apache.spark.sql.SparkSession

    object DFExample {
        case class Student(id: Int, name: String, phone: String, email: String, age: Int)
        def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().appName("DFExample").master("local").getOrCreate()
            import spark.implicits._
            import org.apache.spark.sql.functions._

            // Split on "|" -- the pipe must be escaped because split takes a regex
            val studentDF = spark.sparkContext.textFile("D:\\tools\\testSparkFile\\dfTestFile.txt").map(_.split("\\|"))
                .map(line => Student(line(0).trim().toInt, line(1).trim(), line(2).trim(), line(3).trim(), line(4).trim().toInt)).toDF()

            val studentDF2 = studentDF

            studentDF.show()

            // Filter out rows whose name is empty or 'NULL'
            //studentDF.filter("name != '' OR name != 'NULL'").show()  // did not work: the OR condition is always true, so nothing is filtered
            studentDF.filter("name != 'NULL'").filter("name != ''").show()

            // Find people whose name starts with 'l'
            studentDF.filter("substr(name,0,1) = 'l'").show()

    //        spark.sql("show functions").show(2000)

            // Sort by name, ascending and then descending
            studentDF.sort("name").show()
            studentDF.sort(studentDF.col("name").desc).show()

            // Rename a column
            studentDF.select(studentDF.col("name").as("student_name")).show()

            // Self-join on id
            studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id")).sort(studentDF.col("id")).show()
        }
    }
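
    The same filters and sorts can also be written with the Column API instead of SQL expression strings, which catches operator typos at compile time. A sketch reusing studentDF from above:

            // Column API equivalents of the string-expression versions
            studentDF.filter($"name" =!= "" && $"name" =!= "NULL").show()
            studentDF.filter(substring($"name", 0, 1) === "l").show()
            studentDF.orderBy($"name".desc).show()
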
    object ScalaREPL {
        case class REPL(id_1: Int, id_2: Int, cmp_fname_c1: Double, cmp_fname_c2: String, cmp_lname_c1: Double, cmp_lname_c2: String,
            cmp_sex: Double, cmp_bd: Double, cmp_bm: Double, cmp_by: Double, cmp_plz: Double, is_match: Boolean)
        case class REPL2(summary: String, cmp_fname_c1: Double, cmp_fname_c2: Double)
        def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().appName("DFExample").master("local").getOrCreate()
            import spark.implicits._
            import org.apache.spark.sql.functions._
            val df = spark.sparkContext.textFile("D:\\tools\\testSparkFile\\scalaREPL.txt").filter(_.length() != 0).map(_.split(","))
                .map(line => REPL(line(0).trim().toInt, line(1).trim().toInt, line(2).trim().toDouble, line(3).trim(),
                    line(4).trim().toDouble, line(5).trim(), line(6).trim().toDouble, line(7).trim().toDouble,
                    line(8).trim().toDouble, line(9).trim().toDouble, line(10).trim().toDouble, line(11).trim().toBoolean)).toDF()

            df.show()
    //        df.cache()
    //        df.rdd.map(_.getAs[Boolean]("is_match")).countByValue().foreach(println)
    //        df.select(col("is_match")).groupBy(col("is_match")).count().show()
    //        df.groupBy(col("is_match")).count().orderBy(col("is_match").desc).show()

    //        df.agg(count(col("cmp_fname_c1")), mean(col("cmp_fname_c1")), stddev(col("cmp_fname_c1")), min(col("cmp_fname_c1")), max(col("cmp_fname_c1"))).show()

    //        df.createOrReplaceTempView("linkage")

    //        spark.sql("""
    //          select is_match, count(*) count
    //          from linkage
    //          group by is_match
    //          order by is_match desc
    //          """).show()

            val matches = df.where("is_match = true")
            matches.createOrReplaceTempView("match_desc")
            matches.show()
    //        val matchSummary = matches.distinct()
            val misses = df.filter("is_match = false")
            misses.createOrReplaceTempView("miss_desc")
    //        val missSummary = misses.distinct()

            // Long-to-wide pivot: melt the summary table into (metric, field, value)
            // rows, then pivot metric back into columns
            /*  val df2 = spark.sparkContext.textFile("D:\\tools\\testSparkFile\\scalaREPL2.txt").filter(_.length() != 0).map(_.split(","))
                .map(line => REPL2(line(0).trim(), line(1).trim().toDouble, line(2).trim().toDouble)).toDF()
                df2.show()
                val schema = df2.schema
                val ds = df2.flatMap(row => {
                  val metric = row.getString(0)
                  (1 until row.size).map(i => {
                    (metric, schema(i).name, row.getDouble(i))
                  })
                })
                val longDF = ds.toDF("metric", "field", "value")
                longDF.show()
                longDF.groupBy("field").pivot("metric", Seq("count", "mean", "stddev", "max", "min")).agg(first("value")).show()  */
        }
    }
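
    The commented-out count/mean/stddev/min/max aggregate above is also available in one call as DataFrame.describe, which produces exactly the long "summary" table that the pivot sketch reshapes. A minimal example reusing matches:

            // describe() computes count/mean/stddev/min/max per column
            val summary = matches.describe("cmp_fname_c1", "cmp_lname_c1")
            summary.show()
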

     3. Dataset to DataFrame with built-in functions

    import org.apache.spark.sql.SparkSession

    object TestMachineLearning {
        def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().appName("DFExample").master("local").getOrCreate()
            import spark.implicits._
            // read.textFile returns a Dataset[String] (unlike sparkContext.textFile, which returns an RDD)
            val rawUserArtistData = spark.read.textFile("D:\\tools\\testSparkFile\\profiledata\\user_artist_data.txt")
            rawUserArtistData.take(5).foreach(println)

            // Dataset to DataFrame, then aggregate with built-in functions
            val userArtistDF = rawUserArtistData.map { line =>
              val Array(user, artist, _*) = line.split(" ")
              (user.toInt, artist.toInt)  // convert both to Int so min/max are numeric, not lexicographic
            }.toDF("user", "artist")
            import org.apache.spark.sql.functions._
            userArtistDF.agg(max("user"), min("user"), max("artist"), min("artist")).show()
        }
    }
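
    The toInt calls above throw on any malformed line. A defensive variant that silently skips unparsable records, sketched with scala.util.Try under the same assumed whitespace-separated user/artist format:

            import scala.util.Try
            // flatMap + Option drops lines whose first two fields are not integers
            val safeDF = rawUserArtistData.flatMap { line =>
              val fields = line.split(" ")
              Try((fields(0).toInt, fields(1).toInt)).toOption.toList
            }.toDF("user", "artist")
            safeDF.show(5)
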
  • Original post: https://www.cnblogs.com/redhat0019/p/11423811.html