zoukankan      html  css  js  c++  java
  • 学习进度笔记6

    今天完成spark实验5:Spark SQL编程初级实践。

    1、Spark SQL基本操作

    将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

    { "id":1 , "name":" Ella" , "age":36 }

    { "id":2, "name":"Bob","age":29 }

    { "id":3 , "name":"Jack","age":29 }

    { "id":4 , "name":"Jim","age":28 }

    { "id":4 , "name":"Jim","age":28 }

    { "id":5 , "name":"Damon" }

    { "id":5 , "name":"Damon" }

    为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:

     创建DataFrame

    (1) 查询所有数据;

    (2) 查询所有数据,并去除重复的数据;

    (3) 查询所有数据,打印时去除 id 字段;

    (4) 筛选出 age>30 的记录;

    (5) 将数据按 age 分组;

    (6) 将数据按 name 升序排列;

    (7) 取出前 3 行数据;

    (8) 查询所有记录的 name 列,并为其取别名为 username;

    (9) 查询年龄 age 的平均值;

    (10) 查询年龄 age 的最小值。

    2、编程实现将RDD转换为DataFrame

    源文件内容如下(包含 id,name,age):

    1,Ella,36

    2,Bob,29

    3,Jack,29

    请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代码。

    假设当前目录为/usr/local/spark/mycode/rddtodf,在当前目录下新建一个目录 mkdir -p src/main/scala ,然后在目录 /usr/local/spark/mycode/rddtodf/src/main/scala 下 新 建 一 个 rddtodf.scala,复制下面代码;(下列两种方式任选其一)

    方法一:利用反射来推断包含特定类型对象的 RDD 的 schema,适用对已知数据结构的 RDD 转换;

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.Encoder
    import spark.implicits._
    object RDDtoDF {
     def main(args: Array[String]) {
      case class Employee(id:Long,name: String, age: Long)
      val employeeDF =
    spark.sparkContext.textFile("file:///usr/local/spark/employee.txt").map(_.split(",")).map(at
    tributes => Employee(attributes(0).trim.toInt,attributes(1), attributes(2).trim.toInt)).toDF()
      employeeDF.createOrReplaceTempView("employee")
      val employeeRDD = spark.sql("select id,name,age from employee")
      employeeRDD.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show()
     }
    }

    方法二:使用编程接口,构造一个 schema 并将其应用在已知的 RDD 上。

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Encoder import org.apache.spark.sql.Row
    object RDDtoDF { def main(args: Array[String]) { val employeeRDD = spark.sparkContext.textFile("file:///usr/local/spark/employee.txt") val schemaString = "id name age" val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) val rowRDD = employeeRDD.map(_.split(",")).map(attributes => Row(attributes(0).trim, attributes(1), attributes(2).trim)) val employeeDF = spark.createDataFrame(rowRDD, schema) employeeDF.createOrReplaceTempView("employee") val results = spark.sql("SELECT id,name,age FROM employee") results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show() } }

    在目录/usr/local/spark/mycode/rddtodf 目录下新建 simple.sbt,复制下面代码:

    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.11.8"

    libraryDependencies += "org.apache.spark" % "spark-core" % "2.1.0"

    在目录/usr/local/spark/mycode/rddtodf 下执行下面命令打包程序

     最后在目录/usr/local/spark/mycode/rddtodf 下执行下面命令提交程序

    在终端即可看到输出结果。

    3、编程实现利用DataFrame读写Mysql的数据

    (1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的两行数据。

    表 6-2 employee 表原有数据

    id name gender Age

    1  Alice    F         22

    2  John    M        25

    (2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

    表 6-3 employee 表新增数据

    id name gender age

    3  Mary     F       26

    4  Tom      M      23 

    假设当前目录为/usr/local/spark/mycode/testmysql,在当前目录下新建一个目录 mkdir -p src/main/scala , 然 后 在 目 录 /usr/local/spark/mycode/testmysql/src/main/scala 下 新 建 一 个 testmysql.scala,复制下面代码;

    import java.util.Properties
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    object TestMySQL {
       def main(args: Array[String]) {
        val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M
      23")).map(_.split(" "))
        val schema = StructType(List(StructField("id", IntegerType,true),StructField("name", StringType, true),StructField("gender", StringType,
    true),StructField("age", IntegerType, true)))
        val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
        val employeeDF = spark.createDataFrame(rowRDD, schema)
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "hadoop")
        prop.put("driver","com.mysql.jdbc.Driver")
        employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest",
        sparktest.employee", prop)
        val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").optio
    n("dbtable","employee").option("user","root").option("password", "hadoop").load()
        jdbcDF.agg("age" -> "max", "age" -> "sum")
       }
    }

    在目录/usr/local/spark/mycode/testmysql 目录下新建 simple.sbt,复制下面代码:

    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.11.8"

    libraryDependencies += "org.apache.spark" % "spark-core" % "2.1.0"

    在目录/usr/local/spark/mycode/testmysql 下执行下面命令打包程序

     最后在目录/usr/local/spark/mycode/testmysql 下执行下面命令提交程序

    在终端即可看到输出结果。

    遇到的问题

    解决方法

    添加如下代码:

    val spark = org.apache.spark.sql.SparkSession.builder
    .master("local")
    .appName("Spark CSV Reader")
    .getOrCreate;

    import spark.implicits._

  • 相关阅读:
    测试的基本方法
    一些基本常用的正则表达式
    MySQL和Oracle的区别与不同
    Ubuntu中使用python3中的venv创建虚拟环境
    在Ubuntu中搭建Python3的虚拟环境并开始django项目
    Django中的图片加载不出来解决方式记录
    在django中进行后台管理时插入外键数据时不显示值的问题
    Django2.2连接mysql数据库出现django.core.exceptions.ImproperlyConfigured: mysqlclient 1.3.3 or newer is required; you have 0.7.11.None问题
    在Ubuntu中安装了MongoDB后无法启动mongod的问题
    PostgreSQL练习3
  • 原文地址:https://www.cnblogs.com/songxinai/p/14240209.html
Copyright © 2011-2022 走看看