zoukankan      html  css  js  c++  java
  • Spark学习之编程进阶——累加器与广播(5)

    Spark学习之编程进阶——累加器与广播(5)

    1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。累加器对信息进行聚合,而广播变量用来高效分发较大的对象。

    2. 共享变量是一种可以在Spark任务中使用的特殊类型的变量。

    3. 累加器的用法:

    • 通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumlator[T]对象,其中T是初始值initialValue的类型。
    • Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是add)增加累加器的值。
    • 驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue()来访问累加器的值。

      Python中实现累加空行

        file = sc.textFile(inputFile)
        #创建Accumulator[Int]并初始化为0
        blankLines = sc.accumulator(0)
    
        def extractCallSigns(Line):
            globle blankLines #访问全局变量
            if (line == ""):
                blankLines += 1
                return line.split("")
    
        callSigns = file.flatMap(extractCallSigns)
        callSigns.saveAsTextFile(outputDir + "/callsigns")
        print "Blank lines:%d" % blankLines.value

    4. Spark的广播变量,它可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。

    Scala代码使用广播变量查询国家
    
        //查询RDD contactCounts中的呼号的对应位置。将呼号前缀
        //读取为国家代码进行查询
        val signPrefixes = sc.broadcast(loadCallSignTable())
        val countryContactCounts = contactCounts.map{case (sign,count) =>
            val country = lookupInArray(sign,signPrefixes.value)
            (country,count)
            }.reduceByKey((x,y) => x+y)
            countryContactCounts.saveAsTextFile(outputDir + "/countries.text")

    5. Spark在RDD上提供pipe()方法。Spark的pipe()方法可以让我们使用任意一种语言实现Spark作业中的部分逻辑,只要它的读写Unix标准流就行。

  • 相关阅读:
    HDU 2201 熊猫阿波问题==金刚坐飞机问题
    HDU 2100 (模拟进制加法)
    HDU 2151 Worm
    qsort快速排序
    HDU 1007 (最近点对+qsort对结构体的排序!!!)
    HDU 1348 wall (简单凸包)
    HDU 1392 Surround the Trees(凸包模板)
    HDU 1431素数回文
    HDU 2108 Shape of HDU(判断拐点)
    HDU 2857 Mirror and Light(镜面反射模板)
  • 原文地址:https://www.cnblogs.com/lanzhi/p/6467797.html
Copyright © 2011-2022 走看看