from pysoark. sql import SparkSession
def split_line(line):
try:
return line.split(b" ")
except:pass
def map_partitions(partitions):
for line in partitions:
yield split_line(line)
if __name__ == "__main__":
spark=SparkSession. builder. appName("pyspark").getOrCreate()
sc=spark. sparkContext
output="/usr/local/output"
red = sc. textFile(input,use_unicode=False) #bytes格式加载
red. mapPartitions(map_partitions)
.filter(lambda line:line)
.coalesce(10)
.saveAsTextFile(output)
spark. stop()
rdd = sc.textFile('/user/hadoop/*')
rdd = sc.parallelize([1,2,3,4,5])
rdd.collect()
rdd.collect()
rdd.saveAsTextFile("file:///usr/local/test/urls")
map():是将文件每一行进行操作,数量不会改变
mapPartitions():类似map,func的函数类型必须是Iterator,应用于每个分区,也就是把每个分区中的内容作为整体来处理的
mapPartitionsWithIndex(func):类似于mapPartitions,但func带有一个整数参数表示分片的索引值
rdd = textFile.map( lambda x:(x,1))
rdd = textFile.mapPartitions(lambda data:[(x,1) for x in data])
rdd = textFile.mapPartitionsWithIndex(lambda n,data:[(x,1,"分区"+str(n)) for x in data])
rdd = rdd.reduceByKey(lambda a,b:a+b)
flatMap():是将所有元素进行操作,数量只会大于或者等于初始数量
rdd=sc.parallelize(["hello_tom","very_good","how_are_you"])
def split_text(rdd):
return rdd.split("_")
rdd.flatMap(split_text).collect()
filter():过滤,将符合条件的数据留下来
rdd.filter(lambda x:len(x)>2)
sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子
textFile = sc.textFile("file:///usr/local/test/urls")
textFile.sample(False,0.4,146)
distinct():对数据去重
rdd.distinct(2) #去重后缩减为2个分区
coalesce(numPartitions):重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
rdd.coalesce(3,True)
repartition(numPartitions):根据分区数,重新通过网络随机洗牌所有数据
rdd = rdd.repartition(3)
reduce(lambda a,b:a+b):每次相加两个元组然后产生新的rdd与下一位元组相加
reduceByKey:对元素RDD中Key相同的元素的Value进行reduce操作(去键重)
rdd.map(lambda word:(word, 1)).reduceByKey(lambda x, y: x + y)
groupBy:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
rdd = textFile.groupBy(lambda x:x%2)
groupByKey
rdd=sc.parallelize(["one","two","two","three","three","three"])
rdd = rdd.map( lambda x:(x,1)).groupByKey().map(lambda x:(x[0],sum(x[1])))
rdd = rdd.sortBy(lambda x:x[1],False).map(lambda x:" ".join([str(i) for i in x]))
count():是用来统计这个RDD文件里面有多少个元素
countByValue():是用于统计RDD键值对中每个键的数量
glom:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
textFile=sc.parallelize(range(1,200)).repartition(4)
rdd = textFile.map( lambda x:(x,1))
glom_rdd = rdd.glom()
sc.parallelize(range(1,22)).repartition(3).glom().collect()
mapValues
rdd=sc.parallelize(["one","two","one"])
rdd = rdd.map( lambda x:(1,x)).mapValues(lambda x:x+x)
#(1, 'oneone')(1, 'twotwo')(1, 'oneone')
mapWith:map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数
sortBy() :使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序
rdd = rdd.sortBy(lambda x:x[0]).map(lambda x:" ".join([str(i) for i in x]))
print(sc.parallelize([1,2,3,6,98,7,4,5]).sortBy(lambda x:x,False).collect()) #[98, 7, 6, 5, 4, 3, 2, 1]
sortByKey():返回一个按照key进行排序的(K,V)的RDD
rdd=sc.parallelize(["one","two","one","two","three","three","three"])
rdd = rdd.map( lambda x:(x,1)).sortByKey(True)
#('one', 1)('one', 1)('two', 1)