zoukankan      html  css  js  c++  java
  • Spark 累加器

    由于spark是分布式的计算,所以使得每个task间不存在共享的变量,而为了实现共享变量spark实现了两种类型 - 累加器与广播变量,

    对于其概念与理解可以参考:共享变量(广播变量和累加器)  。可能需要注意:Spark累加器(Accumulator)陷阱及解决办法

    因此,我们便可以利用累加器与广播变量来构造一些比较常用的关系,以Map的形式广播出去,提高效率。

    如下通过累加器构造了一个DF数据间
    的映射关系,

    defgetMap(spark:SparkSession,data:DataFrame){
    //通过collectionAccumulator构造Map关系
    valmyAccumulator=spark.sparkContext.collectionAccumulator[(String,Long)]
    data.foreach(
    row=>{
    valname=row.getAs[String]("name")
    valage=row.getAs[Long]("age")
    myAccumulator.add(name,age)
    }
    )
    valaiterator:util.Iterator[(String,Long)]=myAccumulator.value.iterator()
    varnewMap:Map[String,Long]=Map()
    while(aiterator.hasNext){
    vala=aiterator.next()
    valkey=a._1
    valvalue=a._2
    if(!newMap.contains(key)){
    newMap+=(key->value)
    }
    else{
    valoldvalue=newMap(key)
    newMap+=(key->(oldvalue+value))
    }
    }
    }
  • 相关阅读:
    Linux搭建JAVAWEB环境
    java异常捕获
    java流2
    java流
    32
    java代码List
    java代码输入流篇2
    java流类练习前篇
    java流网址:
    java中i/o练习
  • 原文地址:https://www.cnblogs.com/namhwik/p/6060509.html
Copyright © 2011-2022 走看看