zoukankan      html  css  js  c++  java
  • Spark DataFrame及RDD与DataSet转换成DataFrame

    Spark DataFrame及RDD与DataSet转换成DataFrame

    一、什么是DataFrame

            DataFrame和RDD一样,也是Spark的一种弹性分布式数据集,它是一个由列组成的数据集,概念上等同于关系型数据库中的一张表。DataFrame可以从非常宽泛的数据源中的构建,比如结构化的数据文件,Hive中的表,外部数据库,或者已经创建好的RDDs等等。在Scala和Java中,DataFrame由行数据集表示。在Scala API中,DataFrame 可以简单看成DataSer[Row],而在Java API中,使用DataSet<Row>表示DataFrame。有人肯定会问,已经有了弹性分布式数据集RDD,为什么还要引入DataFrame呢?因为在Spark中,我们可以像在关系型数据库中使用SQL操作数据库表一样,使用Spark SQL操作DataFrame。这让熟悉关系型数据库SQL人员也能轻松掌握。

    二、创建DataFrame

            首先导入Spark Core、Spark SQL、和Hadoop Client,pom.xml文件如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.leboop</groupId>
        <artifactId>mahout</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <!-- scala版本号 -->
            <scala.version>2.11</scala.version>
            <!-- spark版本号 -->
            <spark.version>2.3.0</spark.version>
            <!-- hadoop版本 -->
            <hadoop.version>2.7.3</hadoop.version>
        </properties>
        <dependencies>
            <!-- spark core-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- spark sql -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- Hadoop客户端,用于操作HDFS -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
        </dependencies>
    </project>

    下面使用三种方式创建DataFrame

    itemdata.data文件数据格式如下:

    0162381440670851711,4,7.0
    0162381440670851711,11,4.0
    0162381440670851711,32,1.0
    0162381440670851711,176,27.0
    0162381440670851711,183,11.0
    0162381440670851711,184,5.0
    0162381440670851711,207,9.0
    0162381440670851711,256,3.0
    0162381440670851711,258,4.0

    第一列是user_id,第二列是item_id,第三列是score。

    1、以csv格式读取文件

    步骤:

    (1)使用SparkSesstion创建Spark SQL的切入点spark,

    (2)spark以csv格式读取HDFS文件系统/input/mahout-demo/目录下的文件itemdata.data,

    读取的结果就是是DataFrame数据集,程序如下:

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * DataFrame创建Demo
      */
    object DataFrameDemo {
      def main(args: Array[String]): Unit = {
        //创建Spark SQL的切入点(RDD的切入点是SparkContext)
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        //以csv格式读取文件,直接生成DataFrame数据集
        val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
        //打印前三行
        dataDF.show(3)
      }
    }
    

    运行结果如下:

    +-------------------+---+---+
    |                _c0|_c1|_c2|
    +-------------------+---+---+
    |0162381440670851711|  4|7.0|
    |0162381440670851711| 11|4.0|
    |0162381440670851711| 32|1.0|
    +-------------------+---+---+
    only showing top 3 rows

    默认DataFrame的列名为_cn(n=0,1,2.……)。后面我们会说明如何指定列名

    2、以一般格式读取文件

            同样使用创建好的spark切入点,调用textFile函数读取文件itemdata.data,生成的是DataSet数据集,数据文件的每一行内容作为整体是DataSet的一列,首先需要map算子将数据按“,”分词,得到三个元素的数组作为整体,再次使用map将数组编程具有三列的DataSet,最后使用toDF()函数将其转换成DataFrame数据集,这里我们在toDF()函数中指明了每个列的列名。程序如下:

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * DataFrame创建Demo
      */
    object DataFrameDemo {
      def main(args: Array[String]): Unit = {
        //创建Spark SQL的切入点(RDD的切入点是SparkContext)
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        //spark读取一般文件,生成的是DataSet数据集
        val dataDataSet = spark.read.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
        //使用toDF()函数将DataSet转换成DataFrame
        import spark.implicits._
        val dataDF2 = dataDataSet.map(x=>x.split(",")).map(x=>(x(0),x(1),x(2))).toDF("user_id","item_id","score")
        //打印前三行
        dataDF2.show(3)
      }
    }
    

    程序运行结果:

    +-------------------+-------+-----+
    |            user_id|item_id|score|
    +-------------------+-------+-----+
    |0162381440670851711|      4|  7.0|
    |0162381440670851711|     11|  4.0|
    |0162381440670851711|     32|  1.0|
    +-------------------+-------+-----+
    only showing top 3 rows

    3、以SparkContext作为切入点读取文件

            Spark SQL的切入点SparkSesstion封装了Spark Core的切入点SparkContext,所以SparkContext对象可以由SparkSesstion创建。SparkContext读取文件生成的是RDD数据集,转换如下:

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * DataFrame创建Demo
      */
    object DataFrameDemo {
      def main(args: Array[String]): Unit = {
        //创建Spark SQL的切入点(RDD的切入点是SparkContext)
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
        val dataDF3 = rdd.map(x=>x.split(",")).map(x=>(x(0),x(1),x(2))).toDF()
        dataDF3.show(3)
      }
    }
    

    程序运行结果

    +-------------------+---+---+
    |                 _1| _2| _3|
    +-------------------+---+---+
    |0162381440670851711|  4|7.0|
    |0162381440670851711| 11|4.0|
    |0162381440670851711| 32|1.0|
    +-------------------+---+---+
    only showing top 3 rows

    DataFrame的列名默认为_n(n=1,2,3,……)。

    4、指定列名

    通过定义case class Data实体类,为DataFrame每个列指定列名,代码如下:

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * DataFrame创建Demo
      */
    case class Data(user_id:String,item_id:String,score:Double)
    object DataFrameDemo {
      def main(args: Array[String]): Unit = {
        //创建Spark SQL的切入点(RDD的切入点是SparkContext)
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        //以csv格式读取文件,直接生成DataFrame数据集
        val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score")
        //打印前三行
        dataDF.show(3)
    
        //spark读取一般文件,生成的是DataSet数据集
        val dataDataSet = spark.read.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
        //使用toDF()函数将DataSet转换成DataFrame
        import spark.implicits._
        val dataDF2 = dataDataSet.map(x=>x.split(",")).map(x=>Data(x(0),x(1),x(2).toDouble)).toDF()
        //打印前三行
        dataDF2.show(3)
    
        val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
        val dataDF3 = rdd.map(x=>x.split(",")).map(x=>Data(x(0),x(1),x(2).toDouble)).toDF()
        dataDF3.show(3)
      }
    }
    

    三种方式程序运行结果都是:

    +-------------------+-------+-----+
    |            user_id|item_id|score|
    +-------------------+-------+-----+
    |0162381440670851711|      4|  7.0|
    |0162381440670851711|     11|  4.0|
    |0162381440670851711|     32|  1.0|
    +-------------------+-------+-----+
    only showing top 3 rows

    注:关于切入点的问题,其实需要搞清楚Spark的组成,它主要有五个部分组成:

    (1)Spark Core

    Spark核心部分,操作RDD数据集

    (2)Spark SQL

    像关系型数据一样,使用SQL操作DataFrame数据集

    (3)Spark Streaming

    实时数据流处理

    (4)Spark MLlib

    机器学习算法库

    (5)GraphX

    图计算

    三、Spark SQL操作DataFrame

    1、创建临时视图

    基本步骤:

    (1)创建DataFrame

    (2)由DataFrame创建临时视图

    (3)写Spark SQL操作临时视图

    代码如下:

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * DataFrame创建Demo
      */
    case class Data(user_id:String,item_id:String,score:Double)
    object DataFrameDemo {
      def main(args: Array[String]): Unit = {
        //创建Spark SQL的切入点(RDD的切入点是SparkContext)
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        //以csv格式读取文件,直接生成DataFrame数据集
        val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score")
        //创建临时视图
        dataDF.createOrReplaceTempView("data")
        //sql查询
        spark.sql("select * from data where score>10").show(3)
      }
    }
    

    我们创建DataFrame后,创建了一个名为data的临时视图(SparkSession关闭后,临时视图立即会失效),然后写了Spark SQL查询评分大于10分的所有行,并打印前3行。运行结果如下:

    +-------------------+-------+-----+
    |            user_id|item_id|score|
    +-------------------+-------+-----+
    |0162381440670851711|    176| 27.0|
    |0162381440670851711|    183| 11.0|
    |0162381440670851711|    259| 16.0|
    +-------------------+-------+-----+
    only showing top 3 rows

    2、创建全局临时视图

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * DataFrame创建Demo
      */
    case class Data(user_id:String,item_id:String,score:Double)
    object DataFrameDemo {
      def main(args: Array[String]): Unit = {
        //创建Spark SQL的切入点(RDD的切入点是SparkContext)
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        //以csv格式读取文件,直接生成DataFrame数据集
        val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score")
        //创建全局临时视图
        dataDF.createGlobalTempView("data")
        //sql查询
        spark.sql("select * from global_temp.data where score>10").show(3)
      }
    }
    

    首先我们创建了一张全局临时视图,然后使用Spark SQL查询视图。注意全局临时视图存放在系统隐藏的数据库global_temp中,访问data临时视图时需要使用global_temp.data。全局临时视图,在一个Spark Sesstion关闭后,其他的Spark Sesstion可以继续使用,知道Spark应用被关闭。

    程序运行结果

    +-------------------+-------+-----+
    |            user_id|item_id|score|
    +-------------------+-------+-----+
    |0162381440670851711|    176| 27.0|
    |0162381440670851711|    183| 11.0|
    |0162381440670851711|    259| 16.0|
    +-------------------+-------+-----+
    only showing top 3 rows

    3、不创建临时视图

    如下:

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * DataFrame创建Demo
      */
    case class Data(user_id:String,item_id:String,score:Double)
    object DataFrameDemo {
      def main(args: Array[String]): Unit = {
    
        //创建Spark SQL的切入点(RDD的切入点是SparkContext)
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        //以csv格式读取文件,直接生成DataFrame数据集
        val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score")
    
        //打印dataDF的数据结构
        dataDF.printSchema()
        //查询user_id列
        dataDF.select("user_id","score").show(3)
        //过滤出评分大于10的所有数据
        dataDF.filter("score>10").show(3)
        //查询user_id,score,score加上10 起别名score
        dataDF.selectExpr("user_id","score","score+10 as score").show(3)
        //使用$符号查询,需要隐士转换
        import spark.implicits._
        dataDF.select($"user_id",$"score",$"score"+10).show(3)
      }
    }
    

    程序运行结果如下:

    root
     |-- user_id: string (nullable = true)
     |-- item_id: string (nullable = true)
     |-- score: string (nullable = true)
    
    
    
    +-------------------+-----+
    |            user_id|score|
    +-------------------+-----+
    |0162381440670851711|  7.0|
    |0162381440670851711|  4.0|
    |0162381440670851711|  1.0|
    +-------------------+-----+
    only showing top 3 rows
    
    
    
    +-------------------+-------+-----+
    |            user_id|item_id|score|
    +-------------------+-------+-----+
    |0162381440670851711|    176| 27.0|
    |0162381440670851711|    183| 11.0|
    |0162381440670851711|    259| 16.0|
    +-------------------+-------+-----+
    only showing top 3 rows
    
    
    
    
    +-------------------+-----+-----+
    |            user_id|score|score|
    +-------------------+-----+-----+
    |0162381440670851711|  7.0| 17.0|
    |0162381440670851711|  4.0| 14.0|
    |0162381440670851711|  1.0| 11.0|
    +-------------------+-----+-----+
    only showing top 3 rows
    
    
    
    
    +-------------------+-----+------------+
    |            user_id|score|(score + 10)|
    +-------------------+-----+------------+
    |0162381440670851711|  7.0|        17.0|
    |0162381440670851711|  4.0|        14.0|
    |0162381440670851711|  1.0|        11.0|
    +-------------------+-----+------------+
    only showing top 3 rows
  • 相关阅读:
    进入正在运行的Docker的asp.net core容器
    EF 更新记录发现外键更改但更新又跳回以前值
    远程获取http数据和提交数据
    C# 32位16进制加密
    netcore命令行运行程序
    MD5加密32位16进制
    C# MD5加密32位16进制有时少一位问题
    netcoreMVC中使用Vue模板分页封装(不适合数据量大)
    Vue组件间传值 和 访问
    jenkins部署安装
  • 原文地址:https://www.cnblogs.com/leboop/p/9458832.html
Copyright © 2011-2022 走看看