【Example】
from pysoark. sql import SparkSession def split_line(line): try: return line.split(b" ") except:pass def map_partitions(partitions): for line in partitions: yield split_line(line) if __name__ == "__main__": spark=SparkSession. builder. appName("pyspark").getOrCreate() sc=spark. sparkContext output="/usr/local/output" red = sc. textFile(input,use_unicode=False) #bytes格式加载 red. mapPartitions(map_partitions) .filter(lambda line:line) .coalesce(10) .saveAsTextFile(output) spark. stop()
加载数据与保存数据
rdd = sc.textFile('/user/hadoop/*') rdd = sc.parallelize([1,2,3,4,5]) rdd.collect() rdd.collect() rdd.saveAsTextFile("file:///usr/local/test/urls")
基本函数
map():是将文件每一行进行操作,数量不会改变 mapPartitions():类似map,func的函数类型必须是Iterator,应用于每个分区,也就是把每个分区中的内容作为整体来处理的 mapPartitionsWithIndex(func):类似于mapPartitions,但func带有一个整数参数表示分片的索引值 rdd = textFile.map( lambda x:(x,1)) rdd = textFile.mapPartitions(lambda data:[(x,1) for x in data]) rdd = textFile.mapPartitionsWithIndex(lambda n,data:[(x,1,"分区"+str(n)) for x in data]) rdd = rdd.reduceByKey(lambda a,b:a+b) flatMap():是将所有元素进行操作,数量只会大于或者等于初始数量 rdd=sc.parallelize(["hello_tom","very_good","how_are_you"]) def split_text(rdd): return rdd.split("_") rdd.flatMap(split_text).collect() filter():过滤,将符合条件的数据留下来 rdd.filter(lambda x:len(x)>2) sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子 textFile = sc.textFile("file:///usr/local/test/urls") textFile.sample(False,0.4,146) distinct():对数据去重 rdd.distinct(2) #去重后缩减为2个分区 coalesce(numPartitions):重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 rdd.coalesce(3,True) repartition(numPartitions):根据分区数,重新通过网络随机洗牌所有数据 rdd = rdd.repartition(3) reduce(lambda a,b:a+b):每次相加两个元组然后产生新的rdd与下一位元组相加 reduceByKey:对元素RDD中Key相同的元素的Value进行reduce操作(去键重) rdd.map(lambda word:(word, 1)).reduceByKey(lambda x, y: x + y) groupBy:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器 rdd = textFile.groupBy(lambda x:x%2) groupByKey rdd=sc.parallelize(["one","two","two","three","three","three"]) rdd = rdd.map( lambda x:(x,1)).groupByKey().map(lambda x:(x[0],sum(x[1]))) rdd = rdd.sortBy(lambda x:x[1],False).map(lambda x:" ".join([str(i) for i in x])) count():是用来统计这个RDD文件里面有多少个元素 countByValue():是用于统计RDD键值对中每个键的数量 glom:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]] textFile=sc.parallelize(range(1,200)).repartition(4) rdd = textFile.map( lambda x:(x,1)) glom_rdd = rdd.glom() sc.parallelize(range(1,22)).repartition(3).glom().collect() mapValues rdd=sc.parallelize(["one","two","one"]) rdd = rdd.map( lambda x:(1,x)).mapValues(lambda x:x+x) #(1, 'oneone')(1, 'twotwo')(1, 'oneone') mapWith:map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数 sortBy() :使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序 rdd = rdd.sortBy(lambda x:x[0]).map(lambda x:" ".join([str(i) for i in x])) print(sc.parallelize([1,2,3,6,98,7,4,5]).sortBy(lambda x:x,False).collect()) #[98, 7, 6, 5, 4, 3, 2, 1] sortByKey():返回一个按照key进行排序的(K,V)的RDD rdd=sc.parallelize(["one","two","one","two","three","three","three"]) rdd = rdd.map( lambda x:(x,1)).sortByKey(True) #('one', 1)('one', 1)('two', 1)