  • Custom Accumulators in Spark 2.0

    In Spark 2.0, a custom accumulator is written by extending the abstract class AccumulatorV2, and all six of the following methods must be implemented:

    1. reset: resets the accumulator to its zero value; after the call, isZero must return true.

    abstract def reset(): Unit

    2. add: takes an input value and accumulates it.

    abstract def add(v: IN): Unit

    3. merge: merges another accumulator of the same type into this one and updates its state, i.e. the merge is done in place.

    abstract def merge(other: AccumulatorV2[IN, OUT]): Unit

    4. value: returns the current value of the accumulator.

    abstract def value: OUT

    5. copy: creates a new copy of this accumulator.

    abstract def copy(): AccumulatorV2[IN, OUT]

    6. isZero: returns whether this accumulator is at its zero value. For a counter the zero value is 0; for a list accumulator it is Nil.

    abstract def isZero: Boolean
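    As a minimal sketch of how the six methods fit together, here is a hypothetical accumulator that collects distinct strings into a set. The class name DistinctStringAccumulator is made up for illustration and is not part of Spark; only AccumulatorV2 and its six abstract methods come from the library. It assumes spark-core 2.x is on the classpath.

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical example class: collects the distinct strings it is given.
class DistinctStringAccumulator extends AccumulatorV2[String, Set[String]] {
  private val seen = mutable.Set[String]()

  // 1. reset: back to the zero value, so isZero is true afterwards
  override def reset(): Unit = seen.clear()

  // 2. add: fold one input value into the local state
  override def add(v: String): Unit = seen += v

  // 3. merge: combine another accumulator of the same type, in place
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit =
    seen ++= other.value

  // 4. value: expose the current result (an immutable snapshot here)
  override def value: Set[String] = seen.toSet

  // 5. copy: an independent accumulator holding the same state
  override def copy(): AccumulatorV2[String, Set[String]] = {
    val c = new DistinctStringAccumulator
    c.seen ++= seen
    c
  }

  // 6. isZero: true when nothing has been accumulated
  override def isZero: Boolean = seen.isEmpty
}
```

    Returning an immutable snapshot from value is a design choice of this sketch; it keeps callers from mutating the accumulator's internal state by accident.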

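    Note that Spark guarantees that each task's update is applied exactly once only for accumulator updates performed inside actions, i.e. restarted tasks will not update the value again. Inside transformations, a task's update may be applied more than once if the task or job stage is re-executed. Accumulators also do not change Spark's lazy evaluation model: an update made inside a transformation is not executed until an action triggers the computation.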
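    The lazy-evaluation and re-execution behaviour described above can be observed with Spark's built-in longAccumulator. This is a sketch only: the object name, the observe helper, the local[2] master and the app name are arbitrary choices for the demo, and it assumes spark-sql 2.x is on the classpath.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object AccumulatorLazinessDemo {
  // Returns the accumulator value seen before any action,
  // after one action, and after a second action on the same RDD.
  def observe(sc: SparkContext): (Long, Long, Long) = {
    val counter = sc.longAccumulator("counter")
    val data = sc.parallelize(1 to 10)

    // Update inside a transformation: nothing has been computed yet.
    val mapped = data.map { x => counter.add(1); x }
    val beforeAction = counter.value.longValue()

    // The first action runs the map once per element.
    mapped.count()
    val afterFirst = counter.value.longValue()

    // mapped is not cached, so a second action recomputes the map
    // and the same updates are applied again.
    mapped.count()
    val afterSecond = counter.value.longValue()

    (beforeAction, afterFirst, afterSecond)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("acc-laziness").getOrCreate()
    println(observe(spark.sparkContext))
    spark.stop()
  }
}
```

    With no task retries, the three values should come out as 0, 10 and 20: the update in map is deferred until the first count, and repeated by the second.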

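    Our business logic often needs mappings between the data of several DataFrames, and doing this with collectionAccumulator requires a fair amount of repetitive Map operations, so I wrote this custom accumulator that builds a Map directly. IN is a tuple of two Strings representing the key and the value, and the output is a Map. If the accumulator already contains the key being added and the value is not a duplicate for that key, the new value is appended to the existing one with the string || as the separator, and the accumulator is updated.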
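    To make the combining rule concrete, here is a plain-Scala sketch with no Spark dependency. JoinRule and combine are hypothetical names used only for illustration, and the sketch follows one reading of the rule: a value counts as a duplicate if it already appears among the ||-separated pieces stored for that key.

```scala
// Hypothetical helper that applies the "join distinct values with ||" rule
// to a local sequence of (key, value) pairs.
object JoinRule {
  def combine(pairs: Seq[(String, String)]): Map[String, String] =
    pairs.foldLeft(Map.empty[String, String]) { case (acc, (k, v)) =>
      acc.get(k) match {
        // New key: store the value as is
        case None => acc + (k -> v)
        // Value already recorded for this key: leave the map unchanged
        case Some(old) if old.split("\\|\\|").contains(v) => acc
        // New value for an existing key: append it after "||"
        case Some(old) => acc + (k -> (old + "||" + v))
      }
    }
}
```

    For example, JoinRule.combine(Seq("id" -> "1", "id" -> "2", "id" -> "1", "name" -> "a")) yields Map("id" -> "1||2", "name" -> "a").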

    The code is as follows:

    
    
    import org.apache.spark.util.AccumulatorV2
    import scala.collection.mutable

    /**
     * Created by Namhwik on 2016/12/27.
     *
     * Accumulates (key, value) pairs into a Map; distinct values for the
     * same key are joined with "||".
     */
    class MapAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, String]] {
      private val mapAccumulator = mutable.Map[String, String]()

      // Insert a new key, or append the value after "||" if the key does not hold it yet.
      def add(keyAndValue: (String, String)): Unit = {
        val (key, value) = keyAndValue
        mapAccumulator.get(key) match {
          case None =>
            mapAccumulator += key -> value
          // Compare against every stored piece so repeated adds cannot create duplicates
          case Some(existing) if !existing.split("\\|\\|").contains(value) =>
            mapAccumulator += key -> (existing + "||" + value)
          case _ => // value already recorded for this key
        }
      }

      def isZero: Boolean = mapAccumulator.isEmpty

      def copy(): AccumulatorV2[(String, String), mutable.Map[String, String]] = {
        val newMapAccumulator = new MapAccumulator()
        newMapAccumulator.mapAccumulator ++= mapAccumulator
        newMapAccumulator
      }

      def value: mutable.Map[String, String] = mapAccumulator

      def merge(other: AccumulatorV2[(String, String), mutable.Map[String, String]]): Unit = other match {
        case map: MapAccumulator =>
          // "|" is a regex metacharacter, so the separator is escaped as "\\|\\|".
          // Each piece goes through add(), which skips pieces already present.
          map.value.foreach { case (key, joined) =>
            joined.split("\\|\\|").foreach(piece => add((key, piece)))
          }
        case _ =>
          throw new UnsupportedOperationException(
            s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
      }

      def reset(): Unit = mapAccumulator.clear()
    }
     

    Reference: <http://spark.apache.org/docs/latest/programming-guide.html>

    P.S. The accumulator must be registered with the SparkContext (sc.register) before it is used.
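    A possible way to register and use the accumulator is sketched below. It assumes the MapAccumulator class from the listing above is compiled in the same project; the object name, the local[2] master, the app name and the sample pairs are all made up for the demo.

```scala
import org.apache.spark.sql.SparkSession

object MapAccumulatorUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("map-acc-usage").getOrCreate()
    val sc = spark.sparkContext

    // MapAccumulator is the class from the listing above (assumed to be on the classpath).
    val mapAcc = new MapAccumulator()
    // Registration is required before the accumulator is used in a job; the name is optional.
    sc.register(mapAcc, "mapAccumulator")

    // Illustrative data only.
    val pairs = sc.parallelize(Seq("id" -> "1", "id" -> "2", "name" -> "a"))

    // Update inside an action (foreach), then read the merged result on the driver.
    pairs.foreach(p => mapAcc.add(p))
    mapAcc.value.foreach { case (k, v) => println(s"$k -> $v") }

    spark.stop()
  }
}
```

    The order in which values for the same key are joined depends on how the data is partitioned and on the order in which task results are merged, so code that reads the Map should not rely on it.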

  • Original post: https://www.cnblogs.com/namhwik/p/6225153.html