zoukankan      html  css  js  c++  java
  • 初见spark-03(高级算子)

    最近心情不是很好,但是需要调节自己,真的需要调节自己,还是要努力,这个世界有我喜欢的人,有我追求的人,也许真的是守的住寂寞,耐得住繁华吧。

    不说别的了,今天我们来接受啊spark的高级算子的系列

      1.map是对每个元素操作, mapPartitions是对其中的每个partition操作

      2.mapPartitionsWithIndex:把每个partition中的分区号和对应的值拿出来, 看源码

        val func = (index: Int, iter: Iterator[(Int)]) => {

          iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator  
        }
        val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
        rdd1.mapPartitionsWithIndex(func).collect

      3.aggregate(聚合)

        def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

          iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
        }
        val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
        rdd1.mapPartitionsWithIndex(func1).collect
        ###是action操作,

        第一个参数是初始值,

        二:是2个函数[每个函数都是2个参数(第一个参数:先对个个分区进行合并, 第二个:对个个分区合并后的结果再进行合并), 输出一个参数]
        ###0 + (0+1+2+3+4 + 0+5+6+7+8+9)
        rdd1.aggregate(0)(_+_, _+_)
        rdd1.aggregate(0)(math.max(_, _), _ + _)
        ###5和1比, 得5再和234比得5 --> 5和6789比,得9 --> 5 + (5+9)
        rdd1.aggregate(5)(math.max(_, _), _ + _)
        这个是5是初始值,每一个分区和5进行比较,选最大的,最后每一个分区相加的时候,在把5加上即可

        或者我们可以这样操作

        val arr=Array(1,2,3)
        arr.reduce(math.max(_,_))其中这个样子也是可以比大小的
        arr.reduce:这个的方法就是取出两个数据

        

        scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
        rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:27

        scala> rdd2.aggregate("")(_+_,_+_)
        res23: String = abcdef

        scala> rdd2.aggregate("|")(_+_,_+_)
        res24: String = ||abc|def

        

        val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)  
        def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
          iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
        }
        rdd2.aggregate("")(_ + _, _ + _)
        rdd2.aggregate("=")(_ + _, _ + _)

        

        val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
        rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

        val rdd4 = sc.parallelize(List("12","23","345",""),2)
        rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
        结果为:01或者是10(比完之后,可以跟后一项的进行比较),所以才会出现1这个值

        val arr = Array("","12","23")
        arr.reduce((x:String,y:String) => math.main(x.length,y.length).toString)
        结果为:1

        val rdd5 = sc.parallelize(List("12","23","","345"),2)
        rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
        结果是:11,造成这个结果的是初始化的“”以及列表里面的“”的这两个一,构成的11

      4.aggregateByKey(这个现在局部的进行操作,然后可以全局的进行操作)

        val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

        def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
          iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
        }
        pairRDD.mapPartitionsWithIndex(func2).collect //可以查看分区的情况

        //可以把每一项数据头添加起来
        pairRDD.aggregateByKey(0)(_+_,_+_).collect
        pariRDD.reduceByKey(_+_).collect
        上述的两个方法的实现都是一样的,底层调用相同的函数

        pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
        pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

       

  • 相关阅读:
    Javascript(js)分页程序
    Erlang gb_trees
    ASP.NET MVC 扩展之 JsonpResult ,用来提供轻量级跨域调用服务
    facebook folly编译脚本
    开源项目 XDD
    ObjectiveC——消息、Category和Protocol
    Lua编译执行与错误
    ASP.NET MVC 4 RC的JS/CSS打包压缩功能
    web插件化
    精益企业应用平台
  • 原文地址:https://www.cnblogs.com/wnbahmbb/p/6231491.html
Copyright © 2011-2022 走看看