# -*- coding:utf-8 -*- import sys from pyspark import SparkContext from pyspark.sql.session import SparkSession from pyspark import SQLContext from pyspark.sql.types import * def getkey(line): fields = line.strip().split(' ') key = fields[0] return (key,)
#return [key] def getkey2(line): fields = line.strip().split(' ') key = fields[0] return key sc = SparkContext(appName="zxm:copc") spark = SparkSession(sc) inpath = "xxx/imei_hashv" outpath="xxx/test" imeiRdd = sc.textFile(inpath, use_unicode=False).map(getkey2) #imeiRdd = sc.textFile(inpath, use_unicode=False).map(getkey) schema = StructType([StructField("imei", StringType(), True)]) imeiDf = spark.createDataFrame(imeiRdd, schema) imeiDf.registerTempTable("t2") res = spark.sql("select imei from t2") res.repartition(1).write.format("csv").save(outpath)
(1)用sc.textFile()读取 inpath的文件成为rdd,文件只有一列,前三行为
13279285433414550239
492335506325762025
12750066214056691161
(2)schema = StructType([StructField("imei", StringType(), True)]) 表示这列的列名是imei,数据类型为 StringType
(3)然后把 rdd + schema 转换成dataframe,
(4)把dataframe注册成临时表t2,以方便使用sql语句。
----------------------------------------------------------------------------------------------------------------------------------------------------
但是在map函数中使用 getkey2 函数总是报错 StructType can not accept object '13279285433414550239' in type <type 'str'>
原因是schema 这里是一个数组,虽然只有一列。而在 getkey2函数 中 return key 返回的是一个string,spark不能把string 解析成"数组"。改成 return (key, ) 或者 return [key] 即可,这时返回的就是一个只有一列的"数组"了,能与schema对应上。
平时在map函数中多是返回多个值 return a,b,c,d 这样,这次只返回一个值的时候遇到了问题,查了许多类似的问答才发现。
https://stackoverflow.com/questions/52586199/cannot-create-dataframe-in-pyspark
https://stackoverflow.com/questions/44334326/data-not-being-populated-with-dataframe-pyspark