zoukankan      html  css  js  c++  java
  • 07 从RDD创建DataFrame

    0.前次作业:从文件创建DataFrame

    1.pandas df 与 spark df的相互转换

    df_s=spark.createDataFrame(df_p)

    df_p=df_s.toPandas()

    >>> import pandas as pd
    >>> import numpy as np
    >>> arr = np.arange(6).reshape(-1,3)
    >>> df_p=pd.DataFrame(arr)
    >>> df_p

    >>> arr

    >>> df_p.columns=['a','b','c']
    >>> df_p

     

    >>> df_s=spark.createDataFrame(df_p)
    >>> df_s.show()

    >>> df_s.collect()

    >>> df_s.toPandas()

    2. Spark与Pandas中DataFrame对比

    http://www.lining0806.com/spark%E4%B8%8Epandas%E4%B8%ADdataframe%E5%AF%B9%E6%AF%94/

    3.1 利用反射机制推断RDD模式

    • sc创建RDD
    • 转换成Row元素,列名=值
    • spark.createDataFrame生成df
    • df.show(), df.printSchema()

    >>> from pyspark.sql import Row
    >>> people = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line:line.split(',')).map(lambda p:Row(name=p[0],age=int(p[1])))
    >>> schemaPeople=spark.createDataFrame(people)
    >>> schemaPeople.createOrReplaceTempView("people")
    >>> personsDF=spark.sql("select name,age from people where age>20")
    >>> personsRDD=personsDF.rdd.map(lambda p:"Name:"+p.name+","+"Age:"+str(p.age))
    >>> personsRDD.foreach(print)

    >>> schemaPeople.show()

    >>> schemaPeople.printSchema()

    3.2 使用编程方式定义RDD模式

    • 生成“表头”
      • fields = [StructField(field_name, StringType(), True) ,...]
      • schema = StructType(fields)

    >>> from pyspark.sql.types import StringType,StructField,StructType
    >>> from pyspark.sql import Row
    >>> schemaString = "name age"
    >>> fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split(" ")]
    >>> schema = StructType(fields)

    >>> fields

    >>> schema

    • 生成“表中的记录”
      • 创建RDD
      • 转换成Row元素,列名=值

    >>> lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
    >>> parts = lines.map(lambda x:x.split(","))
    >>> people = parts.map(lambda p:Row(p[0],p[1].strip()))
    >>> people.collect()

    • 把“表头”和“表中的记录”拼装在一起
      • = spark.createDataFrame(RDD, schema)

    >>> schemaPeople = spark.createDataFrame(people,schema)
    >>> schemaPeople.show()

    >>> schemaPeople.printSchema()

     4. DataFrame保存为文件

    df.write.json(dir)

    >>> schemaPeople.write.json("file///home/hadoop/schema_out")

  • 相关阅读:
    求阶乘及其和
    JAVA 字符串题目 以静态方法实现encode()和decode()的调用
    JAVA 类与对象题目5
    JAVA 类与对象题目4
    JAVA 类与对象题目3
    JAVA 类与对象题目2
    JAVA 基础练习题代码
    JAVA 关于值类型和引用类型的区别
    JAVA学习 判断一个字符或字符是否位于另一个字符串的末尾
    JAVA 截取4个随机数字字母的代码
  • 原文地址:https://www.cnblogs.com/0311Chrome/p/14766673.html
Copyright © 2011-2022 走看看