zoukankan      html  css  js  c++  java
  • spark DataFrame的创建几种方式和存储

    一。

    从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。

    SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。

    下面我们就介绍如何使用SparkSession来创建DataFrame。
    请进入Linux系统,打开“终端”,进入Shell命令提示符状态。
    首先,请找到样例数据。 Spark已经为我们提供了几个样例数据,就保存在“/usr/local/spark/examples/src/main/resources/”这个目录下,这个目录下有两个样例数据people.json和people.txt。
    people.json文件的内容如下:

    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
    

    people.txt文件的内容如下:

    Michael, 29
    Andy, 30
    Justin, 19
    

    下面我们就介绍如何从people.json文件中读取数据并生成DataFrame并显示数据(从people.txt文件生成DataFrame需要后面将要介绍的另外一种方式)。
    请使用如下命令打开pyspark:

      cd /usr/local/spark
      ./bin/pyspark

    进入到pyspark状态后执行下面命令:

      

    >>> spark=SparkSession.builder.getOrCreate()
    >>> df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
    >>> df.show()
    +----+-------+
    | age| name|
    +----+-------+
    |null|Michael|
    | 30| Andy|
    | 19| Justin|
    +----+-------+

    现在,我们可以执行一些常用的DataFrame操作。

    // 打印模式信息
    >>> df.printSchema()
    root
    |-- age: long (nullable = true)
    |-- name: string (nullable = true)

    // 选择多列
    >>> df.select(df.name,df.age + 1).show()
    +-------+---------+
    | name|(age + 1)|
    +-------+---------+
    |Michael| null|
    | Andy| 31|
    | Justin| 20|
    +-------+---------+

    // 条件过滤
    >>> df.filter(df.age > 20 ).show()
    +---+----+
    |age|name|
    +---+----+
    | 30|Andy|
    +---+----+

    // 分组聚合
    >>> df.groupBy("age").count().show()
    +----+-----+
    | age|count|
    +----+-----+
    | 19| 1|
    |null| 1|
    | 30| 1|
    +----+-----+

    // 排序
    >>> df.sort(df.age.desc()).show()
    +----+-------+
    | age| name|
    +----+-------+
    | 30| Andy|
    | 19| Justin|
    |null|Michael|
    +----+-------+

    //多列排序
    >>> df.sort(df.age.desc(), df.name.asc()).show()
    +----+-------+
    | age| name|
    +----+-------+
    | 30| Andy|
    | 19| Justin|
    |null|Michael|
    +----+-------+

    //对列进行重命名
    >>> df.select(df.name.alias("username"),df.age).show()
    +--------+----+
    |username| age|
    +--------+----+
    | Michael|null|
    | Andy| 30|
    | Justin| 19|
    +--------+----+

    二。由RDD转换到DataFrame。

      Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是,利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。

      1.利用反射机制推断RDD模式

       

    >>> from pyspark.sql.types import Row
    >>> def f(x):
    ...     rel = {}
    ...     rel['name'] = x[0]
    ...     rel['age'] = x[1]
    ...     return rel
    ... 
    >>> peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
    >>> peopleDF.createOrReplaceTempView("people")  //必须注册为临时表才能供下面的查询使用
     
    >>> personsDF = spark.sql("select * from people")
    >>> personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
     
    Name: 19,Age:Justin
    Name: 29,Age:Michael
    Name: 30,Age:Andy

      2.使用编程方式定义RDD模式

      

    >>>  from pyspark.sql.types import Row
    >>>  from pyspark.sql.types import StructType
    >>> from pyspark.sql.types import StructField
    >>> from pyspark.sql.types import StringType
     
    //生成 RDD
    >>> peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
     
    //定义一个模式字符串
    >>> schemaString = "name age"
     
    //根据模式字符串生成模式
    >>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
    >>> schema = StructType(fields)
    //从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
     
     
    >>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
     
    >>> peopleDF = spark.createDataFrame(rowRDD, schema)
     
    //必须注册为临时表才能供下面查询使用
    scala> peopleDF.createOrReplaceTempView("people")
     
    >>> results = spark.sql("SELECT * FROM people")
    >>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)
     
    name: Michael,age: 29
    name: Andy,age: 30
    name: Justin,age: 19
     

    三。保存成文件

      

    >>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
     
    >>> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
    >>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json"
    >>> peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
     
  • 相关阅读:
    python高阶1--is 和==
    python基础知识 -- 输入与输出
    Linux忘记用户名密码
    pip 安装第三方库报错
    python读取ini文件(含中文)
    fiddler之手机抓包
    python接口测试之参数关联遇到的问题
    (十一)TestNG 其他使用技巧
    (十二)TestNG 生成测试报告
    (十) TestNG 多线程运行用例
  • 原文地址:https://www.cnblogs.com/dhName/p/10699798.html
Copyright © 2011-2022 走看看