zoukankan      html  css  js  c++  java
  • Spark-Dataframe(SQL)

    Spark-Dataframe创建-读取json文件

    jsData.js 数据

    {"name":"json","age":23,"hobby":"running"}
    {"name":"charles","age":32,"hobby":"basketball"}
    {"name":"tom","age":28,"hobby":"football"}
    {"name":"lili","age":24,"hobby":"running"}
    {"name":"bob","age":20,"hobby":"swimming"}
    
    // This snippet has no import section of its own, so bring SparkSession into scope here.
    import org.apache.spark.sql.SparkSession

    // Create (or reuse) a local Spark session. Without an explicit master URL,
    // getOrCreate() fails when the program is launched outside spark-submit.
    val spark = SparkSession
      .builder()
      .appName("DataframeJson")
      .master("local[*]")
      .getOrCreate()

    // Build a DataFrame from a JSON Lines file (one JSON object per line).
    val df = spark.read.json("D:/jsData.js")

    // Print the rows.
    df.show()

    // Print the inferred schema (JSON inference orders the columns alphabetically).
    df.printSchema()

    // Select name and age + 1; the computed column is named "(age + 1)".
    df.select(df("name"), df("age") + 1).show()

    // Rename the name column to username.
    df.select(df("name").as("username"), df("age")).show()

    // Keep only rows with age > 25.
    df.filter(df("age") > 25).show()

    // Count rows per distinct age (row order of the result is not guaranteed).
    df.groupBy("age").count().show()

    // Sort by age, descending.
    df.sort(df("age").desc).show()

    // Sort by age descending, then by name descending to break ties.
    df.sort(df("age").desc, df("name").desc).show()

    // Release the session's resources.
    spark.stop()
    
    +---+----------+-------+
    |age|     hobby|   name|
    +---+----------+-------+
    | 23|   running|   json|
    | 32|basketball|charles|
    | 28|  football|    tom|
    | 24|   running|   lili|
    | 20|  swimming|    bob|
    +---+----------+-------+
    
    root
     |-- age: long (nullable = true)
     |-- hobby: string (nullable = true)
     |-- name: string (nullable = true)
    
    +-------+---------+
    |   name|(age + 1)|
    +-------+---------+
    |   json|       24|
    |charles|       33|
    |    tom|       29|
    |   lili|       25|
    |    bob|       21|
    +-------+---------+
    
    +--------+---+
    |username|age|
    +--------+---+
    |    json| 23|
    | charles| 32|
    |     tom| 28|
    |    lili| 24|
    |     bob| 20|
    +--------+---+
    
    +---+----------+-------+
    |age|     hobby|   name|
    +---+----------+-------+
    | 32|basketball|charles|
    | 28|  football|    tom|
    +---+----------+-------+
    
    +---+-----+
    |age|count|
    +---+-----+
    | 32|    1|
    | 28|    1|
    | 23|    1|
    | 20|    1|
    | 24|    1|
    +---+-----+
    
    +---+----------+-------+
    |age|     hobby|   name|
    +---+----------+-------+
    | 32|basketball|charles|
    | 28|  football|    tom|
    | 24|   running|   lili|
    | 23|   running|   json|
    | 20|  swimming|    bob|
    +---+----------+-------+
    
    +---+----------+-------+
    |age|     hobby|   name|
    +---+----------+-------+
    | 32|basketball|charles|
    | 28|  football|    tom|
    | 24|   running|   lili|
    | 23|   running|   json|
    | 20|  swimming|    bob|
    +---+----------+-------+
    
    Spark-Dataframe创建-Rdd转Dataframe

    数据

    scala 4
    java 2
    C 8
    C++ 1
    python 3
    PHP 5
    
    import org.apache.spark.sql.{Row, SparkSession}
    
    
    // Case class wrapping one line of data.txt ("<name> <num>").
    // NOTE(review): if this code is moved into an object's main method, declare the case class
    // at top level (outside the method) so spark.implicits can derive its Encoder.
    case class Thing(name: String, num: Int)

    // Create (or reuse) a local Spark session using one worker thread.
    val spark = SparkSession
      .builder()
      .appName("SparkSessionT")
      .master("local[1]")
      .getOrCreate()

    // Parse each line into a Thing. Blank lines are skipped and fields are split on runs of
    // whitespace, so a trailing empty line or a doubled space no longer throws
    // ArrayIndexOutOfBoundsException / NumberFormatException.
    val rowRdd = spark.sparkContext
      .textFile("D:/data.txt")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.split("\\s+"))
      .map(words => Thing(words(0), words(1).toInt))

    // Bring the session's implicit conversions (e.g. rdd.toDF()) into scope.
    import spark.implicits._

    // Convert the RDD of case-class instances to a DataFrame; columns come from Thing's fields.
    val rddDF = rowRdd.toDF()

    // Register the DataFrame as a temporary view for SQL queries.
    // NOTE(review): `table` is an SQL keyword; queries against this view likely need backticks,
    // e.g. select * from `table`.
    rddDF.createOrReplaceTempView("table")

    // Print all rows.
    rddDF.show()

    // Optional: write selected columns as CSV (Spark writes a directory of part files).
    //    rddDF.select("name","num").write.format("csv").save("D:/sava.csv")

    // Print the schema.
    rddDF.printSchema()

    // Stop the Spark session.
    spark.stop()
    
    +------+---+
    |  name|num|
    +------+---+
    | scala|  4|
    |  java|  2|
    |     C|  8|
    |   C++|  1|
    |python|  3|
    |   PHP|  5|
    +------+---+
    
    root
     |-- name: string (nullable = true)
     |-- num: integer (nullable = false)
    
    Spark-Dataframe创建-加载到SparkSession
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}
    
    
    // Create (or reuse) a local Spark session.
    val spark = SparkSession
      .builder()
      .appName("Dataframe")
      .master("local")
      .getOrCreate()

    // Turn each non-blank line of data.txt into a Row of two fields.
    // Splitting on runs of whitespace avoids crashes on blank lines or repeated spaces.
    val rowRdd = spark.sparkContext
      .textFile("D:/data.txt")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.split("\\s+"))
      .map(words => Row(words(0), words(1)))

    // Build the schema programmatically from a comma-separated list of column names.
    // Every column is a nullable string, so `num` is kept as text here.
    val fieldName = "name,num"
    val fields = fieldName.split(",")
      .map(name => StructField(name, StringType, nullable = true))
    val columnFiled = StructType(fields)

    // Apply the schema to the Row RDD and register the result as the temporary view "thing".
    spark.createDataFrame(rowRdd, columnFiled)
      .createOrReplaceTempView("thing")

    // Query the view with SQL.
    spark.sql("select * from thing").show()

    // Stop the Spark session.
    spark.stop()
    
    +------+---+
    |  name|num|
    +------+---+
    | scala|  4|
    |  java|  2|
    |     C|  8|
    |   C++|  1|
    |python|  3|
    |   PHP|  5|
    +------+---+
    
    Spark-DataFrame数据读取和保存
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SparkSession}
    
    // Single source of configuration: app name and master (all local cores).
    val conf = new SparkConf()
      .setAppName("hotel")
      .setMaster("local[*]")

    // Build the session from that configuration and reuse its SparkContext.
    // Creating a separate SparkContext and then a builder with a different appName/master is
    // misleading: getOrCreate() reuses the running context, so those builder settings are ignored.
    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext

    // Read the JSON Lines file.
    val df = spark.read.format("json").load("D:/jsData.js")

    // Save selected columns as CSV and the whole DataFrame as Parquet.
    // mode("overwrite") lets the example be re-run; the default mode fails if the path exists.
    df.select("hobby", "name", "age").write
      .mode("overwrite")
      .format("csv")
      .save("D:/js_to_csv")
    df.write.mode("overwrite").parquet("D:/json_to_parquet.parquet")

    // Read the Parquet output back and expose it to SQL.
    spark.read.parquet("D:/json_to_parquet.parquet")
      .createOrReplaceTempView("parquetTable")

    spark.sql("select name,age from parquetTable").show()

    // Print each row as an explicit tuple (columns in Parquet schema order: age, hobby, name).
    // NOTE: foreach runs on executors, so output goes to executor stdout (the console in local mode).
    spark.sql("select * from parquetTable").foreach(row => println((row(0), row(1), row(2))))

    // textFile on a directory reads every part file in it, which is how Spark stores the CSV output.
    sc.textFile("D:/js_to_csv").foreach(println)

    // Stop the Spark session (and its SparkContext).
    spark.stop()
    
    +-------+---+
    |   name|age|
    +-------+---+
    |   json| 23|
    |charles| 32|
    |    tom| 28|
    |   lili| 24|
    |    bob| 20|
    +-------+---+
    
    (23,running,json)
    (32,basketball,charles)
    (28,football,tom)
    (24,running,lili)
    (20,swimming,bob)
    
    running,lili,24
    swimming,bob,20
    running,json,23
    basketball,charles,32
    football,tom,28
    
    Spark-Dataframe-Mysql操作
    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}
    
    
    val spark = SparkSession
      .builder()
      .appName("SparkSessionT")
      .master("local[1]")
      .getOrCreate()

    // JDBC connection settings shared by the read and the write below.
    // NOTE(review): credentials are hard-coded for the demo; load them from configuration or
    // environment variables in real code.
    // NOTE(review): MySQL Connector/J 8.x names the driver class com.mysql.cj.jdbc.Driver;
    // the legacy name below matches Connector/J 5.x. Confirm against the jar on the classpath.
    val jdbcUrl = "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC"
    val tableName = "apps"
    val info = new Properties()
    info.put("user", "root")
    info.put("password", "123456")
    info.put("driver", "com.mysql.jdbc.Driver")

    // Define a DataFrame over the MySQL table. Reading is lazy: the table's rows are fetched
    // each time an action such as show() runs.
    val jdbcDF = spark.read.jdbc(jdbcUrl, tableName, info)

    // Show all rows currently in the table.
    jdbcDF.show()

    // Rows to insert, one "<app_name> <url> <country>" string each, wrapped as Rows.
    val dataRdd = spark.sparkContext
      .parallelize(Array("微信 https://wx.qq.com/ CN", "京东 https://www.jd.com/ CN"))
      .map(_.split(" "))
      .map(words => Row(words(0), words(1), words(2)))

    // Schema for the inserted rows; names should match the target table's columns.
    val fields = StructType(List(
      StructField("app_name", StringType),
      StructField("url", StringType),
      StructField("country", StringType)))

    // Combine rows and schema into a DataFrame.
    val rddDF = spark.createDataFrame(dataRdd, fields)

    // Append the new rows to the same table that was read above.
    rddDF.write.mode("append").jdbc(jdbcUrl, tableName, info)

    // Show the table again; because jdbcDF is lazy, this re-queries MySQL and includes the new rows.
    jdbcDF.show()

    // Stop the Spark session.
    spark.stop()
    
    +---+--------+--------------------+-------+
    | id|app_name|                 url|country|
    +---+--------+--------------------+-------+
    |  1|  QQ APP|   http://im.qq.com/|     CN|
    |  2|  微博 APP|   http://weibo.com/|     CN|
    |  3|  淘宝 APP|https://www.taoba...|     CN|
    +---+--------+--------------------+-------+
    
    +---+--------+--------------------+-------+
    | id|app_name|                 url|country|
    +---+--------+--------------------+-------+
    |  1|  QQ APP|   http://im.qq.com/|     CN|
    |  2|  微博 APP|   http://weibo.com/|     CN|
    |  3|  淘宝 APP|https://www.taoba...|     CN|
    | 10|      微信|  https://wx.qq.com/|     CN|
    | 11|      京东| https://www.jd.com/|     CN|
    +---+--------+--------------------+-------+
    
  • 相关阅读:
    【推荐】微服务分布式企业框架Springmvc+mybatis+shiro+Dubbo+ZooKeeper+Redis
    将WCF寄宿在托管的Windows服务中
    剑指Offer
    剑指Offer
    Entity Framework 无法对没有主键的视图映射实体的解决办法
    设置Sql Agent运行Job时的执行账户
    IE中Keep-Alive机制引起的错误
    Ajax错误 “SCRIPT7002: XMLHttpRequest: 网络错误 0x2ef3, 由于出现错误 00002ef3 而导致此项操作无法完成” 的归纳总结
    在IIS站点中Adomd.net集成认证账号问题
    出现“不能执行已释放的Script代码”错误的原因及解决办法
  • 原文地址:https://www.cnblogs.com/studyNotesSL/p/11341328.html
Copyright © 2011-2022 走看看