zoukankan      html  css  js  c++  java
  • 寒假学习进度-5

    Spark SQL 的基本编程方法

    读取 JSON 文件，创建 DataFrame

    // Load people.json from the local filesystem (file:// scheme) into a DataFrame; straight ASCII quotes are required.
    val df = spark.read.json("file:///abc/lianxi/bigdata/src/main/data/people.json")

    显示

    scala> df.show(20)  // explicit row count; identical to df.show()

    去重并显示

    scala> df.dropDuplicates().show()  // keep only unique rows, then display them

    过滤（筛选 age 大于 30 的记录）

    scala> df.where(df("age") > 30).show()  // rows whose age column is greater than 30

    排序（按 name 升序）

    scala> df.orderBy(df("name").asc).show()  // ascending order by the name column

    求 age 列的平均值

    scala> df.agg("age" -> "avg").show()  // compute avg(age) and display it, like the other snippets

    实现从 RDD 转换得到 DataFrame

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.SparkSession

    /** Converts a comma-separated text file to a DataFrame via a case class, then queries it with Spark SQL. */
    object RDDtoDF {

      /** One line of employee.txt, split on "," into id, name and age. */
      // Declared as an object member (not inside main) so Spark can derive an Encoder for toDF().
      final case class Employee(id: Long, name: String, age: Long)

      def main(args: Array[String]): Unit = {
        // getOrCreate reuses an existing session (e.g. spark-shell) or builds one under spark-submit.
        val spark = SparkSession.builder().appName("RDDtoDF").getOrCreate()
        // Needed for toDF() and for the Encoder[String] used by the map below; requires `spark` to exist first.
        import spark.implicits._
        try {
          val employeeDF = spark.sparkContext
            .textFile("file:///usr/local/spark/employee.txt")
            .map(_.split(","))
            .map(attributes => Employee(attributes(0).trim.toLong, attributes(1), attributes(2).trim.toLong))
            .toDF()
          employeeDF.createOrReplaceTempView("employee")
          val employeeRDD = spark.sql("select id,name,age from employee")
          employeeRDD
            .map(t => s"id:${t(0)},name:${t(1)},age:${t(2)}")
            .show(false) // false: do not truncate strings longer than 20 characters
        } finally {
          spark.stop()
        }
      }
    }

    利用 DataFrame 读写 MySQL 的数据 

    import java.util.Properties
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SparkSession}

    /** Appends two employee rows to MySQL table sparktest.employee, then reads the table back and prints max/sum of age. */
    object TestMySQL {
      def main(args: Array[String]): Unit = {
        // getOrCreate reuses an existing session (e.g. spark-shell) or builds one under spark-submit.
        val spark = SparkSession.builder().appName("TestMySQL").getOrCreate()

        // Connection settings shared by the write and the read.
        // NOTE(review): credentials are hard-coded; move them to configuration for anything beyond a local demo.
        val url = "jdbc:mysql://localhost:3306/sparktest"
        val user = "root"
        val password = "hadoop"
        val driver = "com.mysql.jdbc.Driver"

        try {
          // Each input string is "id name gender age", separated by single spaces.
          val employeeRDD = spark.sparkContext
            .parallelize(Array("3 Mary F 26", "4 Tom M 23"))
            .map(_.split(" "))
          val schema = StructType(List(
            StructField("id", IntegerType, nullable = true),
            StructField("name", StringType, nullable = true),
            StructField("gender", StringType, nullable = true),
            StructField("age", IntegerType, nullable = true)))
          val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
          val employeeDF = spark.createDataFrame(rowRDD, schema)

          val prop = new Properties()
          prop.put("user", user)
          prop.put("password", password)
          prop.put("driver", driver)
          employeeDF.write.mode("append").jdbc(url, "sparktest.employee", prop)

          val jdbcDF = spark.read.format("jdbc")
            .option("url", url)
            .option("driver", driver)
            .option("dbtable", "employee")
            .option("user", user)
            .option("password", password)
            .load()
          // Display the aggregates; the original discarded the result, so nothing was printed.
          jdbcDF.agg("age" -> "max", "age" -> "sum").show()
        } finally {
          spark.stop()
        }
      }
    }
  • 相关阅读:
    地址打开eclipse汉化全程
    可行性nullpoj 2723 Get Luffy Out 2sat
    服务器端提交Git版本控制tag标签的使用(二)
    原因总结六级之阅读理解
    子类父类浅谈filter里面为什么要强制转换成httpServletRequest类型
    排序中文POJ 1696/hrbustoj 1318 几何 蛋疼的蚂蚁
    选择复选框js限制checkbox勾选的个数以及php获取多个checkbbox的方法
    环境节点[置顶] 如何终止特定 RAC 实例上的 session
    整数实例hdu2041(超级楼梯)
    属性框架Fixjs——显示基类DisplayObject
  • 原文地址:https://www.cnblogs.com/liujinxin123/p/12253319.html
Copyright © 2011-2022 走看看