  • Getting Started with SparkSQL

    Overview

    SparkSQL wraps an RDD in a DataFrame object, which behaves like a table in a relational database.

    Creating a DataFrame

    A DataFrame is the equivalent of a database table. It is read-only: you cannot add elements to it once a computation is under way.

    RDD.toDF("columnName")

       

    scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

     

    scala> rdd.toDF("id")

    res0: org.apache.spark.sql.DataFrame = [id: int]

     

    scala> res0.show // shows at most 20 rows by default

    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    |  5|
    |  6|
    +---+

    scala> res0.printSchema // inspect column names, types, and nullability

    root

    |-- id: integer (nullable = true)
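
    show also takes an explicit row count when you want something other than the default 20 rows; a minimal sketch:

    scala> res0.show(3) // print only the first 3 rows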

    Creating a multi-column DataFrame


    scala> sc.parallelize(List( (1,"beijing"),(2,"shanghai") ) )

    res3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[5] at parallelize at <console>:22

     

    scala> res3.toDF("id","name")

    res4: org.apache.spark.sql.DataFrame = [id: int, name: string]

     

    scala> res4.show

    +---+--------+
    | id|    name|
    +---+--------+
    |  1| beijing|
    |  2|shanghai|
    +---+--------+
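
    The usual DataFrame operations apply to the result as well; a small sketch using the standard select and filter methods:

    scala> res4.select("name").show // project a single column
    scala> res4.filter(res4("id") > 1).show // keep only rows with id > 1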

     

    For example, with three columns:

    scala> sc.parallelize(List( (1,"beijing",100780),(2,"shanghai",560090),(3,"xi'an",600329)))

    res6: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:22

     

    scala> res6.toDF("id","name","postcode")

    res7: org.apache.spark.sql.DataFrame = [id: int, name: string, postcode: int]

     

    scala> res7.show

    +---+--------+--------+
    | id|    name|postcode|
    +---+--------+--------+
    |  1| beijing|  100780|
    |  2|shanghai|  560090|
    |  3|   xi'an|  600329|
    +---+--------+--------+

    As you can see, the tuple needs one element for every column you want to build; see the case-class sketch below for a way to avoid passing the column names to toDF.
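
    Alternatively, a case class can supply the column names and types from its fields, so toDF needs no arguments; a quick sketch (City is an illustrative class, not from the examples above):

    scala> case class City(id: Int, name: String, postcode: Int)
    scala> val cities = sc.parallelize(List(City(1,"beijing",100780), City(2,"shanghai",560090)))
    scala> cities.toDF().show // columns id, name, postcode come from City's fields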

       

    Creating a DataFrame from an external file

       

    1) Text files

    A text file cannot be turned into a DataFrame directly; first use RDD transformations to produce tuples, then call toDF(). For example, a word count:

    scala> val rdd = sc.textFile("/root/words.txt")
    .flatMap( x => x.split(" ") ) // split each line into words; without this the counts would be per line, not per word
    .map( x => (x,1) )
    .reduceByKey( (x,y) => x+y )

    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:21

     

    scala> rdd.toDF("word","count")

    res9: org.apache.spark.sql.DataFrame = [word: string, count: int]

     

    scala> res9.show

    +------+-----+
    |  word|count|
    +------+-----+
    | spark|    3|
    |  hive|    1|
    |hadoop|    2|
    |   big|    2|
    |  scla|    1|
    |  data|    1|
    +------+-----+
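
    The result is an ordinary DataFrame, so you can, for example, sort it by frequency; a small sketch:

    scala> res9.orderBy(res9("count").desc).show // most frequent words first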

    2) JSON files

    File contents:

    {"id":1, "name":"leo", "age":18}

    {"id":2, "name":"jack", "age":19}

    {"id":3, "name":"marry", "age":17}

       

    Code:

    scala> import org.apache.spark.sql.SQLContext
    scala> val sqc = new SQLContext(sc)
    scala> val tb4 = sqc.read.json("/home/software/people.json")
    scala> tb4.show
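
    Once loaded, the DataFrame can also be queried with SQL by registering it as a temporary table; a sketch against the pre-2.0 SQLContext API used here (in Spark 2.x the method is createOrReplaceTempView):

    scala> tb4.registerTempTable("people")
    scala> sqc.sql("select name, age from people where age >= 18").show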


    3) Reading via JDBC

    Steps:

    1) Copy the MySQL driver jar into Spark's jars directory

    2) Restart the Spark service

    3) Start the Spark shell

    4) Run the code below; for example, assume a test database in MySQL containing the table to read (flow in the example below)

    Run:

    scala> import org.apache.spark.sql.SQLContext
    scala> val sqc = new SQLContext(sc)
    scala> val prop = new java.util.Properties
    scala> prop.put("user","root")
    scala> prop.put("password","root")
    scala> val flow = sqc.read.jdbc("jdbc:mysql://hadoop01:3306/test","flow",prop)

    scala> flow.show
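
    The jdbc reader has a matching writer if the DataFrame needs to go back into MySQL; a sketch, where flow_copy is a hypothetical target table (created on first write):

    scala> flow.write.mode("append").jdbc("jdbc:mysql://hadoop01:3306/test","flow_copy",prop) // append rows to flow_copy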

       

    Note: if you get an insufficient-privileges error, log in to MySQL and run:

    grant all privileges on *.* to 'root'@'hadoop01' identified by 'root' with grant option;

    Then run:

    flush privileges;

       
