zoukankan      html  css  js  c++  java
  • 07 从RDD创建DataFrame

    0.前次作业:从文件创建DataFrame

    1.pandas df 与 spark df的相互转换

    df_s=spark.createDataFrame(df_p)

    df_p=df_s.toPandas()

    >>> import pandas as pd
    >>> import numpy as np
    >>> arr = np.arange(6).reshape(-1,3)
    >>> df_p=pd.DataFrame(arr)
    >>> df_p

    >>> arr

    >>> df_p.columns=['a','b','c']
    >>> df_p

     

    >>> df_s=spark.createDataFrame(df_p)
    >>> df_s.show()

    >>> df_s.collect()

    >>> df_s.toPandas()

    2. Spark与Pandas中DataFrame对比

    http://www.lining0806.com/spark%E4%B8%8Epandas%E4%B8%ADdataframe%E5%AF%B9%E6%AF%94/

    3.1 利用反射机制推断RDD模式

    • sc创建RDD
    • 转换成Row元素,列名=值
    • spark.createDataFrame生成df
    • df.show(), df.printSchema()

    >>> from pyspark.sql import Row
    >>> people = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line:line.split(',')).map(lambda p:Row(name=p[0],age=int(p[1])))
    >>> schemaPeople=spark.createDataFrame(people)
    >>> schemaPeople.createOrReplaceTempView("people")
    >>> personsDF=spark.sql("select name,age from people where age>20")
    >>> personsRDD=personsDF.rdd.map(lambda p:"Name:"+p.name+","+"Age:"+str(p.age))
    >>> personsRDD.foreach(print)

    >>> schemaPeople.show()

    >>> schemaPeople.printSchema()

    3.2 使用编程方式定义RDD模式

    • 生成“表头”
      • fields = [StructField(field_name, StringType(), True) ,...]
      • schema = StructType(fields)

    >>> from pyspark.sql.types import StringType,StructField,StructType
    >>> from pyspark.sql import Row
    >>> schemaString = "name age"
    >>> fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split(" ")]
    >>> schema = StructType(fields)

    >>> fields

    >>> schema

    • 生成“表中的记录”
      • 创建RDD
      • 转换成Row元素,列名=值

    >>> lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
    >>> parts = lines.map(lambda x:x.split(","))
    >>> people = parts.map(lambda p:Row(p[0],p[1].strip()))
    >>> people.collect()

    • 把“表头”和“表中的记录”拼装在一起
      • = spark.createDataFrame(RDD, schema)

    >>> schemaPeople = spark.createDataFrame(people,schema)
    >>> schemaPeople.show()

    >>> schemaPeople.printSchema()

     4. DataFrame保存为文件

    df.write.json(dir)

    >>> schemaPeople.write.json("file///home/hadoop/schema_out")

  • 相关阅读:
    Silverlight项目开发准则
    WCF序列化65536大小限制的问题
    自定义的asp.net翻页控件
    解决Process因缓冲区满而导至进程阻塞的办法
    选择适合的Silverlight通信技术
    深入剖析WCF的可靠会话[实例篇](内含美女图片,定力差者慎入)
    25个强大的 jQuery 砌体网页设计作品
    Silverlight 4常用StringFormat格式总结
    C# 发送邮件内容嵌入图片
    如何解决Silverlight跨域访问安全性问题
  • 原文地址:https://www.cnblogs.com/0311Chrome/p/14766673.html
Copyright © 2011-2022 走看看