pyspark创建RDD的方式主要有两种,一种是通过spark.sparkContext.textFile 或者 sparkContext.textFile读取生成RDD数据;另一种是通过spark.sparkContext.parallelize创建RDD数据。
1. 首先导入库和进行环境配置(使用的是linux下的pycharm)
import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.types import StructField, StructType, StringType from pyspark.sql import HiveContext os.environ["PYSPARK_PYTHON"]="/usr/bin/python3" #多个python版本时需要指定 spark = SparkSession.builder.master("local").appName("SparkOnHive").enableHiveSupport().getOrCreate()
2. 创建RDD数据,这里采用的是第二种方式
data = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixed,可以元组、列表或者混合,子元素的长度也不一定要一样 rdd_ = spark.sparkContext.parallelize(data) print(type(rdd_)) # support: list upledict or mixed them print(rdd_.take(2)) rdd_collect = rdd_.collect() print(rdd_collect) print(rdd_collect[1])
如下,混合也是可行的,但是长度不一致时,就不能直接转成DataFrame了,否则会出现: ValueError: Length of object (2) does not match with length of fields (3)
data = [('Alex','male',3),['Nancy',6],{'sport':'tennis'}] # 混合,长度也不一致,相当于RDD把每一行当做一整个元素了 rdd_ = spark.sparkContext.parallelize(data) print(type(rdd_)) # support: list upledict or mixed them print(rdd_.take(2)) rdd_collect = rdd_.collect() print(rdd_collect) print(rdd_collect[1])
3. 如果RDD要直接转成DataFrame,使用spark.createDataFrame,则子元素长度要一致,例如:
data = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixed rdd_ = spark.sparkContext.parallelize(data) # schema schema = StructType([ # true代表不为空 StructField("name", StringType(), True), StructField("gender", StringType(), True), StructField("num", StringType(), True) ]) df = spark.createDataFrame(rdd_, schema=schema) # working when the struct of data is same. print(df.show())
其中,DataFrame和hive table的相互转换可见:https://www.cnblogs.com/qi-yuan-008/p/12494024.html
4. RDD数据的保存:saveAsTextFile,如下 repartition 表示使用一个分区,后面加上路径即可
rdd_.repartition(1).saveAsTextFile(r'some_path')
5. DataFrame数据的保存:通过路径进行设置
# save file_path = r'/home/Felix/pycharm_projects/test/testfile.csv' df.write.csv(path=file_path, header=True, sep=',', mode='overwrite') file_path_2 = r'/home/Felix/pycharm_projects/test/testfile.parquet' df.write.parquet(path=file_path_2, mode='overwrite')
6. 读取以上保存的csv和parquet文件
dt1 = spark.read.csv(r'/home/Felix/pycharm_projects/test/testfile.csv', header=True) print(dt1.show()) print('11111111') # dt1 = spark.read.parquet(r'/home/Felix/pycharm_projects/test/testfile.parquet') print(dt1.show()) print('22222222')
##