2.3. Working with JSON, Parquet, and JDBC databases

    1. Start the Spark shell

    spark-shell --master spark://s101:7077

    2. Select the database

    spark.sql("use spark").show
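    If you are unsure which databases or tables exist, standard Spark SQL statements will list them (a quick check, assuming the spark database used above):

    spark.sql("show databases").show
    spark.sql("show tables").show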

    3. Create a DataFrame

    scala> val df = spark.sql("select * from orders")
    df: org.apache.spark.sql.DataFrame = [oid: int, num: string ... 2 more fields]

    4. Print the data

    scala> df.show
    +---+-----+-----+---+
    |oid|  num|price|cid|
    +---+-----+-----+---+
    |  1|no001| 12.3|  7|
    |  2|no002| 18.8|  4|
    +---+-----+-----+---+
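    The same data can be queried through the DataFrame API instead of SQL; a minimal sketch (spark-shell pre-imports spark.implicits._, which provides the $ column syntax):

    scala> df.printSchema
    scala> df.select("oid", "num", "price").where($"price" > 15.0).show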

    5.1. Write out as JSON (switch to local mode)
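    A file:// path is resolved on each executor, so on the standalone cluster every worker would write its partitions to its own local disk. Restarting the shell in local mode, as the heading says, keeps all output files on one machine; a sketch of the restart:

    spark-shell --master local[*]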

    df.write.json("file:///home/centos/myspark/json")

         Read the JSON files back

    scala> val dfRead = spark.read.json("file:///home/centos/myspark/json")
    dfRead: org.apache.spark.sql.DataFrame = [cid: bigint, num: string ... 2 more fields]
    
    scala> dfRead.show
    +---+-----+---+-----+
    |cid|  num|oid|price|
    +---+-----+---+-----+
    |  7|no001|  1| 12.3|
    |  4|no002|  2| 18.8|
    |  3|no003|  3| 20.0|
    |  7|no004|  4| 50.0|
    |  2|no005|  5| 23.1|
    |  3|no006|  6| 39.0|
    |  2|no007|  7|  5.0|
    |  1|no008|  8|  6.0|
    +---+-----+---+-----+
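    Note that JSON schema inference widened the integer columns to bigint and reordered the columns alphabetically. If the original types matter, an explicit schema can be supplied on read; a minimal sketch using the column types from the orders table above:

    scala> import org.apache.spark.sql.types._
    scala> val schema = StructType(Seq(
         |   StructField("oid", IntegerType),
         |   StructField("num", StringType),
         |   StructField("price", DoubleType),
         |   StructField("cid", IntegerType)))
    scala> val dfTyped = spark.read.schema(schema).json("file:///home/centos/myspark/json")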

    5.2. Save in Parquet format

    df.write.parquet("file:///home/centos/myspark/par")

      Read the Parquet files back

    scala> val dfReadPar = spark.read.parquet("file:///home/centos/myspark/par")
    dfReadPar: org.apache.spark.sql.DataFrame = [oid: int, num: string ... 2 more fields]
    
    scala> dfReadPar.show
    +---+-----+-----+---+
    |oid|  num|price|cid|
    +---+-----+-----+---+
    |  1|no001| 12.3|  7|
    |  2|no002| 18.8|  4|
    |  3|no003| 20.0|  3|
    |  4|no004| 50.0|  7|
    |  5|no005| 23.1|  2|
    |  6|no006| 39.0|  3|
    |  7|no007|  5.0|  2|
    |  8|no008|  6.0|  1|
    +---+-----+-----+---+
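    Unlike JSON, Parquet stores the schema with the data, so the int types and the original column order survive the round trip, as the output above shows. The writer can also partition the output by a column; a sketch (the output path is illustrative) that creates one cid=... subdirectory per customer id:

    df.write.partitionBy("cid").parquet("file:///home/centos/myspark/par_by_cid")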

    5.3. Working with a JDBC database

    // Read the table data
    scala> val prop = new java.util.Properties()
    prop: java.util.Properties = {}

    scala> prop.put("driver" , "com.mysql.jdbc.Driver")
    res5: Object = null

    scala> prop.put("user" , "root")
    res6: Object = null

    scala> prop.put("password" , "root")
    res7: Object = null

    scala> val df = spark.read.jdbc("jdbc:mysql://s101:3306/lx" , "wc1" , prop)

      scala> df.show
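      An equivalent reader can be built with the option-based API instead of a Properties object; a sketch using the same connection values as above:

    scala> val dfJdbc = spark.read.format("jdbc").
         |   option("url", "jdbc:mysql://s101:3306/lx").
         |   option("driver", "com.mysql.jdbc.Driver").
         |   option("dbtable", "wc1").
         |   option("user", "root").
         |   option("password", "root").
         |   load
    scala> dfJdbc.show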

      Write to a table (pressing Tab after dfRead.write. lists the available writer methods)

    scala> val dfRead = spark.read.json("file:///home/centos/myspark/json")
    dfRead: org.apache.spark.sql.DataFrame = [cid: bigint, num: string ... 2 more fields]
    
    scala> dfRead.write.
    bucketBy   format       jdbc   mode     options   parquet       save          sortBy
    csv        insertInto   json   option   orc       partitionBy   saveAsTable   text
    
    scala> dfRead.write.jdbc
       def jdbc(url: String,table: String,connectionProperties: java.util.Properties): Unit
    
    scala> dfRead.write.jdbc("jdbc:mysql://s101:3306/lx" , "wuyong" ,prop)

      Verify in MySQL

    mysql> select * from wuyong;
    +------+-------+------+-------+
    | cid  | num   | oid  | price |
    +------+-------+------+-------+
    |    7 | no001 |    1 |  12.3 |
    |    4 | no002 |    2 |  18.8 |
    |    3 | no003 |    3 |    20 |
    |    7 | no004 |    4 |    50 |
    |    2 | no005 |    5 |  23.1 |
    |    3 | no006 |    6 |    39 |
    |    2 | no007 |    7 |     5 |
    |    1 | no008 |    8 |     6 |
    +------+-------+------+-------+
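    By default df.write.jdbc uses SaveMode.ErrorIfExists, so running the same write a second time fails because the wuyong table already exists. Passing an explicit save mode makes the write repeatable; a sketch:

    scala> import org.apache.spark.sql.SaveMode
    scala> dfRead.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://s101:3306/lx" , "wuyong" , prop)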