zoukankan      html  css  js  c++  java
  • 实验 5 Spark SQL 编程初级实践

    1.Spark SQL 基本操作
    将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

    { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name":"Jack","age":29 } { "id":4 , "name":"Jim","age":28 } { "id":4 , "name":"Jim","age":28 } { "id":5 , "name":"Damon" } { "id":5 , "name":"Damon" }

    为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:

    (1) 查询所有数据;
    (2) 查询所有数据,并去除重复的数据;

    (3) 查询所有数据,打印时去除 id 字段;

    (4) 筛选出 age>30 的记录;

    (5) 将数据按 age 分组;

    (6) 将数据按 name 升序排列;

    (7) 取出前 3 行数据;

    (8) 查询所有记录的 name 列,并为其取别名为 username;

    (9) 查询年龄 age 的平均值;

    (10) 查询年龄 age 的最小值

    .

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    
    object two1
    {
      def main(args: Array[String]) {
        val conf = new SparkConf()
        conf.setMaster("local")
          .setAppName("two1")
        val sc = new SparkContext(conf)
        var spark = SparkSession.builder().getOrCreate()
        var df = spark.read.json("D:\Users\Administrator\IdeaProjects\WordCount\employee.json")
        //查询所有数据;
        df.show()
        //2 查询所有数据,并去除重复的数据
        df.distinct().show()
        //(3) 查询所有数据,打印时去除 id 字段
        df.drop("id").show()
        //(4) 筛选出 age>30 的记录;
        df.filter(df("age")>30).show()
        //(5) 将数据按 age 分组;
        df.groupBy("name").count().show()
        //(6) 将数据按 name 升序排列;
        df.sort(df("name").asc).show()
        //(7) 取出前 3 行数据
        df.take(3)
        //(8) 查询所有记录的 name 列,并为其取别名为 username;
        df.select(df("name")as("username")).show()
        //(9) 查询年龄 age 的平均值;
        df.agg("age"->"avg")
        //(10) 查询年龄 age 的最小值
        df.agg("age"->"min")
      }
    }

    2.编程实现将 RDD 转换为 DataFrame
    源文件内容如下(包含 id,name,age): 1,Ella,36 2,Bob,29 3,Jack,29 请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代 码

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    object RDDtoDF {
      case class Employee(id:Long,name:String,age:Long)
      def main(args: Array[String]) {
        val conf = new SparkConf()
        conf.setMaster("local")
          .setAppName("RDDtoDF")
        val sc = new SparkContext(conf)
        val spark = SparkSession.builder.getOrCreate
        import spark.implicits._
        val employeeDF = sc.textFile("D:\Users\Administrator\IdeaProjects\WordCount\employee.txt")
          .map(_.split(","))
          .map(attributes => Employee(attributes(0).trim.toInt, attributes(1), attributes(2).trim.toInt)).toDF()
        employeeDF.createOrReplaceTempView("employee")
        var employeeRDD = spark.sql("select id,name,age from employee")
        employeeRDD.map(t=>"id:"+t(0)+"name:"+t(1)+"age:"+t(2)).show()
      }
    }

    3. 编程实现利用 DataFrame 读写 MySQL 的数据
    (1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的 两行数据。 表原有数据

    id name gender Age

    1 Alice F 22

    2 John M 25

    (2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。 表 6-3 employee 表新增数据

    id name gender age

    3 Mary F 26

    4 Tom M 23

    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SparkSession}
    object TestMySql {
      def main(args: Array[String])
      {
        val conf = new SparkConf()
        conf.setMaster("local")
          .setAppName("RDDtoDF")
        val sc = new SparkContext(conf)
        val spark = SparkSession.builder.getOrCreate
        import spark.implicits._
        val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23"))
        .map(_.split(" "))
        val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true)
          ,StructField("gender", StringType, true)
          ,StructField("age", IntegerType, true)))
        val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1).trim, p(2).trim,p(3).toInt))
        val employeeDF = spark.createDataFrame(rowRDD, schema)
        val prop = new Properties()
        prop.put("user","root")//账号
        prop.put("password","****")//密码
        prop.put("driver","com.mysql.jdbc.Driver")
        employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","employee",prop)//前面是数据库名,后面是表名
        val jdbcDF = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/sparktest")
          .option("driver","com.mysql.jdbc.Driver")
          .option("dbtable","employee")
          .option("user","root")
          .option("password","*****")
          .load()
        jdbcDF.agg("age" -> "max", "age" -> "sum").show()
      }
    }
    https://necydcy.me/
  • 相关阅读:
    HDU 5486 Difference of Clustering 图论
    HDU 5481 Desiderium 动态规划
    hdu 5480 Conturbatio 线段树 单点更新,区间查询最小值
    HDU 5478 Can you find it 随机化 数学
    HDU 5477 A Sweet Journey 水题
    HDU 5476 Explore Track of Point 数学平几
    HDU 5475 An easy problem 线段树
    ZOJ 3829 Known Notation 贪心
    ZOJ 3827 Information Entropy 水题
    zoj 3823 Excavator Contest 构造
  • 原文地址:https://www.cnblogs.com/miria-486/p/10603964.html
Copyright © 2011-2022 走看看