zoukankan      html  css  js  c++  java
  • pyspark创建RDD数据、RDD转DataFrame以及保存

    pyspark创建RDD的方式主要有两种,一种是通过spark.sparkContext.textFile 或者 sparkContext.textFile读取生成RDD数据;另一种是通过spark.sparkContext.parallelize创建RDD数据。

    1. 首先导入库和进行环境配置(使用的是linux下的pycharm)

    import os
    from pyspark import SparkContext, SparkConf
    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import StructField, StructType, StringType
    from pyspark.sql import HiveContext
    
    os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"  #多个python版本时需要指定
    
    spark = SparkSession.builder.master("local").appName("SparkOnHive").enableHiveSupport().getOrCreate()

    2. 创建RDD数据,这里采用的是第二种方式

    data = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixed,可以元组、列表或者混合,子元素的长度也不一定要一样
    rdd_ = spark.sparkContext.parallelize(data)
    print(type(rdd_))
    # support: list	upledict or mixed them
    print(rdd_.take(2))
    rdd_collect = rdd_.collect()
    print(rdd_collect)
    print(rdd_collect[1])

    如下,混合也是可行的,但是长度不一致时,就不能直接转成DataFrame了,否则会出现: ValueError: Length of object (2) does not match with length of fields (3)

    data = [('Alex','male',3),['Nancy',6],{'sport':'tennis'}] # 混合,长度也不一致,相当于RDD把每一行当做一整个元素了
    rdd_ = spark.sparkContext.parallelize(data)
    print(type(rdd_))
    # support: list	upledict or mixed them
    print(rdd_.take(2))
    rdd_collect = rdd_.collect()
    print(rdd_collect)
    print(rdd_collect[1])

    3. 如果RDD要直接转成DataFrame,使用spark.createDataFrame,则子元素长度要一致,例如:

    data = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixed
    rdd_ = spark.sparkContext.parallelize(data)
    
    # schema
    schema = StructType([
            # true代表不为空
            StructField("name", StringType(), True),
            StructField("gender", StringType(), True),
            StructField("num", StringType(), True)
        ])
    df = spark.createDataFrame(rdd_, schema=schema)  # working when the struct of data is same.
    print(df.show()) 

    其中,DataFrame和hive table的相互转换可见:https://www.cnblogs.com/qi-yuan-008/p/12494024.html

    4. RDD数据的保存:saveAsTextFile,如下 repartition 表示使用一个分区,后面加上路径即可

    rdd_.repartition(1).saveAsTextFile(r'some_path') 

    5. DataFrame数据的保存:通过路径进行设置

    # save
    file_path = r'/home/Felix/pycharm_projects/test/testfile.csv'
    df.write.csv(path=file_path, header=True, sep=',', mode='overwrite')
    
    file_path_2 = r'/home/Felix/pycharm_projects/test/testfile.parquet'
    df.write.parquet(path=file_path_2, mode='overwrite')

    6. 读取以上保存的csv和parquet文件

    dt1 = spark.read.csv(r'/home/Felix/pycharm_projects/test/testfile.csv', header=True)
    print(dt1.show())
    print('11111111')
    #
    dt1 = spark.read.parquet(r'/home/Felix/pycharm_projects/test/testfile.parquet')
    print(dt1.show())
    print('22222222')

    ##

  • 相关阅读:
    python中的继承和多态
    python中的深浅copy
    面向对象初识
    常用模块,异常处理
    递归,re,time,random
    内置函数,匿名函数
    生成器和迭代器
    记一次nginx由于文件过大的相关问题
    vue的Element+gin实现文件上传
    Vue问题汇总
  • 原文地址:https://www.cnblogs.com/qi-yuan-008/p/12504882.html
Copyright © 2011-2022 走看看