zoukankan      html  css  js  c++  java
  • pyspark基本使用

    【Example】

    from pysoark. sql import SparkSession
    def split_line(line):
        try:
              return line.split(b"	")
         except:pass
    
    def map_partitions(partitions):
        for line in partitions:
            yield split_line(line)
    
    if __name__ == "__main__":
    spark=SparkSession. builder. appName("pyspark").getOrCreate()
    sc=spark. sparkContext
    output="/usr/local/output"
    red = sc. textFile(input,use_unicode=False) #bytes格式加载
    red. mapPartitions(map_partitions)
    .filter(lambda line:line)
    .coalesce(10)
    .saveAsTextFile(output)
    spark. stop()
    

    加载数据与保存数据

    rdd = sc.textFile('/user/hadoop/*')
    rdd = sc.parallelize([1,2,3,4,5])
    rdd.collect()
    rdd.collect()
    rdd.saveAsTextFile("file:///usr/local/test/urls")
    

     基本函数

    map():是将文件每一行进行操作,数量不会改变
    mapPartitions():类似map,func的函数类型必须是Iterator,应用于每个分区,也就是把每个分区中的内容作为整体来处理的
    mapPartitionsWithIndex(func):类似于mapPartitions,但func带有一个整数参数表示分片的索引值
        rdd = textFile.map( lambda x:(x,1))
        rdd = textFile.mapPartitions(lambda data:[(x,1) for x in data])
        rdd = textFile.mapPartitionsWithIndex(lambda n,data:[(x,1,"分区"+str(n)) for x in data])
        rdd = rdd.reduceByKey(lambda a,b:a+b)
    flatMap():是将所有元素进行操作,数量只会大于或者等于初始数量
        rdd=sc.parallelize(["hello_tom","very_good","how_are_you"])
        def split_text(rdd):
            return rdd.split("_")
        rdd.flatMap(split_text).collect()
    filter():过滤,将符合条件的数据留下来
        rdd.filter(lambda x:len(x)>2)
    sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子
        textFile = sc.textFile("file:///usr/local/test/urls")
        textFile.sample(False,0.4,146)
    distinct():对数据去重
        rdd.distinct(2) #去重后缩减为2个分区
    coalesce(numPartitions):重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
        rdd.coalesce(3,True) 
    repartition(numPartitions):根据分区数,重新通过网络随机洗牌所有数据
        rdd = rdd.repartition(3)
    reduce(lambda a,b:a+b):每次相加两个元组然后产生新的rdd与下一位元组相加
    reduceByKey:对元素RDD中Key相同的元素的Value进行reduce操作(去键重)
        rdd.map(lambda word:(word, 1)).reduceByKey(lambda x, y: x + y)
    groupBy:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
        rdd = textFile.groupBy(lambda x:x%2)
    groupByKey
        rdd=sc.parallelize(["one","two","two","three","three","three"])
        rdd = rdd.map( lambda x:(x,1)).groupByKey().map(lambda x:(x[0],sum(x[1])))
        rdd = rdd.sortBy(lambda x:x[1],False).map(lambda x:"	".join([str(i) for i in x]))
        
    count():是用来统计这个RDD文件里面有多少个元素
    countByValue():是用于统计RDD键值对中每个键的数量
    glom:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
        textFile=sc.parallelize(range(1,200)).repartition(4)
        rdd = textFile.map( lambda x:(x,1))
        glom_rdd = rdd.glom()
        sc.parallelize(range(1,22)).repartition(3).glom().collect()
    mapValues
        rdd=sc.parallelize(["one","two","one"])
        rdd = rdd.map( lambda x:(1,x)).mapValues(lambda x:x+x)
        #(1, 'oneone')(1, 'twotwo')(1, 'oneone')
    mapWith:map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数
    sortBy() :使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序
        rdd = rdd.sortBy(lambda x:x[0]).map(lambda x:"	".join([str(i) for i in x]))
        print(sc.parallelize([1,2,3,6,98,7,4,5]).sortBy(lambda x:x,False).collect()) #[98, 7, 6, 5, 4, 3, 2, 1]
    sortByKey():返回一个按照key进行排序的(K,V)的RDD
        rdd=sc.parallelize(["one","two","one","two","three","three","three"])
        rdd = rdd.map( lambda x:(x,1)).sortByKey(True)
        #('one', 1)('one', 1)('two', 1)
    

      

  • 相关阅读:
    Git for windows(Msysgit)中文乱码
    DB2嵌入式编程,语句“EXEC SQL INCLUDE”说明
    《DB2 最佳实践: 性能调优和问题诊断最佳实践,第 1 部分》阅读笔记
    android sdk setup时出现:Failed to fetch URL...
    Git GUI启动报错问题记录
    安全快门
    在VFP6中模拟CursorAdapter的功能
    调试asp.net网页时不显示treeview的原因
    XP机器上WCF采用X509证书加密时IIS读取证书的授权
    在SQLSERVER2008中建立数据库复制碰到的问题
  • 原文地址:https://www.cnblogs.com/boye169/p/14540837.html
Copyright © 2011-2022 走看看