zoukankan      html  css  js  c++  java
  • Spark DataFrame基础

    Spark创建DataFrame的不同方式

    本文介绍了使用Scala示例在Spark中创建DataFrame(createDataFrame)的不同方法。

    首先,让我们导入Spark需要的隐式函数,如.toDF()函数,并为示例创建数据。

    import spark.implicits._
    val columns = Seq("language", "users_count")
    val data = Seq(("Java", "20000"), ("Python", "10000"), ("Scala", "30000"))
    

    1. Create Spark DataFrame from RDD

    首先,调用SparkContext中的parallelize()函数从集合Seq创建RDD。对于下面的所有示例,都需要这个rdd对象。

    val rdd = spark.SparkContext.parallelize(data)
    

    1. a) 使用toDF()函数

    一旦创建了一个RDD,可以使用toDF()来创建一个DataFrame。默认情况下,假如数据集每一行有两列,创建的DF时候的列名就是"_1"和"_2"。

    val dfFromRDD1 = rdd.toDF()
    dfFromRDD1.printSchema()
    
    root
    |-- _1: string (nullable = true)
    |-- _2: string (nullable = true)
    

    toDF()具有另一个签名,该签名自定义列名称参数,如下所示:

    val dfFromRDD1 = rdd.toDF("language", "users_count")
    dfFromRDD1.printSchema()
    
    root
    |-- language: string (nullable = true)
    |-- users: string (nullable = true)
    

    默认情况下,这些列的数据类型是通过推断列的数据类型来判断的。我们可以通过提供模式来更改此行为,我们可以在其中为每个字段/列指定列名,数据类型和可为空。

    1.b) 使用SparkSession的creatDataFrame()函数

    使用SparkSession中的createDataFrame()是另一种创建方法,它以rdd对象作为参数。使用toDF()来指定列的名称。

    dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
    

    1.c)对行类型使用createDataFrame()

    createDataFrame()有另一个签名,它将列名的RDD[Row]类型和模式作为参数。首先,我们需要将rdd对象从RDD[T]转换为RDD[Row]类型。

    val schema = StructType(columns.map(fieldName => StructField(fieldName, StringType, nullable = true)))
    val rowRDD = rdd.map(attributes => Row(attributes._1, attributs._2))
    val dfFromRDD3 = spark.createDataFrame(rowRdd.schema)
    

    2. 从List和Seq集合中创建Spark DataFrame

    在本节中,我们将看到从集合Seq[T]或List[T]创建Spark DataFrame的几种方法。这些示例与我们上面的RDD部分看到的类型,但是我们使用的是数据对象而不是RDD对象。

    2.a) List或者Seq使用toDF()

    val dfFromData1 = data.toDF()
    

    2.b) 使用SparkSession的createDataFrame()方法

    var dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)
    

    2.c) 使用Row type的createDataFrame()方法

    import scala.collection.JavaConversions._
    val rowData = data.map(attributes => Row(attributes._1, attributes._2))
    var dfFromData3 = spark.createDataFrame(rowData, schema)
    

    3. 从CSV文件创建Spark DataFrame

    val df2 = spark.read.csv("/src/resources/file.csv")
    

    4. 从text文件创建

    val df2 = spark.read.text("/src/resources/file.txt")
    

    5. 从JSON文件创建

    val df2 = spark.read.json("/src/resources/file.json")
    

    6. 从XML文件创建

    从xml解析DataFrame,我们应该使用数据源:com.databricks.spark.xml

    <dependency>
         <groupId>com.databricks</groupId>
         <artifactId>spark-xml_2.11</artifactId>
         <version>0.6.0</version>
     </dependency>
    
    val df = spark.read.format("com.databricks.spark.xml")
            .option("rowTag", "person")
            .xml("src/main/resources/persons.xml")
    

    7. 从Hive创建

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
    val hiveDF = hiveContext.sql("select * from emp")
    

    8. 从RDBMS创建

    8.a) Mysql table

    确保在pom.xml文件或类路径中的MySQL jars中都具有Mysql库作为依赖项

    val df_mysql = spark.read.format("jdbc")
        .option("url", "jdbc:mysql://localhost:port/db")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", "tablename")
        .option("user", "user")
        .option("password", "password")
        .load()
    

    8.b) DB2

    确保在pom.xml文件或类路径中的DB2 jar中将DB2库作为依赖项。

    val df_db2 = spark.read.format(“jdbc”)
       .option(“url”, “jdbc:db2://localhost:50000/dbname”)
       .option(“driver”, “com.ibm.db2.jcc.DB2Driver”)
       .option(“dbtable”, “tablename”) 
       .option(“user”, “user”) 
       .option(“password”, “password”) 
       .load()
    

    9. 从HBase创建DataFrame

    要从HBase表创建Spark DataFrame,我们应该使用Spark HBase连接器中定义的数据源。

     val hbaseDF = sparkSession.read
          .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
          .format("org.apache.spark.sql.execution.datasources.hbase")
          .load()
    
  • 相关阅读:
    数组与字符串的相互转换
    数组新增,修改json数据
    百度Ueditor设置图片自动压缩
    微信小程序——自定义图标组件
    微信小程序——自定义导航栏
    微信小程序——网盘图片预览
    微信小程序——星星评分
    微信小程序——页面中调用组件方法
    Vue路由获取路由参数
    C#随机颜色和随机字母
  • 原文地址:https://www.cnblogs.com/xiagnming/p/12494577.html
Copyright © 2011-2022 走看看