zoukankan      html  css  js  c++  java
  • SparkRDD基本操作

    一)新建maven项目,引入如下pom文件<dependency>

        <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.2.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>2.2.2</version>
    <scope>runtime</scope>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.2.2</version>
    </dependency>

    二)代码示例及讲解如下:
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * Created by fishjar on 2018/9/8.
    */
    object RddOperate {
    def main(args: Array[String]): Unit = {
    //1)创建SparkContext

    val conf = new SparkConf().setAppName("RddOperate").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //1.1 RDD 创建
    val data=Array(1,2,3,4,5,6,7,8,9)
    val disData=sc.parallelize(data,3)

    disData.foreach(println)
    val distFile1=sc.textFile("D:\test\SparkRdd_Test\01.txt")
    val distFile2=sc.textFile("D:\test\SparkRdd_Test\01.txt")
    val distFile3=sc.textFile("D:\test\SparkRdd_Test\01.txt")
    val distFile4=sc.textFile("D:\test\SparkRdd_Test\01.txt")
    val distFile5=sc.textFile("D:\test\SparkRdd_Test\01.txt")
    val distFile6=sc.textFile("D:\test\SparkRdd_Test\01.txt")
    val distFile7=sc.textFile("D:\test\SparkRdd_Test\word.txt")

    //单词统计
    val words = distFile7.flatMap{(line => line.split(" "))}
    val pairs = words.map { word => (word,1) }
    val wordCount = pairs.reduceByKey(_+_)
    println("*****wordCount")
    wordCount.foreach(println)


    //1.2 RDD转换操作
    println("**************1.2.1 map**********************")
    // map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD;RDD之间的元素是一对一关系;
    val rdd1=sc.parallelize(1 to 9,3)
    val rdd2=rdd1.map(x => x * 2)
    rdd2.collect()
    rdd2.foreach(println)

    println("*****************1.2.2 filter**************")
    // Filter是对RDD元素进行过滤;返回一个新的数据集,是经过func函数后返回值为true的原元素组成;
    val rdd3 = rdd2.filter(x => x > 10)
    rdd3.foreach(println)

    println("********1.2.3 filterMap******")
    // flatMap类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素),
    // RDD之间的元素是一对多关系;
    // res5: Array[Int] = Array(12, 13, 14, 15, 16, 17, 18, 19, 20, 14, 15, 16, 17, 18, 19, 20, 16, 17, 18, 19, 20, 18, 19, 20)
    //
    val rdd4=rdd3.flatMap(x => x to 20)
    rdd4.foreach(println)

    println("**********1.2.4 mapPartitions************")
    //rdd1 数据是1~9 ,三个分区,每个分区三个数(1,2,3)(4,5,6)(7,8,9)
    // mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是每个分区的数据,也就是
    // 把每个分区中的内容作为整体来处理的
    val rdd5=rdd1.mapPartitions(myfunc)
    rdd5.collect()
    rdd5.foreach(println)

    println("***************1.2.5 sample*********************")
    // sample(withReplacement,fraction,seed)是根据给定的随机种子seed,随机抽样出数量为frac的数据。withReplacement:是否放回抽
    // 样;fraction:比例,0.1表示10% ;
    val a = sc.parallelize(1 to 1000,3)
    val b= a.sample(false,0.1,0).count()
    println("b的值:"+b)

    println("***************1.2.6 union***********************")
    // union(otherDataset)是数据合并,返回一个新的数据集,由原数据集和otherDataset联合而成。
    val rdd6=rdd1.union(rdd4)
    rdd6.foreach(println)

    println("***************1.2.7 intersection 交集****************************")
    // intersection(otherDataset)是数据交集,返回一个新的数据集,包含两个数据集的交集数据;
    val rdd7_1 =sc.parallelize(1 to 10,3)
    val rdd7_2 =rdd7_1.filter(x => x %2 ==0)
    val rdd7=rdd7_1.intersection(rdd7_2)
    rdd7.foreach(println)

    println("************1.2.8 distinct*********")
    // distinct([numTasks]))是数据去重,返回一个数据集,是对两个数据集去除重复数据,numTasks参数是设置任务并行数量。
    val rdd8_1 =sc.parallelize(1 to 10,3)
    val rdd8_2 = sc.parallelize(5 to 12 ,3)
    val rdd8 = rdd8_1.union(rdd8_2).distinct()
    rdd8.collect()
    rdd8.foreach(println)


    println("***********1.2.9 group by key***********")
    // groupByKey([numTasks])是数据分组操作,在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。
    val rdd9_1 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,4),(3,1),(3,5)),3)
    val rdd9_2 =rdd9_1.groupByKey()
    print(rdd9_2)
    rdd9_2.foreach(println)

    println("**********1.2.10 reduce by key**********")
    // reduceByKey(func, [numTasks])是数据分组聚合操作,在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,
    // 都被使用指定的reduce函数聚合到一起。
    val rdd10_1 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,4),(3,1),(3,5)),3)
    val rdd10_2 =rdd10_1.reduceByKey(_+_)
    val rdd10_3 =rdd10_1.reduceByKey((x,y) => x+y)
    rdd10_2.foreach(println)
    rdd10_3.foreach(x => println(x))


    println("****************1.2.11 aggregateByKey*********************")
    // aggreateByKey(zeroValue: U)(seqOp: (U, T)=> U, combOp: (U, U) =>U) 和reduceByKey的不同在于,reduceByKey输入输出都是(K,
    // V),而aggreateByKey输出是(K,U),可以不同于输入(K, V) ,aggreateByKey的三个参数:
    // zeroValue: U,初始值,比如空列表{} ;
    // seqOp: (U,T)=> U,seq操作符,描述如何将T合并入U,比如如何将item合并到列表 ;
    // combOp: (U,U) =>U,comb操作符,描述如果合并两个U,比如合并两个列表 ;
    // 所以aggreateByKey可以看成更高抽象的,更灵活的reduce或group 。
    val rdd11_1 =sc.parallelize(List((1,3),(1,2),(1,4),(2,5),(2,7),(3,4),(3,1)),1)
    val rdd11_2 = rdd11_1.aggregateByKey(0)(math.max(_,_),_+_).collect()
    rdd11_2.foreach(println)
    // 如果两个分区结果是:
    // (2,7)
    // (1,4)
    // (3,4)
    // 如果三个分区结果是:
    // (3,4)
    // (1,7)
    // (2,12)
    // 如果四个分区结果是:
    // (1,7)
    // (2,7)
    // (3,4)

    println("************1.2.12 combineByKey****************************")
    // 其中的参数:
    // createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
    // mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
    // mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
    // numPartitions:结果RDD分区数,默认保持原有的分区数
    // partitioner:分区函数,默认为HashPartitioner
    // mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

    val rdd12_1 = Array((1,1.0),(1,2.0),(1,3.0),(2,4.0),(2,5.0),(3,6.0),(3,7.0),(3,8.0),(3,1.0))
    val rdd12_2 = sc.parallelize(rdd12_1,2)
    val rdd12_3=rdd12_2.combineByKey(createCombiner = (v:Double) => (v: Double, 1),
    mergeValue = (c: (Double,Int), v: Double) => (c._1 + v, c._2 + 1),
    mergeCombiners =(c1: (Double,Int),c2: (Double,Int)) =>(c1._1 + c2._1,c1._2 +c2._2),
    numPartitions =2)
    rdd12_3.collect()
    rdd12_3.foreach(println)


    println("****************1.2.13 treeAggregate**********")
    val treeAggregate_1 = rdd12_2.treeAggregate((0,0.0))(seqOp = ((u,t) => (u._1 + t._1,u._2 + t._2)),
    combOp =(u1,u2) => (u1._1 + u2._1,u1._2 +u2._2),
    depth =2)
    println(treeAggregate_1.toString())



    println("***********1.2.14 Sort by key****************")
    // sortByKey([ascending],[numTasks])是排序操作,对(K,V)类型的数据按照K进行排序,其中K需要实现Ordered方法。
    val rdd13_1 = Array((1,1),(1,2),(1,3),(2,1),(2,2),(3,5),(3,2),(3,2),(3,4))
    val rdd13_2 = sc.parallelize(rdd13_1,3)
    rdd13_2.sortByKey(true)
    rdd13_2.foreach(println)

    println("****************1.2.15 join**********************")
    // join(otherDataset, [numTasks])是连接操作,将输入数据集(K,V)和另外一个数据集(K,W)进行Join, 得到(K, (V,W));该操作是对于相同K
    // 的V和W集合进行笛卡尔积 操作,也即V和W的所有组合;
    val rdd14_1 =sc.parallelize(List((1,2),(1,3),(2,4),(2,5)))
    val rdd14_2 =sc.parallelize(List((1,10),(1,13),(2,14),(2,15)))
    val rdd14_3 =rdd14_1.join(rdd14_2)
    rdd14_3.foreach(println)
    // 输出结果:
    // (1,(2,10))
    // (1,(2,13))
    // (1,(3,10))
    // (1,(3,13))
    // (2,(4,14))
    // (2,(4,15))
    // (2,(5,14))
    // (2,(5,15))

    println("***************1.2.16 cogroup****************")
    // cogroup(otherDataset, [numTasks])是将输入数据集(K, V)和另外一个数据集(K, W)进行cogroup,得到一个格式为(K, Seq[V], Seq[W])
    // 的数据集。
    val rdd15_1 =sc.parallelize(List((1,2),(1,3),(2,4),(2,5)))
    val rdd15_2 =sc.parallelize(List((1,10),(1,13),(2,14),(2,15)))
    val rdd15_3 =rdd14_1.cogroup(rdd14_2)
    rdd15_3.foreach(println)
    // 输出结果:
    // (2,(CompactBuffer(4, 5),CompactBuffer(14, 15)))
    // (1,(CompactBuffer(2, 3),CompactBuffer(10, 13)))

    println("***************1.2.17 cartesian****************")
    // cartesian(otherDataset)是做笛卡尔积:对于数据集T和U 进行笛卡尔积操作, 得到(T, U)格式的数据集。
    val rdd16_1 =sc.parallelize(List((1,2),(1,3),(2,4),(2,5)))
    val rdd16_2 =sc.parallelize(List((1,10),(1,13),(2,14),(2,15)))
    val rdd16_3 =rdd14_1.cartesian(rdd14_2)
    rdd16_3.foreach(println)
    // 结果输出为:
    // ((1,2),(2,14))
    // ((1,2),(2,15))
    // ((1,2),(1,10))
    // ((1,2),(1,13))
    // ((1,3),(2,14))
    // ((1,3),(2,15))
    // ((1,3),(1,10))
    // ((1,3),(1,13))
    // ((2,4),(2,14))
    // ((2,4),(2,15))
    // ((2,4),(1,10))
    // ((2,4),(1,13))
    // ((2,5),(2,14))
    // ((2,5),(2,15))
    // ((2,5),(1,10))
    // ((2,5),(1,13))

    // val rdd17_1 =sc.parallelize(1 to 10,3)
    // val rdd17_2 = rdd17_1.pipe("head -n 1").collect()
    // rdd17_2.foreach(println)

    println("***************randomSplit*******************")
    val rdd18_1 = sc.parallelize(1 to 10,3)
    val rdd18_2 = rdd18_1.randomSplit(Array(0.1,0.9),1)
    println("rdd18_2 0")
    rdd18_2(0).foreach(println)
    println("rdd18_2 1")
    rdd18_2(1).foreach(println)

    println("****************substract******************************")
    val rdd19_1 = sc.parallelize(1 to 10,3)
    val rdd19_2 = sc.parallelize(1 to 4,3)
    val rdd19_3 =rdd19_1.subtract(rdd19_2)
    rdd19_3.foreach(println)

    println("********************zip***********************************")
    val rdd20_1 =sc.parallelize(Array(1,2,3,4,5,6),3)
    val rdd20_2 =sc.parallelize(Array('a','b','c','d','e','f'),3)
    val rdd20_3 =rdd20_1.zip(rdd20_2)
    rdd20_3.foreach(println)
    // 输出结果:
    // (1,a)
    // (2,b)
    // (3,c)
    // (4,d)

    println("*****************RDD Action操作***************")
    val rdd21_1 =sc.parallelize(1 to 10,3)
    // reduce(func)是对数据集的所有元素执行聚集(func)函数,该函数必须是可交换的。
    val rdd21_2 =rdd21_1.reduce(_+_)
    println("reduce")
    println(rdd21_2)
    println("first")
    println(rdd21_1.first())
    println("take")
    rdd21_1.take(3).foreach(println)
    println("**********************takeSample")
    // takeSample(withReplacement,num, [seed])返回包含随机的num个元素的数组,和Sample不同,takeSample 是行动操作,所以返回
    // 的是数组而不是RDD , 其中第一个参数withReplacement是抽样时是否放回,第二个参数num会精确指定抽样数,而不是比例。
    rdd21_1.takeSample(true,5).foreach(println)
    println("takeOrdered")
    // takeOrdered(n, [ordering])是返回包含随机的n个元素的数组,按照顺序输出。
    rdd21_1.takeOrdered(4).foreach(println)
    println("********saveAsTextFile")
    // 把数据集中的元素写到一个文本文件,Spark会对每个元素调用toString方法来把每个元素存成文本文件的一行。
    println("********countByKey")
    // 对于(K, V)类型的RDD. 返回一个(K, Int)的map, Int为K的个数。
    println("**********foreach")
    // foreach(func)是对数据集中的每个元素都执行func函数。


    }

    def myfunc[T](iter:Iterator[T]):Iterator[(T,T)]={
    var res=List[(T,T)]()
    var pre =iter.next
    while(iter.hasNext){
    val cur=iter.next
    res.::=(pre,cur)
    pre=cur
    }
    res.iterator
    }
    }
  • 相关阅读:
    Leetcode 16.25 LRU缓存 哈希表与双向链表的组合
    Leetcode437 路径总和 III 双递归与前缀和
    leetcode 0404 二叉树检查平衡性 DFS
    Leetcode 1219 黄金矿工 暴力回溯
    Leetcode1218 最长定差子序列 哈希表优化DP
    Leetcode 91 解码方法
    Leetcode 129 求根到叶子节点数字之和 DFS优化
    Leetcode 125 验证回文串 双指针
    Docker安装Mysql记录
    vmware虚拟机---Liunx配置静态IP
  • 原文地址:https://www.cnblogs.com/fishjar/p/9612666.html
Copyright © 2011-2022 走看看