zoukankan      html  css  js  c++  java
  • spark SQL编程

    1.编程实现将 RDD 转换为 DataFrame
    源文件内容如下(包含 id,name,age):

    1,Ella,36
    2,Bob,29
    3,Jack,29

     

     

    将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代码。 

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    object RDDtoDF {
      def main(args: Array[String]) {
       val spark=SparkSession.builder().appName("RddToFrame").master("local").getOrCreate()
        import  spark.implicits._
        val employeeRDD=spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
        val schemaString="id name age"
        val fields=schemaString.split(" ").map(fieldName=>StructField
        (fieldName,StringType,nullable = true))
        val schema = StructType(fields)
        val  rowRDD  =  employeeRDD.map(_.split(",")).map(attributes  =>
          Row(attributes(0).trim, attributes(1), attributes(2).trim))
    
        val employeeDF = spark.createDataFrame(rowRDD, schema)
        employeeDF.createOrReplaceTempView("employee")
        val results=spark.sql("select id,name,age from employee")
        results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show()
    
      }
    }

    2.编程实现利用 DataFrame 读写 MySQL 的数据
    1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的
    两行数据。
    6-2 employee 表原有数据

    id name gender Age
    1 Alice F 22
    2 John M 25

     打开mysql

     


    2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

    6-3 employee 表新增数据

    id name gender age
    3 Mary F 26
    4 Tom M 23

     

     

    import java.util.Properties
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    object TestMySQL {
      def main(args: Array[String]): Unit = {
        val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
        import  spark.implicits._
        val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
        val schema=StructType(List(StructField("id",IntegerType,
          true),StructField("name",StringType,true),StructField("gender",StringType,true),
          StructField("age",IntegerType,true)))
        val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
        val employeeDF=spark.createDataFrame(rowRDD,schema)
        val prop=new Properties()
        prop.put("user","root")
        prop.put("password","wangli")
        prop.put("driver","com.mysql.jdbc.Driver")
        employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop)
        val jdbcDF = spark.read.format("jdbc").option("url",
          "jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee")
          .option("user","root").option("password", "wangli").load()
        jdbcDF.agg("age" -> "max", "age" -> "sum").show()
      }
    
    }

     

     

     

     

     

     

     

     

     

     

     

     

     

     





    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利.
  • 相关阅读:
    Swift开发第六篇——操作运算符也可以重载& func 的参数修饰
    Swift开发第五篇——四个知识点(Struct Mutable方法&Tuple&autoclosure&Optional Chain)
    Swift开发第四篇——柯里化
    Swift开发第三篇——Playground
    Swift开发第一篇——异常处理及断言
    在Linux CentOS 6.5 (Final)上安装git-1.9.0
    如何有效地配置基于Spring的应用系统
    关于URL编码的问题
    如何优化pom依赖项?
    启动Tomcat的几种方式
  • 原文地址:https://www.cnblogs.com/wl2017/p/10604328.html
Copyright © 2011-2022 走看看