zoukankan      html  css  js  c++  java
  • Spark实验五

    2. 编程实现将 RDD 转换为 DataFrame
    利用反射机制推断包含特定类型对象的 RDD 的 schema，适用于对已知数据结构的 RDD 进行转换。
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.Encoder
    /**
     * Converts an RDD of text lines into a DataFrame by reflection: the schema is
     * inferred from the fields of the `Employee` case class.
     *
     * Reads `file:///usr/local/spark/employee.txt` (presumably one `id,name,age`
     * record per line — TODO confirm), registers it as the temp view `employee`,
     * queries it with Spark SQL and prints each row as `id:<id>,name:<name>,age:<age>`.
     */
    object RDDtoDF {

      /**
       * One employee record. Declared at object level (not inside `main`) because
       * Spark cannot derive an encoder for a method-local case class.
       */
      case class Employee(id: Long, name: String, age: Long)

      def main(args: Array[String]): Unit = {
        import org.apache.spark.sql.SparkSession

        // Outside spark-shell there is no predefined `spark`, so build the session explicitly.
        val spark = SparkSession.builder()
          .appName("RDDtoDF")
          .getOrCreate()
        // Must be imported from the live session instance: provides toDF() and the String encoder used by map.
        import spark.implicits._

        try {
          val employeeDF = spark.sparkContext
            .textFile("file:///usr/local/spark/employee.txt")
            .map(_.split(","))
            .map(attributes =>
              // id/age are Long fields, so parse them as Long rather than Int.
              Employee(attributes(0).trim.toLong, attributes(1).trim, attributes(2).trim.toLong))
            .toDF()

          employeeDF.createOrReplaceTempView("employee")
          val queriedDF = spark.sql("select id,name,age from employee")

          // show(false): the default show() truncates cells longer than 20 characters,
          // which would cut off every formatted line.
          queriedDF
            .map(t => s"id:${t(0)},name:${t(1)},age:${t(2)}")
            .show(false)
        } finally {
          spark.stop()
        }
      }
    }

    3. 编程实现利用 DataFrame 读写 MySQL 的数据

    import java.util.Properties
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    /**
     * Writes two employee rows into the MySQL table `sparktest.employee` through a
     * DataFrame, then reads the table back over JDBC and prints max(age) and sum(age).
     *
     * Assumes a MySQL server on localhost:3306 with database `sparktest` and a table
     * `employee` whose columns match (id, name, gender, age) — TODO confirm against the DB.
     */
    object TestMySQL {
      def main(args: Array[String]): Unit = {
        import org.apache.spark.sql.SparkSession

        // Outside spark-shell there is no predefined `spark`, so build the session explicitly.
        val spark = SparkSession.builder()
          .appName("TestMySQL")
          .getOrCreate()

        // Connection settings shared by the write and the read below.
        // NOTE(review): credentials are hard-coded; consider reading them from config/env.
        val url = "jdbc:mysql://localhost:3306/sparktest"
        val user = "root"
        val password = "hadoop"
        // Connector/J 5.x class name; Connector/J 8+ uses com.mysql.cj.jdbc.Driver instead.
        val driver = "com.mysql.jdbc.Driver"

        try {
          // Each record is "id name gender age", separated by single spaces.
          val employeeRDD = spark.sparkContext
            .parallelize(Array("3 Mary F 26", "4 Tom M 23"))
            .map(_.split(" "))

          val schema = StructType(List(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("gender", StringType, true),
            StructField("age", IntegerType, true)))

          val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
          val employeeDF = spark.createDataFrame(rowRDD, schema)

          val prop = new Properties()
          prop.put("user", user)
          prop.put("password", password)
          prop.put("driver", driver)

          // "append" adds the rows to whatever the table already contains.
          employeeDF.write.mode("append").jdbc(url, "sparktest.employee", prop)

          val jdbcDF = spark.read.format("jdbc")
            .option("url", url)
            .option("driver", driver)
            .option("dbtable", "employee")
            .option("user", user)
            .option("password", password)
            .load()

          // agg is lazy; without show() the aggregation is never executed or printed.
          jdbcDF.agg("age" -> "max", "age" -> "sum").show()
        } finally {
          spark.stop()
        }
      }
    }
  • 相关阅读:
    OC中数组的使用方法
    fuel Explain
    OpenStack images
    linux dd实现磁盘完整全盘镜像备份backup,恢复recover(restore)
    linux大事件集
    Ruiy classicsQuotations
    OpenSuSE zypper OpenStack Icehouse repoAdd
    OpenSuSE zypper repo及Desktop媒体播放器设置 for OpenSuSE12.
    OpenSuSE查看指定软件包是否安装(OpenSuSE使用RPM作为默认的软件包维护管理工具)
    OpenStack开启sshd
  • 原文地址:https://www.cnblogs.com/daisy99lijing/p/12262181.html
Copyright © 2011-2022 走看看