zoukankan      html  css  js  c++  java
  • SparkSql初级编程实践

    1.Spark SQL 基本操作
    将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。
    { "id":1 , "name":" Ella" , "age":36 }
    { "id":2, "name":"Bob","age":29 }
    { "id":3 , "name":"Jack","age":29 }
    { "id":4 , "name":"Jim","age":28 }
    { "id":4 , "name":"Jim","age":28 }
    { "id":5 , "name":"Damon" }
    { "id":5 , "name":"Damon" }
    为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:

    (1) 查询所有数据;

     

    (2) 查询所有数据,并去除重复的数据;

     

    (3) 查询所有数据,打印时去除 id 字段;

    (4) 筛选出 age>30 的记录;

    (5) 将数据按 age 分组;

    (6) 将数据按 name 升序排列;

    (7) 取出前 3 行数据

    (8) 查询所有记录的 name 列,并为其取别名为 username

    (9) 查询年龄 age 的平均值;

    (10) 查询年龄 age 的最小值。

    2.编程实现将 RDD 转换为 DataFrame

    源文件内容如下(包含 id,name,age):
    1,Ella,36
    2,Bob,29
    3,Jack,29
    请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到
    DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代
    码。

    package cn.spark.study.sy5
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    
    object Testsql {
      def main(args: Array[String]) {
           val conf = new SparkConf()
           conf.setMaster("local")
               .setAppName("Testsql")
           val sc = new SparkContext(conf)
           val sqlContext = new SQLContext(sc)
           //hdfs://192.168.6.134:9000/nlc/1.txt
           //H:文件数据集
           val studentRDD = sc.textFile("D:\myDevelopTools\Intellij IDEA\workplace\spark-study-scala\src\main\java\cn\spark\study\sy5\employee.txt", 1)
          .map { line => Row(line.split(",")(0), line.split(",")(1), line.split(",")(2)) }
            // 第二步,编程方式动态构造元数据
          val structType = StructType(Array(
              StructField("id", StringType, true),
              StructField("name", StringType, true),
              StructField("age", StringType, true)))
          // 第三步,进行RDD到DataFrame的转换
          val studentDF = sqlContext.createDataFrame(studentRDD, structType)
          // 继续正常使用
          studentDF.registerTempTable("employee")
    //      val teenagerDF = sqlContext.sql("select usrid,count(usrid) from students group by usrid order by usrid")
          val teenagerDF = sqlContext.sql("select id,name,age from employee")
          val teenagerRDD = teenagerDF.rdd.collect().foreach { row => println("id:"+row(0)+",name:"+row(1)+",age:"+row(2)) }
        }
    }

    3. 编程实现利用 DataFrame 读写 MySQL 的数据
    (1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的
    两行数据。
    表 6-2 employee 表原有数据
    id name gender Age
    1  Alice   F         22
    2  John   M        25
    (2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所
    示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
    表 6-3 employee 表新增数据
    id name gender age
    3  Mary   F   26
    4  Tom   M   23

    package cn.spark.study.sy5
    import java.util.Properties
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SQLContext}
    /**
      * Created by Lenovo on 2019/3/27.
      */
    object TestMySQL {
      def main(args: Array[String]) {
        val conf = new SparkConf()
        conf.setMaster("local")
          .setAppName("Testsql")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
    
        val employeeRDD = sqlContext.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
        val schema = StructType(List(StructField("id", IntegerType,
          true),StructField("name", StringType, true),StructField("gender", StringType,
          true),StructField("age", IntegerType, true)))
        val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1),
          p(2),p(3).toInt))
        val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "123123")
        prop.put("driver","com.mysql.jdbc.Driver")
        employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)
          val jdbcDF = sqlContext.read.format("jdbc").option("url",
          "jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "123123").load()
          jdbcDF.agg("age" -> "max", "age" -> "sum")
          }
    
    }

     

  • 相关阅读:
    微服务下的持续集成-Jenkins自动化部署GitHub项目
    JDK新特性-Lambda表达式的神操作
    ActiveMQ详细入门教程系列(一)
    程序员必须了解的知识点——你搞懂mysql索引机制了吗?
    面试之后,扼腕叹息。
    哎,这让人抠脑壳的 LFU。
    延迟消息的五种实现方案
    Java实现Kafka生产者和消费者的示例
    Pytorch训练时显存分配过程探究
    Python命令行参数定义及注意事项
  • 原文地址:https://www.cnblogs.com/news1997/p/10606556.html
Copyright © 2011-2022 走看看