  • Getting Started with SparkSQL

    Overview

    Spark SQL wraps an RDD into a DataFrame object, which is analogous to a table in a relational database.

    Creating a DataFrame

    A DataFrame is the equivalent of a database table. It is read-only: you cannot add elements to it during a computation.

    RDD.toDF("columnName")
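
    In spark-shell the implicit conversions that add toDF to an RDD are already imported. In a standalone Spark 1.x program (the API generation used throughout this post) you have to bring them in yourself; a minimal sketch, with a hypothetical app setup:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("toDF-demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // enables rdd.toDF("...")

    val df = sc.parallelize(List(1, 2, 3)).toDF("id")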

       

    scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

     

    scala> rdd.toDF("id")

    res0: org.apache.spark.sql.DataFrame = [id: int]

     

    scala> res0.show // by default, show displays at most 20 rows

    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    |  5|
    |  6|
    +---+

    scala> res0.printSchema // inspect column names, types, and nullability

    root

    |-- id: integer (nullable = true)
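
    show also takes a row count when you want something other than the default 20; a small sketch (output along these lines):

    scala> res0.show(3) // display only the first 3 rows
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    +---+
    only showing top 3 rows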

    Creating a Multi-Column DataFrame

    As before, a DataFrame corresponds to a database table; to get several columns, build the RDD from tuples.

    scala> sc.parallelize(List( (1,"beijing"),(2,"shanghai") ) )

    res3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[5] at parallelize at <console>:22

     

    scala> res3.toDF("id","name")

    res4: org.apache.spark.sql.DataFrame = [id: int, name: string]

     

    scala> res4.show

    +---+--------+
    | id|    name|
    +---+--------+
    |  1| beijing|
    |  2|shanghai|
    +---+--------+

     

    For example, with three columns:

    scala> sc.parallelize(List( (1,"beijing",100780),(2,"shanghai",560090),(3,"xi'an",600329)))

    res6: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:22

     

    scala> res6.toDF("id","name","postcode")

    res7: org.apache.spark.sql.DataFrame = [id: int, name: string, postcode: int]

     

    scala> res7.show

    +---+--------+--------+
    | id|    name|postcode|
    +---+--------+--------+
    |  1| beijing|  100780|
    |  2|shanghai|  560090|
    |  3|   xi'an|  600329|
    +---+--------+--------+

    As you can see, the tuple needs one element for each column you want to build.
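
    If you prefer named fields over positional tuples, an RDD of case-class instances works too: the field names become the column names, so toDF needs no arguments. A sketch using a hypothetical City case class:

    scala> case class City(id: Int, name: String, postcode: Int)
    scala> val cities = sc.parallelize(List(City(1, "beijing", 100780), City(2, "shanghai", 560090)))
    scala> cities.toDF.show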

       

    Building a DataFrame from an External File

       

    1) Text files

    A text file cannot be converted directly; first transform it with RDD operations into an RDD of tuples, then call toDF() to get a DataFrame. (The count below assumes each line of words.txt holds exactly one word; a flatMap variant for multi-word lines is sketched after the output.)

    scala> val rdd = sc.textFile("/root/words.txt").map(x => (x, 1)).reduceByKey((x, y) => x + y)

    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:21

     

    scala> rdd.toDF("word","count")

    res9: org.apache.spark.sql.DataFrame = [word: string, count: int]

     

    scala> res9.show

    +------+-----+
    |  word|count|
    +------+-----+
    | spark|    3|
    |  hive|    1|
    |hadoop|    2|
    |   big|    2|
    |  scla|    1|
    |  data|    1|
    +------+-----+
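
    As noted above, map(x => (x, 1)) treats each whole line as one word. A sketch of the variant for files whose lines contain several space-separated words:

    scala> val wc = sc.textFile("/root/words.txt").flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
    scala> wc.toDF("word", "count").show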

    2) JSON files

    File contents:

    {"id":1, "name":"leo", "age":18}

    {"id":2, "name":"jack", "age":19}

    {"id":3, "name":"marry", "age":17}

       

    Code:

    scala> import org.apache.spark.sql.SQLContext

    scala> val sqc = new SQLContext(sc)

    scala> val tb4 = sqc.read.json("/home/software/people.json")

    scala> tb4.show
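
    Spark infers the schema from the JSON keys (numeric fields typically come back as long). printSchema shows the result, and the usual DataFrame operations then apply; a small sketch:

    scala> tb4.printSchema
    scala> tb4.filter("age >= 18").select("name", "age").show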

       

       

       

    3) Reading via JDBC

    Steps:

    1) Copy the MySQL driver JAR into Spark's jars directory.

    2) Restart the Spark service.

    3) Open the Spark shell.

    4) Run the code. For example, suppose the MySQL server has a database named test, and in it a table named flow.

    Code:

    scala> import org.apache.spark.sql.SQLContext

    scala> val sqc = new SQLContext(sc)

    scala> val prop = new java.util.Properties

    scala> prop.put("user", "root")

    scala> prop.put("password", "root")

    scala> val flow = sqc.read.jdbc("jdbc:mysql://hadoop01:3306/test", "flow", prop)

    scala> flow.show
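
    Once loaded, flow behaves like any other DataFrame, and results can be written back over JDBC as well. A minimal sketch, assuming the same test database (flow_copy is a hypothetical target table):

    scala> flow.write.jdbc("jdbc:mysql://hadoop01:3306/test", "flow_copy", prop)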

       

    Note: if you get a permissions error, log into MySQL and run:

    grant all privileges on *.* to 'root'@'hadoop01' identified by 'root' with grant option;

    Then run:

    flush privileges;

       
