RDD operations
Word frequency count
lines = sc.textFile("file:///usr/local/spark/word.txt")
wordCount = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
print(wordCount.collect())
# [('good', 1), ('Spark', 2), ('hadoop', 1)]
# .collect() returns the whole result set to the driver program as a list
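As a side note, the same counts can be obtained without reduceByKey via the countByValue() action; a minimal sketch, assuming the same lines RDD as above:
# countByValue() is an action: it returns a local dict of {word: count} to the driver,
# so the result can no longer be chained with further RDD transformations.
wordCountDict = lines.flatMap(lambda line: line.split(" ")).countByValue()
print(dict(wordCountDict))  # e.g. {'good': 1, 'Spark': 2, 'hadoop': 1}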
Pair RDD operations
Compute the average daily sales of each book
rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:x[0]/x[1]).collect()
# Output: [('hadoop', 5.0), ('spark', 4.0)]
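The same average can also be computed with aggregateByKey, which makes the (sum, count) accumulator explicit; a sketch, assuming the same rdd as above:
# seqFunc folds values into the (sum, count) accumulator within a partition;
# combFunc merges accumulators across partitions.
sums = rdd.aggregateByKey((0, 0),
                          lambda acc, v: (acc[0] + v, acc[1] + 1),
                          lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(sums.mapValues(lambda x: x[0] / x[1]).collect())
# e.g. [('hadoop', 5.0), ('spark', 4.0)]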
Finding top N values
The file has 4 columns; find the top N values of the third column.
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("TOP")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")
# strip() removes leading/trailing whitespace; result1 filters out rows that do not have 4 fields
result1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(","))==4))
# result2 extracts the third column
result2 = result1.map(lambda x:x.split(",")[2])
# result3 turns each value into a key-value pair (x, ""); only key-value pairs can be sorted with sortByKey
result3 = result2.map(lambda x:(int(x),""))
# result4 merges everything into a single partition; otherwise each partition would be sorted separately
result4 = result3.repartition(1)
# result5 sorts by key in descending order
result5 = result4.sortByKey(False)
# result6 keeps only the key of each pair
result6 = result5.map(lambda x:x[0])
# result7 takes the first 5 elements
result7 = result6.take(5)
for a in result7:
    print(a)
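When only the few largest values are needed, the repartition/sortByKey/take pipeline can also be replaced by the top() action (or takeOrdered() for ascending order); a sketch, assuming the same result2 as above:
# top(n) returns the n largest elements to the driver, already in descending order,
# without forcing the whole RDD into a single partition.
top5 = result2.map(lambda x: int(x)).top(5)
for a in top5:
    print(a)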
File sort
Read all the integers in the files and sort them.
from pyspark import SparkConf,SparkContext
index = 0
def getindex():
    global index
    index += 1
    return index
def main():
    conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
    sc = SparkContext(conf = conf)
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt")
    index = 0
    # filter out empty lines
    result1 = lines.filter(lambda line: (len(line.strip()) > 0))
    result2 = result1.map(lambda x: (int(x.strip()), ""))
    result3 = result2.repartition(1)
    result4 = result3.sortByKey(True)
    result5 = result4.map(lambda x: x[0])
    result6 = result5.map(lambda x: (getindex(), x))
    result6.foreach(print)
    result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")

if __name__ == '__main__':
    main()
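The global counter in getindex() only works here because everything has been forced into a single partition; a more partition-safe way to number the sorted values is zipWithIndex(). A sketch, assuming the same result4 as in main():
# zipWithIndex() pairs each element with its position (starting at 0) in RDD order;
# adding 1 reproduces the 1-based ranking produced by getindex().
ranked = result4.map(lambda x: x[0]).zipWithIndex().map(lambda x: (x[1] + 1, x[0]))
ranked.foreach(print)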
Secondary sort
Given a file with two columns of integers, sort the data: first by the first column in descending order; when two rows have the same first column, sort them by the second column in descending order.
- Implement a custom sort key following the Ordered and Serializable interfaces
- Load the file to be secondary-sorted and generate an RDD of <key, value> pairs
- Call sortByKey on the custom key to perform the secondary sort
- Drop the sort key and keep only the sorted records
from pyspark import SparkConf,SparkContext
from operator import gt
class SecondarySortKey():
    def __init__(self, k):
        self.column1 = k[0]
        self.column2 = k[1]

    def __gt__(self, other):
        if other.column1 == self.column1:
            return gt(self.column2, other.column2)
        else:
            return gt(self.column1, other.column1)
def main():
    conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
    sc = SparkContext(conf = conf)
    file = "file:///usr/local/spark/mycode/secondarysort/file.txt"
    rdd1 = sc.textFile(file)
    rdd2 = rdd1.filter(lambda x: len(x.strip()) > 0)
    rdd3 = rdd2.map(lambda x: ((int(x.split(" ")[0]), int(x.split(" ")[1])), x))
    rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]), x[1]))
    rdd5 = rdd4.sortByKey(False)
    rdd6 = rdd5.map(lambda x: x[1])
    rdd6.foreach(print)

if __name__ == '__main__':
    main()
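Because Python tuples already compare element by element, the same ordering can be sketched more compactly with sortBy and no custom key class; a sketch, assuming the same rdd2 from main():
# sortBy takes a key function; with ascending=False the (col1, col2) tuples are
# compared lexicographically, giving a descending sort on both columns.
sorted_rdd = rdd2.sortBy(lambda x: (int(x.split(" ")[0]), int(x.split(" ")[1])),
                         ascending=False)
sorted_rdd.foreach(print)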