zoukankan      html  css  js  c++  java
  • Spark DataSet和RDD与DataFrame转换成DataSet

    一、什么是DataSet

            DataSet同RDD和DataFrame一样,也是Spark的一种弹性分布式数据集。它是Spark 1.6增加的新接口。我们可以从JVM的对象构造一个DataSet,然后使用map,flatMap,filter等等这样的函数式变换操作它。

    二、创建DataSet

            首先需要导入Spark Core、Spark SQL、Hadoop Client依赖包。pom.xml文件如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build descriptor for the demo project: declares the Spark Core,
         Spark SQL and Hadoop Client dependencies used by the Scala examples. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.leboop</groupId>
        <artifactId>mahout</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <!-- Scala version; used below as the suffix of the Spark artifact IDs
                 (e.g. spark-core_${scala.version}) -->
            <scala.version>2.11</scala.version>
            <!-- Spark version, shared by spark-core and spark-sql -->
            <spark.version>2.3.0</spark.version>
            <!-- Hadoop version, used by hadoop-client -->
            <hadoop.version>2.7.3</hadoop.version>
        </properties>
        <dependencies>
            <!-- Spark Core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- Spark SQL -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- Hadoop client -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
        </dependencies>
    </project>

     

    1、Seq序列生成DataSet

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * Builds Datasets from local Scala sequences and prints them.
      *
      * Two Datasets are created: one from a sequence of [[DataSetDemo.Person]]
      * case-class instances and one from a sequence of `Int`s, which is then
      * transformed with `map`.
      *
      * Created by LiuErBao on 2018/8/10.
      */

    object DataSetDemo {

      /** Row type of the case-class Dataset: a person's name and age. */
      case class Person(name: String, age: Int)

      /**
        * Runs the demo against a local Spark master.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // SparkSession is the entry point for Spark SQL (SparkContext is the entry point for RDDs).
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        try {
          // toDS() is made available by the implicit conversions imported here.
          import spark.implicits._
          val caseClassDS = Seq(Person("Andy", 20), Person("Tom", 30)).toDS()
          caseClassDS.show(3)

          // Create a Dataset of plain Ints.
          val primitiveDS = Seq(1, 2, 3).toDS()
          // Derive a new Dataset by applying map to every element.
          val newDS = primitiveDS.map(_ + 1)
          // Print the first three rows.
          newDS.show(3)
        } finally {
          // Release the session's resources even if one of the steps above fails.
          spark.stop()
        }
      }
    }
    

    程序运行结果

    +----+---+
    |name|age|
    +----+---+
    |Andy| 20|
    | Tom| 30|
    +----+---+
    
    
    +-----+
    |value|
    +-----+
    |    2|
    |    3|
    |    4|
    +-----+

    2、RDD转换成DataSet

    itemdata.data数据文件格式如下:

    0162381440670851711,4,7.0
    0162381440670851711,11,4.0
    0162381440670851711,32,1.0
    0162381440670851711,176,27.0
    0162381440670851711,183,11.0
    0162381440670851711,184,5.0
    0162381440670851711,207,9.0
    0162381440670851711,256,3.0
    0162381440670851711,258,4.0
    0162381440670851711,259,16.0
    0162381440670851711,260,8.0
    0162381440670851711,261,18.0
    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    
    /**
      * Reads a comma-separated text file as an RDD, converts it into a Dataset
      * of [[DataSetDemo.Data]] and prints the first rows.
      */
    object DataSetDemo {

      /** One input record: user id, item id and the score the line carries. */
      case class Data(user_id: String,item_id:String, score: Double)

      /**
        * Runs the demo against a local Spark master.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // SparkSession is the entry point for Spark SQL (SparkContext is the entry point for RDDs).
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        try {
          // Create the RDD: one String per line of the input file.
          val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
          // toDS() on an RDD is made available by the implicit conversions imported here.
          import spark.implicits._
          val splitRDD = rdd.map(x => x.split(","))
          // Every line is expected to hold three comma-separated fields with a numeric
          // third field; a line that does not will make x(1), x(2) or toDouble throw.
          val dataDS = splitRDD.map(x => Data(x(0), x(1), x(2).toDouble)).toDS()
          dataDS.show(3)
        } finally {
          // Release the session's resources even if reading or parsing fails.
          spark.stop()
        }
      }
    }
    

    先定义了一个case class Data,用于存储转换后的数据。使用toDS()之前,需要先导入隐式转换(import spark.implicits._)。

    程序运行结果

    +-------------------+-------+-----+
    |            user_id|item_id|score|
    +-------------------+-------+-----+
    |0162381440670851711|      4|  7.0|
    |0162381440670851711|     11|  4.0|
    |0162381440670851711|     32|  1.0|
    +-------------------+-------+-----+
    only showing top 3 rows

    3、DataFrame转换成DataSet

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * Reads a header-less CSV file as a DataFrame and converts it into a typed
      * Dataset of [[DataSetDemo.Data]].
      *
      * Created by LiuErBao on 2018/8/10.
      */

    object DataSetDemo {

      /** One input record: user id, item id and the score the line carries. */
      case class Data(user_id: String,item_id:String, score: Double)

      /**
        * Runs the demo against a local Spark master.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // SparkSession is the entry point for Spark SQL (SparkContext is the entry point for RDDs).
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        try {
          // Create the DataFrame; no schema or header option is given to the reader.
          val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
          // Brings the Data encoder and the $"col" syntax into scope.
          import spark.implicits._
          // Name the three columns after the case-class fields, cast the score to
          // double and let the encoder build the typed Dataset. This avoids calling
          // toString on untyped Row cells, which throws on a null cell.
          val dataDS = dataDF
            .toDF("user_id", "item_id", "score")
            .withColumn("score", $"score".cast("double"))
            .as[Data]
          dataDS.show(2)
        } finally {
          // Release the session's resources even if reading or converting fails.
          spark.stop()
        }
      }
    }
    

    运行结果如下:

    +-------------------+-------+-----+
    |            user_id|item_id|score|
    +-------------------+-------+-----+
    |0162381440670851711|      4|  7.0|
    |0162381440670851711|     11|  4.0|
    +-------------------+-------+-----+
    only showing top 2 rows

    三、Spark SQL操作DataSet

    步骤:

    (1)创建DataSet数据集

    (2)由DataSet创建临时视图或者全局视图

    (3)通过DataSet的sqlContext对象执行SQL查询,或者直接通过SparkSession(spark.sql)执行SQL查询

    程序如下:

    package com.leboop.rdd
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * Registers a Dataset as a temporary view and queries it with SQL, once
      * through the Dataset's `sqlContext` and once through the SparkSession.
      *
      * Created by LiuErBao on 2018/8/10.
      */

    object DataSetDemo {

      /** One input record: user id, item id and the score the line carries. */
      case class Data(user_id: String,item_id:String, score: Double)

      /**
        * Runs the demo against a local Spark master.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // SparkSession is the entry point for Spark SQL (SparkContext is the entry point for RDDs).
        val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate()
        try {
          // Create the RDD: one String per line of the input file.
          val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
          // toDS() on an RDD is made available by the implicit conversions imported here.
          import spark.implicits._
          val splitRDD = rdd.map(x => x.split(","))
          // Every line is expected to hold three comma-separated fields with a numeric
          // third field; a line that does not will make x(1), x(2) or toDouble throw.
          val dataDS = splitRDD.map(x => Data(x(0), x(1), x(2).toDouble)).toDS()
          // Expose the Dataset to SQL under the view name "data".
          dataDS.createOrReplaceTempView("data")
          // Query through the SQLContext reachable from the Dataset ...
          dataDS.sqlContext.sql("select user_id,item_id from data").show(2)
          // ... or directly through the SparkSession.
          spark.sql("select user_id,score from data").show(2)
        } finally {
          // Release the session's resources even if reading or querying fails.
          spark.stop()
        }
      }
    }
    

    程序运行结果

    +-------------------+-------+
    |            user_id|item_id|
    +-------------------+-------+
    |0162381440670851711|      4|
    |0162381440670851711|     11|
    +-------------------+-------+
    only showing top 2 rows
    
    
    
    
    
    +-------------------+-----+
    |            user_id|score|
    +-------------------+-----+
    |0162381440670851711|  7.0|
    |0162381440670851711|  4.0|
    +-------------------+-----+
    only showing top 2 rows
  • 相关阅读:
    silverlight通过WCF访问SQLserver示例
    PRISM ‘VS100COMNTOOLS’ not set. Cannot set the build environment
    定义了重复的“system.web.extensions/scripting/scriptResourceHandler”节
    silverlight+WCF之Hello world
    Java基础——注释规范
    Java基础—JDK环境变量配置
    Java基础—异常
    Java基础—面向对象
    Java基础—基础语法与常用命令
    Eclipse添加Junit测试
  • 原文地址:https://www.cnblogs.com/leboop/p/9458821.html
Copyright © 2011-2022 走看看