  • DataFrame

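    A DataFrame, called SchemaRDD in early Spark releases (it was renamed in Spark 1.3), is an RDD of Row objects in which each Row represents one record. A SchemaRDD also carries the structure of its records (the data fields). It looks much like an ordinary RDD, but internally it uses this schema information to store data more efficiently. It also supports operations that plain RDDs lack, such as running SQL queries. A SchemaRDD can be created from an external data source, from the result of a query, or from an ordinary RDD.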

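    Key points about Spark DataFrames:

    • A distributed dataset
    • Similar to a table in a relational database
    • A rich set of operations, similar to the operators on an RDD
    • A DataFrame can be registered as a table and then queried with SQL
    • Many ways to create one
      • From an existing RDD
      • From structured data files
      • From JSON datasets
      • From Hive tables
      • From external databases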
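
    The point about registering a DataFrame as a table can be sketched as follows. This is a minimal illustration, assuming Spark 2.x or later with a local SparkSession; the `roles` data and the view name `roles_view` are made up for the example and are not part of the original session.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("df-intro").getOrCreate()
import spark.implicits._

// Hypothetical sample rows: (roleid, sex, level)
val roles = Seq((1, 1, 10), (200, 0, 8), (400, 0, 4)).toDF("roleid", "sex", "level")

// Register the DataFrame as a temporary view so SQL text can refer to it by name.
roles.createOrReplaceTempView("roles_view")

// The same query written as SQL and with the DataFrame API.
val viaSql = spark.sql("SELECT roleid FROM roles_view WHERE sex = 0 ORDER BY roleid")
val viaApi = roles.where("sex = 0").select("roleid").orderBy("roleid")

val sqlIds = viaSql.collect().map(_.getInt(0)).toSeq
val apiIds = viaApi.collect().map(_.getInt(0)).toSeq
```

    Both forms go through the same optimizer, so choosing between them is mostly a matter of readability.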

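    The DataFrame API was inspired by data frames in R and Python and offers the following features:

    • Scales from kilobytes to petabytes of data;
    • Supports many data formats and storage systems;
    • Advanced optimization and code generation through the Spark SQL Catalyst optimizer;
    • Seamless integration, through Spark, with big-data tools and infrastructure;
    • APIs for Python, Java, Scala, and R (SparkR).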

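    In short, a DataFrame makes a dataset easier to work with, and because the Spark SQL Catalyst optimizer generates optimized execution code underneath, it also tends to run faster. To sum up, building a Spark application with DataFrames lets you:

    • write less: write less code
    • do more: get more done
    • faster: run at higher speed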

    Reading MySQL from spark-shell

    scala> import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.SQLContext

    scala> val sqlcon=new SQLContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    sqlcon: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4e079825

    scala> val mysqlDF=sqlcon.read.format("jdbc").options(Map("url"->"jdbc:mysql://localhost:3306/test","user"->"root","password"->"root","dbtable"->"role")).load()
    mysqlDF: org.apache.spark.sql.DataFrame = [roleid: int, name: string ... 4 more fields]

    show: display the data

    scala> mysqlDF.show
    +------+----+-------------------+----+-----+----+
    |roleid|name| dateid|addr|phone| sex|
    +------+----+-------------------+----+-----+----+
    | 1|null|2017-11-16 14:49:11|null| null|null|
    | 40|null|2017-11-13 14:50:25|null| null|null|
    | 110|null|2017-11-14 14:50:47|null| null|null|
    | 200|null|2017-11-14 14:49:47|null| null|null|
    | 400|null|2017-11-15 14:49:56|null| null|null|
    | 600|null|2017-11-15 14:50:05|null| null|null|
    +------+----+-------------------+----+-----+----+

    scala> mysqlDF.show(2)
    +------+----+-------------------+----+-----+----+
    |roleid|name| dateid|addr|phone| sex|
    +------+----+-------------------+----+-----+----+
    | 1|null|2017-11-16 14:49:11|null| null|null|
    | 40|null|2017-11-13 14:50:25|null| null|null|
    +------+----+-------------------+----+-----+----+
    only showing top 2 rows

    collect: fetch all rows into an Array

    scala> mysqlDF.collect
    res35: Array[org.apache.spark.sql.Row] = Array([1,null,2017-11-16 14:49:11.0,null,null,null], [40,null,2017-11-13 14:50:25.0,null,null,null],

    [110,null,2017-11-14 14:50:47.0,null,null,null], [200,null,2017-11-14 14:49:47.0,null,null,null], [400,null,2017-11-15 14:49:56.0,null,null,null],

    [600,null,2017-11-15 14:50:05.0,null,null,null])

    collectAsList: fetch all rows into a List

    scala> mysqlDF.collectAsList
    res36: java.util.List[org.apache.spark.sql.Row] = [[1,null,2017-11-16 14:49:11.0,null,null,null], [40,null,2017-11-13 14:50:25.0,null,null,null], [110,null,2017-11-14 14:50:47.0,null,null,null], [200,null,2017-11-14 14:49:47.0,null,null,null], [400,null,2017-11-15 14:49:56.0,null,null,null], [600,null,2017-11-15 14:50:05.0,null,null,null]]

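    describe(cols: String*): summary statistics for the given columns

    This method takes one or more column names as Strings and returns a DataFrame holding summary statistics for numeric columns, such as count, mean, stddev, min, and max.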

    scala> mysqlDF.describe("level").show
    +-------+------------------+
    |summary| level|
    +-------+------------------+
    | count| 6|
    | mean|22.333333333333332|
    | stddev| 34.26173764809174|
    | min| 1|
    | max| 91|
    +-------+------------------+

    first, head, take, takeAsList: fetch the first few rows

    scala> mysqlDF.head
    res50: org.apache.spark.sql.Row = [1,null,2017-11-16 14:49:11.0,null,null,10]

    scala> mysqlDF.head(3)
    res51: Array[org.apache.spark.sql.Row] = Array([1,null,2017-11-16 14:49:11.0,null,null,10], [40,null,2017-11-13 14:50:25.0,null,null,1], [110,null,2017-11-14 14:50:47.0,null,null,20])

    scala> mysqlDF.take(2)
    res52: Array[org.apache.spark.sql.Row] = Array([1,null,2017-11-16 14:49:11.0,null,null,10], [40,null,2017-11-13 14:50:25.0,null,null,1])

    where(conditionExpr: String): takes the condition that would follow the WHERE keyword in SQL

    scala> mysqlDF.where("roleid=1 or roleid=400").show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    +------+----+-------------------+----+-----+-----+


    scala> mysqlDF.where("roleid in (1,400)").show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    +------+----+-------------------+----+-----+-----+

    filter: filter rows by a condition; accepts the same conditions as where
    scala> mysqlDF.filter("roleid in(1,400)").show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    +------+----+-------------------+----+-----+-----+

    select: fetch the values of the given columns

    scala> mysqlDF.select("roleid","dateid","level").where("roleid=1 or roleid=400").show
    +------+-------------------+-----+
    |roleid| dateid|level|
    +------+-------------------+-----+
    | 1|2017-11-16 14:49:11| 10|
    | 400|2017-11-15 14:49:56| 4|
    +------+-------------------+-----+

    selectExpr: select columns with SQL expressions applied to them

    scala> mysqlDF.selectExpr("roleid","cast(dateid as int)").show
    +------+----------+
    |roleid| dateid|
    +------+----------+
    | 1|1510814951|
    | 40|1510555825|
    | 110|1510642247|
    | 200|1510642187|
    | 400|1510728596|
    | 600|1510728605|
    +------+----------+


    scala> mysqlDF.apply("dateid")
    res91: org.apache.spark.sql.Column = dateid

    scala> mysqlDF.col("dateid")
    res92: org.apache.spark.sql.Column = dateid

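    drop: remove the given columns and keep the rest

    Returns a new DataFrame without the dropped columns. In Spark 1.x each call removes a single column; since Spark 2.0, drop also accepts several column names at once.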

    scala> mysqlDF.drop("phone").drop("addr").drop("name").show
    +------+-------------------+-----+
    |roleid| dateid|level|
    +------+-------------------+-----+
    | 1|2017-11-16 14:49:11| 10|
    | 40|2017-11-13 14:50:25| 1|
    | 110|2017-11-14 14:50:47| 20|
    | 200|2017-11-14 14:49:47| 8|
    | 400|2017-11-15 14:49:56| 4|
    | 600|2017-11-15 14:50:05| 91|
    +------+-------------------+-----+
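
    In Spark 2.0 and later, drop also has a varargs form, so the chained calls can be collapsed into one. A minimal sketch, assuming a local SparkSession and a made-up `roles` DataFrame shaped like the role table (neither is part of the original session):

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("drop-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows: (roleid, name, addr, phone, level)
val roles = Seq(
  (1, "a", "henan", "111", 10),
  (40, "b", "guangdong", "222", 20)
).toDF("roleid", "name", "addr", "phone", "level")

// One call with several names (the varargs overload) ...
val slimOneCall = roles.drop("phone", "addr", "name")
// ... gives the same columns as chaining single-column drops.
val slimChained = roles.drop("phone").drop("addr").drop("name")

// Dropping a name that is not in the schema is a no-op rather than an error.
val unchanged = roles.drop("no_such_column")
```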

    dropDuplicates(colNames: Array[String]): removes rows that are duplicates with respect to the given columns and returns a DataFrame

    scala> mysqlDF.dropDuplicates("name","level").show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 600|null|2017-11-15 14:50:05|null| null| 91|
    | 40|null|2017-11-13 14:50:25|null| null| 20|
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    | 200|null|2017-11-14 14:49:47|null| null| 8|
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    +------+----+-------------------+----+-----+-----+


    scala> mysqlDF.dropDuplicates("roleid","level").show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    | 200|null|2017-11-14 14:49:47|null| null| 8|
    | 40|null|2017-11-13 14:50:25|null| null| 20|
    | 600|null|2017-11-15 14:50:05|null| null| 91|
    | 110|null|2017-11-14 14:50:47|null| null| 20|
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    +------+----+-------------------+----+-----+-----+


    scala> mysqlDF.show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 40|null|2017-11-13 14:50:25|null| null| 20|
    | 110|null|2017-11-14 14:50:47|null| null| 20|
    | 200|null|2017-11-14 14:49:47|null| null| 8|
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    | 600|null|2017-11-15 14:50:05|null| null| 91|
    +------+----+-------------------+----+-----+-----+

    limit

    limit returns the first n rows of a DataFrame as a new DataFrame. Unlike take and head, limit is not an action.

    scala> mysqlDF.limit(3).show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 40|null|2017-11-13 14:50:25|null| null| 20|
    | 110|null|2017-11-14 14:50:47|null| null| 20|
    +------+----+-------------------+----+-----+-----+
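
    The difference between limit and take can be sketched like this. The example assumes a local SparkSession and a made-up single-column `levels` DataFrame; it is an illustration, not part of the original session.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("limit-demo").getOrCreate()
import spark.implicits._

// Hypothetical data: one integer column named level.
val levels = Seq(10, 20, 20, 8, 4, 91).toDF("level")

// limit is a transformation: it returns a new DataFrame and does not run a job by itself.
val firstThree = levels.limit(3)

// take (like head) is an action: it runs a job and returns rows to the driver.
val taken = levels.take(3)

// Because limit yields a DataFrame, further transformations can be chained on it.
val limitedCount = firstThree.filter("level >= 0").count()
```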

    orderBy and sort: sort by the given columns, ascending by default

    sort behaves the same as orderBy

    scala> mysqlDF.orderBy(mysqlDF("level").desc).show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 600|null|2017-11-15 14:50:05|null| null| 91|
    | 40|null|2017-11-13 14:50:25|null| null| 20|
    | 110|null|2017-11-14 14:50:47|null| null| 20|
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 200|null|2017-11-14 14:49:47|null| null| 8|
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    +------+----+-------------------+----+-----+-----+


    scala> mysqlDF.orderBy(-mysqlDF("level")).show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 600|null|2017-11-15 14:50:05|null| null| 91|
    | 40|null|2017-11-13 14:50:25|null| null| 20|
    | 110|null|2017-11-14 14:50:47|null| null| 20|
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 200|null|2017-11-14 14:49:47|null| null| 8|
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    +------+----+-------------------+----+-----+-----+

    scala> mysqlDF.orderBy("level").show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    | 200|null|2017-11-14 14:49:47|null| null| 8|
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 110|null|2017-11-14 14:50:47|null| null| 20|
    | 40|null|2017-11-13 14:50:25|null| null| 20|
    | 600|null|2017-11-15 14:50:05|null| null| 91|
    +------+----+-------------------+----+-----+-----+


    scala> mysqlDF.sort(mysqlDF("level").desc).show
    +------+----+-------------------+----+-----+-----+
    |roleid|name| dateid|addr|phone|level|
    +------+----+-------------------+----+-----+-----+
    | 600|null|2017-11-15 14:50:05|null| null| 91|
    | 40|null|2017-11-13 14:50:25|null| null| 20|
    | 110|null|2017-11-14 14:50:47|null| null| 20|
    | 1|null|2017-11-16 14:49:11|null| null| 10|
    | 200|null|2017-11-14 14:49:47|null| null| 8|
    | 400|null|2017-11-15 14:49:56|null| null| 4|
    +------+----+-------------------+----+-----+-----+


    scala> loginDF.orderBy(desc("level")).show  // descending

    scala> loginDF.orderBy(asc("level")).show   // ascending
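
    The desc and asc helpers come from org.apache.spark.sql.functions and have to be imported before use. A small sketch, assuming a local SparkSession and a made-up `logins` DataFrame, that also shows sorting on two columns so ties are broken deterministically:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{asc, desc}

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("orderby-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows: (roleid, level)
val logins = Seq((1, 100), (110, 20), (600, 20), (40, 11)).toDF("roleid", "level")

// Highest level first; rows with equal level are ordered by ascending roleid.
val ordered = logins.orderBy(desc("level"), asc("roleid"))
val orderedIds = ordered.collect().map(_.getInt(0)).toSeq
```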

    groupBy returns an org.apache.spark.sql.RelationalGroupedDataset (named GroupedData in Spark 1.x).

    groupBy is only useful when combined with an aggregate function.

    scala> mysqlDF.groupBy("level").count().show
    +-----+-----+
    |level|count|
    +-----+-----+
    | 91| 1|
    | 20| 2|
    | 4| 1|
    | 8| 1|
    | 10| 1|
    +-----+-----+

    columns: returns all column names

    scala> mysqlDF.columns
    res150: Array[String] = Array(roleid, name, dateid, addr, phone, level)

    Aggregate queries: agg returns a DataFrame

    scala> mysqlDF.groupBy("sex").agg(max("level"),min("level")).show
    +---+----------+----------+
    |sex|max(level)|min(level)|
    +---+----------+----------+
    | 1| 20| 10|
    | 0| 91| 4|
    +---+----------+----------+

    scala> roleDF.cube("sex").max("level").show
    +----+----------+
    | sex|max(level)|
    +----+----------+
    | 1| 29|
    |null| 91|
    | 0| 91|
    +----+----------+

    scala> mysqlDF.agg(Map("level"->"Avg","roleid"->"Max")).show
    +----------+-----------+
    |avg(level)|max(roleid)|
    +----------+-----------+
    | 25.5| 600|
    +----------+-----------+

    Compared with groupBy, cube adds one extra grand-total group (the row where sex is null).

    scala> mysqlDF.cube("sex").agg(max("level"),min("level")).show
    +----+----------+----------+
    | sex|max(level)|min(level)|
    +----+----------+----------+
    | 1| 20| 10|
    |null| 91| 4|
    | 0| 91| 4|
    +----+----------+----------+
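
    A null in the grouping column of a cube result is ambiguous when the data itself can contain nulls. One way to mark the grand-total row explicitly is the grouping function from org.apache.spark.sql.functions (available since Spark 2.0). The sketch below assumes a local SparkSession; the `roles` data and the column name `is_total` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{grouping, max}

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("cube-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows: (sex, level)
val roles = Seq((1, 10), (1, 20), (0, 4), (0, 91)).toDF("sex", "level")

// grouping("sex") is 1 on rows where sex was aggregated away (the grand total)
// and 0 on the ordinary per-sex rows.
val cubed = roles.cube("sex").agg(
  max("level").as("max_level"),
  grouping("sex").as("is_total")
)

val totalRows = cubed.filter($"is_total" === 1).collect()
```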

    scala> mysqlDF.cube("sex").agg(Map("Level"->"Min","level"->"Max")).show
    +----+----------+----------+
    | sex|min(Level)|max(level)|
    +----+----------+----------+
    | 1| 10| 20|
    |null| 4| 91|
    | 0| 4| 91|
    +----+----------+----------+

    withColumnRenamed: rename a column

    scala> mysqlDF.cube("sex").agg(Map("Level"->"Min","level"->"Max")).withColumnRenamed("min(level)","minlevel").show
    +----+--------+----------+
    | sex|minlevel|max(level)|
    +----+--------+----------+
    | 1| 10| 20|
    |null| 4| 91|
    | 0| 4| 91|
    +----+--------+----------+

    scala> mysqlDF.cube("sex").agg(max("level")  as "max",min("level")  as "min").show
    +----+---+---+
    | sex|max|min|
    +----+---+---+
    | 1| 29| 10|
    |null| 91| 4|
    | 0| 91| 4|
    +----+---+---+

    explode: returns a DataFrame

    Split addr on spaces and put the pieces in addrs, producing one row per piece

    scala> mysqlDF.explode("addr","addrs"){addr:String=>addr.split(" ")}.show
    warning: there was one deprecation warning; re-run with -deprecation for details
    +------+----+-------------------+-------------------+---+-----+---------+
    |roleid|name| dateid| addr|sex|level| addrs|
    +------+----+-------------------+-------------------+---+-----+---------+
    | 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10| henan|
    | 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10| luohe|
    | 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10| linying|
    | 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|guangdong|
    | 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20| shenzhen|
    | 110|null|2017-11-14 14:50:47| beijing| 1| 20| beijing|
    | 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8| shandong|
    | 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8| qingdao|
    | 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4| anhui|
    | 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4| hefei|
    | 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91| hunan|
    | 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91| changsha|
    +------+----+-------------------+-------------------+---+-----+---------+
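
    The deprecation warning above refers to the explode method on the DataFrame itself. A commonly used replacement is the explode column function combined with split, both from org.apache.spark.sql.functions. A minimal sketch, assuming a local SparkSession and a made-up `roles` DataFrame:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("explode-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows: (roleid, addr)
val roles = Seq((1, "henan luohe linying"), (110, "beijing")).toDF("roleid", "addr")

// split turns the string into an array column; explode emits one output row per element.
val exploded = roles.withColumn("addrs", explode(split($"addr", " ")))
val parts = exploded.select("addrs").as[String].collect().toSeq
```

    Note that explode produces no output row for a null addr; Spark 2.2 and later also provide explode_outer, which keeps such rows with a null element.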

    scala> mysqlDF.printSchema
    root
    |-- roleid: integer (nullable = true)
    |-- name: string (nullable = true)
    |-- dateid: timestamp (nullable = true)
    |-- addr: string (nullable = true)
    |-- sex: integer (nullable = true)
    |-- level: integer (nullable = true)

    explain: print the execution plan

    scala> mysqlDF.explain
    == Physical Plan ==
    *Scan JDBCRelation(role) [numPartitions=1] [roleid#1278,name#1279,dateid#1280,addr#1281,sex#1282,level#1283] ReadSchema: struct<roleid:int,name:string,dateid:timestamp,addr:string,sex:int,level:int>

    Drop rows that contain null or NaN in any column

    scala> mysqlDF.na.drop().show
    +------+----+------+----+---+-----+
    |roleid|name|dateid|addr|sex|level|
    +------+----+------+----+---+-----+
    +------+----+------+----+---+-----+


    scala> mysqlDF.select("roleid","level").na.drop().show
    +------+-----+
    |roleid|level|
    +------+-----+
    | 1| 10|
    | 40| 20|
    | 110| 20|
    | 200| 8|
    | 400| 4|
    | 600| 91|
    +------+-----+

    Fill nulls in all columns (a String value only fills string columns)

    scala> mysqlDF.na.fill("noname").show
    +------+------+-------------------+-------------------+---+-----+
    |roleid| name| dateid| addr|sex|level|
    +------+------+-------------------+-------------------+---+-----+
    | 1|noname|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|noname|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 110|noname|2017-11-14 14:50:47| beijing| 1| 20|
    | 200|noname|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 400|noname|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 600|noname|2017-11-15 14:50:05| hunan changsha| 0| 91|
    +------+------+-------------------+-------------------+---+-----+


    scala> mysqlDF.show
    +------+----+-------------------+-------------------+---+-----+
    |roleid|name| dateid| addr|sex|level|
    +------+----+-------------------+-------------------+---+-----+
    | 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 110|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
    | 650|null|2017-11-01 17:24:34| null| 1| 29|
    | 688|null| null| shanxi xiaan| 0| 55|
    +------+----+-------------------+-------------------+---+-----+


    scala> mysqlDF.na.drop(Array("addr","dateid")).show
    +------+----+-------------------+-------------------+---+-----+
    |roleid|name| dateid| addr|sex|level|
    +------+----+-------------------+-------------------+---+-----+
    | 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 110|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
    +------+----+-------------------+-------------------+---+-----+

    Fill nulls in specific columns

    scala> mysqlDF.na.fill("noname",Array("name")).show
    +------+------+-------------------+-------------------+---+-----+
    |roleid| name| dateid| addr|sex|level|
    +------+------+-------------------+-------------------+---+-----+
    | 1|noname|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|noname|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 110|noname|2017-11-14 14:50:47| beijing| 1| 20|
    | 200|noname|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 400|noname|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 600|noname|2017-11-15 14:50:05| hunan changsha| 0| 91|
    | 650|noname|2017-11-01 17:24:34| null| 1| 29|
    | 688|noname| null| shanxi xiaan| 0| 55|
    +------+------+-------------------+-------------------+---+-----+


    scala> mysqlDF.na.fill(Map("name"->"noname")).show
    +------+------+-------------------+-------------------+---+-----+
    |roleid| name| dateid| addr|sex|level|
    +------+------+-------------------+-------------------+---+-----+
    | 1|noname|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|noname|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 110|noname|2017-11-14 14:50:47| beijing| 1| 20|
    | 200|noname|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 400|noname|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 600|noname|2017-11-15 14:50:05| hunan changsha| 0| 91|
    | 650|noname|2017-11-01 17:24:34| null| 1| 29|
    | 688|noname| null| shanxi xiaan| 0| 55|
    +------+------+-------------------+-------------------+---+-----+
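
    The Map form of na.fill can carry a different replacement per column, including non-string values. A minimal sketch, assuming a local SparkSession and a made-up `roles` DataFrame with nullable columns:

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("na-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows: (roleid, name, addr, level); None becomes null in the DataFrame.
val data: Seq[(Int, Option[String], Option[String], Option[Int])] = Seq(
  (1, None, Some("henan luohe"), Some(10)),
  (650, Some("tom"), None, None)
)
val roles = data.toDF("roleid", "name", "addr", "level")

// A different replacement value per column; each value should match its column's type.
val filled = roles.na.fill(Map("name" -> "noname", "level" -> 0))

// Drop only the rows whose addr is null, ignoring nulls in other columns.
val withAddr = roles.na.drop(Seq("addr"))

val filledName = filled.where("roleid = 1").select("name").as[String].head()
```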

    Joining two DataFrames

    scala> roleDF.show
    +------+----+-------------------+-------------------+---+-----+
    |roleid|name| dateid| addr|sex|level|
    +------+----+-------------------+-------------------+---+-----+
    | 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 110|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
    | 650|null|2017-11-01 17:24:34| null| 1| 29|
    | 688|null| null| shanxi xiaan| 0| 55|
    +------+----+-------------------+-------------------+---+-----+


    scala> loginDF.show
    +------+-------------------+-------+-----+
    |roleid| logindate|loginip|level|
    +------+-------------------+-------+-----+
    | 1|2017-11-16 14:49:11| null| 100|
    | 110|2017-11-15 14:49:27| null| 20|
    | 200|2017-11-14 14:49:47| null| 50|
    | 400|2017-11-15 14:49:56| null| 30|
    | 600|2017-11-15 14:50:05| null| 20|
    | 40|2017-11-13 14:50:25| null| 11|
    | 110|2017-11-14 14:50:47| null| 1|
    | 40|2017-11-14 14:51:03| null| 40|
    | 110|2017-11-16 14:51:20| null| 500|
    +------+-------------------+-------+-----+

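    To join on one or more columns, pass their names in a Seq (the join columns must have the same name in both DataFrames, as in the next example)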

    scala> loginDF.join(roleDF,Seq("roleid")).show
    +------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
    |roleid| logindate|loginip|level|name| dateid| addr|sex|level|
    +------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
    | 1|2017-11-16 14:49:11| null| 100|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|2017-11-13 14:50:25| null| 11|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 40|2017-11-14 14:51:03| null| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 400|2017-11-15 14:49:56| null| 30|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 200|2017-11-14 14:49:47| null| 50|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 110|2017-11-15 14:49:27| null| 20|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 110|2017-11-14 14:50:47| null| 1|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 110|2017-11-16 14:51:20| null| 500|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 600|2017-11-15 14:50:05| null| 20|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
    +------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+

    Joins between two DataFrames can be of type inner, outer, left_outer, right_outer, or leftsemi

    scala> loginDF.join(roleDF,Seq("roleid"),"inner").show
    +------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
    |roleid| logindate|loginip|level|name| dateid| addr|sex|level|
    +------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
    | 1|2017-11-16 14:49:11| null| 100|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|2017-11-13 14:50:25| null| 11|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 40|2017-11-14 14:51:03| null| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 400|2017-11-15 14:49:56| null| 30|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 200|2017-11-14 14:49:47| null| 50|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 110|2017-11-15 14:49:27| null| 20|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 110|2017-11-14 14:50:47| null| 1|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 110|2017-11-16 14:51:20| null| 500|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 600|2017-11-15 14:50:05| null| 20|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
    +------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
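
    The non-inner join types behave as sketched here. The example assumes a local SparkSession; the `roles` and `logins` DataFrames are made-up stand-ins for roleDF and loginDF.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("join-demo").getOrCreate()
import spark.implicits._

// Hypothetical data: role 688 has no login rows, role 40 has two.
val roles = Seq((1, "henan"), (40, "guangdong"), (688, "shanxi")).toDF("roleid", "addr")
val logins = Seq((1, 100), (40, 11), (40, 40)).toDF("roleid", "level")

// left_outer keeps every role; roles without a login get null in the login columns.
val leftOuter = roles.join(logins, Seq("roleid"), "left_outer")

// leftsemi keeps only roles that have at least one login, and only the left-side columns.
val semi = roles.join(logins, Seq("roleid"), "leftsemi")
```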

    If the join columns have different names, join on a Column expression instead

    scala> loginDF.join(roleDF,loginDF("roleid")===roleDF("roleid"),"inner").show
    +------+-------------------+-------+-----+------+----+-------------------+-------------------+---+-----+
    |roleid| logindate|loginip|level|roleid|name| dateid| addr|sex|level|
    +------+-------------------+-------+-----+------+----+-------------------+-------------------+---+-----+
    | 1|2017-11-16 14:49:11| null| 100| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
    | 40|2017-11-13 14:50:25| null| 11| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 40|2017-11-14 14:51:03| null| 40| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
    | 400|2017-11-15 14:49:56| null| 30| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
    | 200|2017-11-14 14:49:47| null| 50| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
    | 110|2017-11-15 14:49:27| null| 20| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 110|2017-11-14 14:50:47| null| 1| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 110|2017-11-16 14:51:20| null| 500| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
    | 600|2017-11-15 14:50:05| null| 20| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
    +------+-------------------+-------+-----+------+----+-------------------+-------------------+---+-----+

    withColumn: add a new column to the current DataFrame

    scala> loginDF.withColumn("levelnew",loginDF("level")+100).show
    +------+-------------------+-------+-----+--------+
    |roleid| logindate|loginip|level|levelnew|
    +------+-------------------+-------+-----+--------+
    | 1|2017-11-16 14:49:11| null| 100| 200|
    | 110|2017-11-15 14:49:27| null| 20| 120|
    | 200|2017-11-14 14:49:47| null| 50| 150|
    | 400|2017-11-15 14:49:56| null| 30| 130|
    | 600|2017-11-15 14:50:05| null| 20| 120|
    | 40|2017-11-13 14:50:25| null| 11| 111|
    | 110|2017-11-14 14:50:47| null| 1| 101|
    | 40|2017-11-14 14:51:03| null| 40| 140|
    | 110|2017-11-16 14:51:20| null| 500| 600|
    +------+-------------------+-------+-----+--------+

    intersect(other: DataFrame): returns a DataFrame containing the rows that exist in both DataFrames

    except(other: DataFrame): returns a DataFrame containing the rows that exist in this DataFrame but not in the other
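
    Neither method is demonstrated in the session, so here is a minimal sketch. It assumes a local SparkSession and two made-up single-column DataFrames `a` and `b`; both inputs need the same schema.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("set-ops-demo").getOrCreate()
import spark.implicits._

// Hypothetical data: two sets of role ids.
val a = Seq(1, 40, 110, 200).toDF("roleid")
val b = Seq(110, 200, 400).toDF("roleid")

val both = a.intersect(b)   // rows present in both a and b
val onlyInA = a.except(b)   // rows in a that are absent from b

// Sort on the driver because the output order of these operations is not guaranteed.
val bothIds = both.as[Int].collect().sorted.toSeq
val onlyInAIds = onlyInA.as[Int].collect().sorted.toSeq
```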

    scala> roleDF.registerTempTable("tb_name")


    scala> sqlcon.sql("select roleid,sex,level from tb_name where sex=0").show
    +------+---+-----+
    |roleid|sex|level|
    +------+---+-----+
    | 200| 0| 8|
    | 400| 0| 4|
    | 600| 0| 91|
    | 688| 0| 55|
    +------+---+-----+

    If the first field is an int, read its value with getInt(0)
    scala> roleDF.map(row=>row.getInt(0)).show
    +-----+
    |value|
    +-----+
    | 1|
    | 40|
    | 110|
    | 200|
    | 400|
    | 600|
    | 650|
    | 688|
    +-----+

    If the first field is an int, reading it with getString throws an error
    scala> roleDF.map(row=>row.getString(0)).show
    17/11/17 14:55:13 ERROR executor.Executor: Exception in task 0.0 in stage 126.0 (TID 2036)
    java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at org.apache.spark.sql.Row$class.getString(Row.scala:255)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:165)

    ........................................
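
    To avoid depending on field positions, a Row can also be read by field name, and null cells can be checked before use. A minimal sketch, assuming a local SparkSession and a made-up `roles` DataFrame (the fallback string "unknown" is an arbitrary choice for the example):

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("row-access-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows: (roleid, addr); None becomes null in the DataFrame.
val data: Seq[(Int, Option[String])] = Seq((1, Some("henan")), (650, None))
val roles = data.toDF("roleid", "addr")

// getAs looks the field up by name; the type parameter must match the column type.
val ids = roles.map(row => row.getAs[Int]("roleid")).collect().toSeq

// getString returns null for a null cell, so check isNullAt before using the value.
val addrs = roles.map { row =>
  val i = row.fieldIndex("addr")
  if (row.isNullAt(i)) "unknown" else row.getString(i)
}.collect().toSeq
```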

    Reading JSON and Hive data

    scala> import org.apache.spark.sql.hive._
    import org.apache.spark.sql.hive._

    scala> val jsonDF=hivecon.jsonFile("/tmp/20171024/namejson.txt")
    warning: there was one deprecation warning; re-run with -deprecation for details
    jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]


    scala> jsonDF.registerTempTable("rolename")
    warning: there was one deprecation warning; re-run with -deprecation for details

    scala> hivecon.sql("select * from rolename where age>18 or name like 'm%'").show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 19| 2| jack|
    | 17| 3|marry|
    +---+---+-----+

    scala> hivecon.sql("select * from rolename where age>18 or name like 'm%'").printSchema
    root
    |-- age: long (nullable = true)
    |-- id: long (nullable = true)
    |-- name: string (nullable = true)

    scala> hivecon.sql("show databases")
    res78: org.apache.spark.sql.DataFrame = [databaseName: string]

    scala> hivecon.sql("show databases").show
    +------------+
    |databaseName|
    +------------+
    | aaa|
    | default|
    | sparkhive|
    | userdb|
    +------------+

    Creating a DataFrame from a case class in Scala

    scala> case class person(name:String,age:Int,grade:Int)
    defined class person

    scala> val personRDD=sc.parallelize(List(person("tianyongtao",10,8),person("xiaomage",30,5),person("xidada",50,3)))
    personRDD: org.apache.spark.rdd.RDD[person] = ParallelCollectionRDD[169] at parallelize at <console>:31

    scala> personRDD.collect
    res89: Array[person] = Array(person(tianyongtao,10,8), person(xiaomage,30,5), person(xidada,50,3))

    scala> val personDF=personRDD.toDF()
    personDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

    scala> personDF.show
    +-----------+---+-----+
    | name|age|grade|
    +-----------+---+-----+
    |tianyongtao| 10| 8|
    | xiaomage| 30| 5|
    | xidada| 50| 3|
    +-----------+---+-----+

    scala> personDF.registerTempTable("person")
    warning: there was one deprecation warning; re-run with -deprecation for details

    scala> hivecon.sql("select * from person").show
    +-----------+---+-----+
    | name|age|grade|
    +-----------+---+-----+
    |tianyongtao| 10| 8|
    | xiaomage| 30| 5|
    | xidada| 50| 3|
    +-----------+---+-----+

    Spark UDF

    A UDF written in Scala is just an ordinary Scala function; the only extra step is registering it with the SQLContext.

    scala> val sqlcon=new SQLContext(sc)

    scala> def len110(str:String):Int=str.length
    len110: (str: String)Int

    scala> sqlcon.udf.register("len119",len110 _)
    res6: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

    scala> sqlcon.sql("select len119('aaag')").show
    +----------------+
    |UDF:len119(aaag)|
    +----------------+
    | 4|
    +----------------+
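
    Registration by name is only needed for SQL text. For the DataFrame API, the same function can be wrapped with udf from org.apache.spark.sql.functions. A minimal sketch, assuming a local SparkSession; the names `strLen`, `str_len`, and `names_view` are made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Local session for experimentation; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("udf-demo").getOrCreate()
import spark.implicits._

// Hypothetical data: one string column.
val names = Seq("tianyongtao", "xiaomage").toDF("name")

// Wrap a plain Scala function so it can be applied to Column values.
val strLen = udf((s: String) => s.length)
val withLen = names.withColumn("name_len", strLen($"name"))

// The same logic registered by name for use inside SQL text.
spark.udf.register("str_len", (s: String) => s.length)
names.createOrReplaceTempView("names_view")
val viaSql = spark.sql("SELECT str_len(name) AS name_len FROM names_view")

val apiLens = withLen.select("name_len").as[Int].collect().sorted.toSeq
val sqlLens = viaSql.as[Int].collect().sorted.toSeq
```

    This function would throw on a null input, so a real UDF over nullable columns should handle null explicitly.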

  • Original article: https://www.cnblogs.com/playforever/p/7844232.html