zoukankan      html  css  js  c++  java
  • Spark SQL 之 DataFrame

    Spark SQL 之 DataFrame


    转载请注明出处:http://www.cnblogs.com/BYRans/

    概述(Overview)

    Spark SQL是Spark的一个组件,用于结构化数据的计算。Spark SQL提供了一个称为DataFrames的编程抽象,DataFrames可以充当分布式SQL查询引擎。

    DataFrames

    DataFrame是一个分布式的数据集合,该数据集合以命名列的方式进行整合。DataFrame可以理解为关系数据库中的一张表,也可以理解为R/Python中的一个data frame。DataFrames可以通过多种数据构造,例如:结构化的数据文件、hive中的表、外部数据库、Spark计算过程中生成的RDD等。
    DataFrame的API支持4种语言:Scala、Java、Python、R。

    入口:SQLContext(Starting Point: SQLContext)

    Spark SQL程序的主入口是SQLContext类或它的子类。创建一个基本的SQLContext,你只需要SparkContext,创建代码示例如下:

    • Scala
    val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    • Java
    JavaSparkContext sc = ...; // An existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    

    除了基本的SQLContext,也可以创建HiveContext。SQLContext和HiveContext区别与联系为:

    • SQLContext现在只支持SQL语法解析器(SQL-92语法)
    • HiveContext现在支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行HiveSQL不支持的语法。
    • 使用HiveContext可以使用Hive的UDF,读写Hive表数据等Hive操作。SQLContext不可以对Hive进行操作。
    • Spark SQL未来的版本会不断丰富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最终可能两者会统一成一个Context

    HiveContext包装了Hive的依赖包,把HiveContext单独拿出来,可以在部署基本的Spark的时候就不需要Hive的依赖包,需要使用HiveContext时再把Hive的各种依赖包加进来。

    SQL的解析器可以通过配置spark.sql.dialect参数进行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中默认解析器为”hiveql“,也支持”sql“解析器。

    创建DataFrames(Creating DataFrames)

    使用SQLContext,spark应用程序(Application)可以通过RDD、Hive表、JSON格式数据等数据源创建DataFrames。下面是基于JSON文件创建DataFrame的示例:

    • Scala
    val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    val df = sqlContext.read.json("examples/src/main/resources/people.json")
    
    // Displays the content of the DataFrame to stdout
    df.show()
    
    • Java
    JavaSparkContext sc = ...; // An existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    
    DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
    
    // Displays the content of the DataFrame to stdout
    df.show();
    

    DataFrame操作(DataFrame Operations)

    DataFrames支持Scala、Java和Python的操作接口。下面是Scala和Java的几个操作示例:

    • Scala
    val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    // Create the DataFrame
    val df = sqlContext.read.json("examples/src/main/resources/people.json")
    
    // Show the content of the DataFrame
    df.show()
    // age  name
    // null Michael
    // 30   Andy
    // 19   Justin
    
    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // Select only the "name" column
    df.select("name").show()
    // name
    // Michael
    // Andy
    // Justin
    
    // Select everybody, but increment the age by 1
    df.select(df("name"), df("age") + 1).show()
    // name    (age + 1)
    // Michael null
    // Andy    31
    // Justin  20
    
    // Select people older than 21
    df.filter(df("age") > 21).show()
    // age name
    // 30  Andy
    
    // Count people by age
    df.groupBy("age").count().show()
    // age  count
    // null 1
    // 19   1
    // 30   1
    
    • Java
    JavaSparkContext sc // An existing SparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    // Create the DataFrame
    DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
    
    // Show the content of the DataFrame
    df.show();
    // age  name
    // null Michael
    // 30   Andy
    // 19   Justin
    
    // Print the schema in a tree format
    df.printSchema();
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // Select only the "name" column
    df.select("name").show();
    // name
    // Michael
    // Andy
    // Justin
    
    // Select everybody, but increment the age by 1
    df.select(df.col("name"), df.col("age").plus(1)).show();
    // name    (age + 1)
    // Michael null
    // Andy    31
    // Justin  20
    
    // Select people older than 21
    df.filter(df.col("age").gt(21)).show();
    // age name
    // 30  Andy
    
    // Count people by age
    df.groupBy("age").count().show();
    // age  count
    // null 1
    // 19   1
    // 30   1
    

    详细的DataFrame API请参考 API Documentation

    除了简单列引用和表达式,DataFrames还有丰富的library,功能包括string操作、date操作、常见数学操作等。详细内容请参考 DataFrame Function Reference

    运行SQL查询程序(Running SQL Queries Programmatically)

    Spark Application可以使用SQLContext的sql()方法执行SQL查询操作,sql()方法返回的查询结果为DataFrame格式。代码如下:

    • Scala
    val sqlContext = ...  // An existing SQLContext
    val df = sqlContext.sql("SELECT * FROM table")
    
    • Java
    SQLContext sqlContext = ...  // An existing SQLContext
    DataFrame df = sqlContext.sql("SELECT * FROM table")
    

    DataFrames与RDDs的相互转换(Interoperating with RDDs)

    Spark SQL支持两种RDDs转换为DataFrames的方式:

    • 使用反射获取RDD内的Schema
      • 当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
    • 通过编程接口指定Schema
      • 通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
      • 这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema

    使用反射获取Schema(Inferring the Schema Using Reflection)

    Spark SQL支持将JavaBean的RDD自动转换成DataFrame。通过反射获取Bean的基本信息,依据Bean的信息定义Schema。当前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和复杂数据类型(如:List、Array)。创建一个实现Serializable接口包含所有属性getters和setters的类来创建一个JavaBean。通过调用createDataFrame并提供JavaBean的Class object,指定一个Schema给一个RDD。示例如下:

    public static class Person implements Serializable {
      private String name;
      private int age;
    
      public String getName() {
        return name;
      }
    
      public void setName(String name) {
        this.name = name;
      }
    
      public int getAge() {
        return age;
      }
    
      public void setAge(int age) {
        this.age = age;
      }
    }
    
    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    
    // Load a text file and convert each line to a JavaBean.
    JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
      new Function<String, Person>() {
        public Person call(String line) throws Exception {
          String[] parts = line.split(",");
    
          Person person = new Person();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));
    
          return person;
        }
      });
    
    // Apply a schema to an RDD of JavaBeans and register it as a table.
    DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
    schemaPeople.registerTempTable("people");
    
    // SQL can be run over RDDs that have been registered as tables.
    DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
      public String call(Row row) {
        return "Name: " + row.getString(0);
      }
    }).collect();
    

    通过编程接口指定Schema(Programmatically Specifying the Schema)

    当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:

    • 从原来的RDD创建一个Row格式的RDD
    • 创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema
    • 通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema

    示例如下:

    import org.apache.spark.api.java.function.Function;
    // Import factory methods provided by DataTypes.
    import org.apache.spark.sql.types.DataTypes;
    // Import StructType and StructField
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.types.StructField;
    // Import Row.
    import org.apache.spark.sql.Row;
    // Import RowFactory.
    import org.apache.spark.sql.RowFactory;
    
    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    
    // Load a text file and convert each line to a JavaBean.
    JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");
    
    // The schema is encoded in a string
    String schemaString = "name age";
    
    // Generate the schema based on the string of schema
    List<StructField> fields = new ArrayList<StructField>();
    for (String fieldName: schemaString.split(" ")) {
      fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
    }
    StructType schema = DataTypes.createStructType(fields);
    
    // Convert records of the RDD (people) to Rows.
    JavaRDD<Row> rowRDD = people.map(
      new Function<String, Row>() {
        public Row call(String record) throws Exception {
          String[] fields = record.split(",");
          return RowFactory.create(fields[0], fields[1].trim());
        }
      });
    
    // Apply the schema to the RDD.
    DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
    
    // Register the DataFrame as a table.
    peopleDataFrame.registerTempTable("people");
    
    // SQL can be run over RDDs that have been registered as tables.
    DataFrame results = sqlContext.sql("SELECT name FROM people");
    
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    List<String> names = results.javaRDD().map(new Function<Row, String>() {
      public String call(Row row) {
        return "Name: " + row.getString(0);
      }
    }).collect();
    


  • 相关阅读:
    is_numeric — 检测变量是否为数字或数字字符串
    intval — 获取变量的整数值
    php获取数组最后一个值
    js正則匹配经纬度(经纬度逗号隔开)
    安装netcat(-bash: netcat: command not found)
    提示-bash: telnet: command not found的解决方法
    Laravel中使用Redis
    鏈接Redis報錯`AUTH` failed: ERR Client sent AUTH, but no password is set [tcp://127.0.0.1:6379]
    使用composer遇到的坑
    laravel的ORM模型的find(),findOrFail(),first(),firstOrFail(),get(),list(),toArray()之间的区别
  • 原文地址:https://www.cnblogs.com/BYRans/p/5003029.html
Copyright © 2011-2022 走看看