zoukankan      html  css  js  c++  java
  • 【Spark】这一篇或许能让你大概了解如何通过JavaAPI实现DataFrame的相关操作


    需求概述

    将RDD转换得到DataFrame,主要有两种方法:利用反射机制通过编程结构与RDD进行交互

    步骤

    一、创建Maven工程并导包

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    二、选用第一种方法:利用反射机制配合样例类构建DataFrame

    开发代码
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    
    // 定义person case class
    case class Person(id: Int,name: String,age: Int)
    
    object SparkDF {
      def main(args: Array[String]): Unit = {
        // 获取SparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("SparkDF").master("local[2]").config("spark.driver.host", "localhost").getOrCreate()
        // 获取SparkContext
        val sparkContext: SparkContext = sparkSession.sparkContext
        // 过滤筛选日志
        sparkContext.setLogLevel("WARN")
        //读取文件
        val fileRDD: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第三天/Spark第三天教案/资料/person.txt")
        //按照分隔符切割数据
        val splitRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
        //关联person case class
        val personRDD: RDD[Person] = splitRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    
        //准备把personRDD转换成DataFrame
        //首先要导包
        import sparkSession.implicits._
        val personDF: DataFrame = personRDD.toDF()
        //todo 至此已经成功通过反射机制构建DataFrame
    
        /**
         * 接下来可以使用DFL语法和SQL语法验证是否构建成功
         */
        println("*******************************************DSL语法测试开始********************************************")
    
        //查看全表数据    show()默认展示前20条数据
        personDF.show()
        //查看指定字段数据
        personDF.select($"name",$"age").show()
        //查看schema
        personDF.printSchema()
        //过滤筛选数据
        personDF.filter($"age" > 25).show()
    
        println("*******************************************DSL语法测试结束********************************************")
    
        println("*******************************************SQL语法测试开始********************************************")
        // 将DataFrame注册为一张table,有三种方法
        //第一种 已过时
        val person1: Unit = personDF.registerTempTable("person1")
        //第二种
        val person2: Unit = personDF.createTempView("person2")
        //第三种 (推荐)
        val person3: Unit = personDF.createOrReplaceTempView("person3")
    
        //打印三钟表的数据
        sparkSession.sql("select * from person1").show()
        sparkSession.sql("select * from person2").show()
        sparkSession.sql("select * from person3").show()
    
        //实现left join操作
        sparkSession.sql("select * from person1 p1 left join person2 p2 on p1.id = p2.id").show()
    
    
        println("*******************************************SQL语法测试结束********************************************")
    
    	sparkContext.stop()
        sparkSession.close()
      }
    }
    
    控制台结果
    
    *******************************************DSL语法测试开始********************************************
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan|108|
    |  2|    lisi| 28|
    |  3|  wangwu| 58|
    |  4| zhaoliu| 24|
    |  5|   zhuqi| 35|
    |  6|   qiuba| 28|
    +---+--------+---+
    
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan|108|
    |    lisi| 28|
    |  wangwu| 58|
    | zhaoliu| 24|
    |   zhuqi| 35|
    |   qiuba| 28|
    +--------+---+
    
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan|108|
    |  2|    lisi| 28|
    |  3|  wangwu| 58|
    |  5|   zhuqi| 35|
    |  6|   qiuba| 28|
    +---+--------+---+
    
    *******************************************DSL语法测试结束********************************************
    *******************************************SQL语法测试开始********************************************
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan|108|
    |  2|    lisi| 28|
    |  3|  wangwu| 58|
    |  4| zhaoliu| 24|
    |  5|   zhuqi| 35|
    |  6|   qiuba| 28|
    +---+--------+---+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan|108|
    |  2|    lisi| 28|
    |  3|  wangwu| 58|
    |  4| zhaoliu| 24|
    |  5|   zhuqi| 35|
    |  6|   qiuba| 28|
    +---+--------+---+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan|108|
    |  2|    lisi| 28|
    |  3|  wangwu| 58|
    |  4| zhaoliu| 24|
    |  5|   zhuqi| 35|
    |  6|   qiuba| 28|
    +---+--------+---+
    
    +---+--------+---+---+--------+---+
    | id|    name|age| id|    name|age|
    +---+--------+---+---+--------+---+
    |  1|zhangsan|108|  1|zhangsan|108|
    |  6|   qiuba| 28|  6|   qiuba| 28|
    |  3|  wangwu| 58|  3|  wangwu| 58|
    |  5|   zhuqi| 35|  5|   zhuqi| 35|
    |  4| zhaoliu| 24|  4| zhaoliu| 24|
    |  2|    lisi| 28|  2|    lisi| 28|
    +---+--------+---+---+--------+---+
    
    *******************************************SQL语法测试结束********************************************
    
    Process finished with exit code 0
    

    选用第二种方法:通过StrucType配合Row构建DataFrame

    开发代码
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    object SparkDF2 {
      def main(args: Array[String]): Unit = {
        //获取SparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("saprkDF2").master("local[2]").config("spark.driver.host", "localhost").getOrCreate()
        //获取SparkContext
        val sparkContext: SparkContext = sparkSession.sparkContext
        //设置日志筛选
        sparkContext.setLogLevel("WARN")
        //读取文件
        val fileRDD: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第三天/Spark第三天教案/资料/person.txt")
        //对数据进行切割
        val arrayRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
        //将arrayRDD转换为row
        val rowRDD: RDD[Row] = arrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    
        //获取StructType对象
        val structType = new StructType().add("id",IntegerType).add("name",StringType).add("age",IntegerType)
    
        //需要两个参数 RDD[Row] 和 StructType
        val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, structType)
    
        //接下来DSL和SQL的操作和之前相同
        personDF.printSchema()
        
    	sparkContext.stop()
        sparkSession.close()
    
      }
    
    }
    
    控制台结果
    
    root
     |-- id: integer (nullable = true)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
    
    
    Process finished with exit code 0
    
  • 相关阅读:
    密码由6-12位数字或字母组成,密码哈希加密
    获得一个字符串的汉语拼音码
    WPF中ComboBox绑定数据库自动读取产生数据
    SQL存储过程生成顺序编码
    SQL 语句调用这个存储过程,生成顺序编码
    restful(1):序列化
    Django的CBV和FBV
    权限管理组件:rbac
    ModelForm组件和forms组件补充
    BBS+Blog项目代码
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772387.html
Copyright © 2011-2022 走看看