zoukankan      html  css  js  c++  java
  • spark2.1 自定义累加器的使用

    spark2.1 自定义累加器的使用

    • 继承AccumulatorV2类,并复写它的所有方法

      package spark
      
      import constant.Constant
      import org.apache.spark.util.AccumulatorV2
      import util.getFieldFromConcatString
      import util.setFieldFromConcatString
      
      
      /**
       * Spark [AccumulatorV2] that keeps session statistics as one `|`-separated string of
       * `FIELD=count` pairs, one pair per entry of [FIELDS] (initially every count is 0).
       *
       * - Input (`IN`): the name of a field; [add] increments that field's count by one.
       * - Output (`OUT`): the whole concatenated statistics string.
       *
       * Reading and writing a single field is delegated to the project helpers
       * `getFieldFromConcatString` / `setFieldFromConcatString` imported from `util`.
       */
      open class SessionAccmulator : AccumulatorV2<String, String>() {

          /** Current statistics string; starts as, and is reset to, [ZERO_VALUE]. */
          private var result = ZERO_VALUE

          /** Returns the current statistics string. */
          override fun value(): String = result

          /**
           * Merges [other] into this accumulator in place: for every field in [FIELDS], the
           * count held by [other] is ADDED to the count held here.
           *
           * Summing (rather than copying or incrementing [other]'s value) is required because
           * the accumulator being merged into may already contain earlier tasks' totals.
           * A field that is missing or empty on either side is left unchanged.
           * `null` or an accumulator of a different class is ignored, as before.
           */
          override fun merge(other: AccumulatorV2<String, String>?) {
              if (other !is SessionAccmulator) return

              var merged = result
              for (field in FIELDS) {
                  val mine = merged.getFieldFromConcatString(DELIMITER, field)
                  val theirs = other.result.getFieldFromConcatString(DELIMITER, field)
                  if (mine.isNotEmpty() && theirs.isNotEmpty()) {
                      val sum = mine.toInt() + theirs.toInt()
                      merged = merged.setFieldFromConcatString(DELIMITER, field, sum.toString())
                  }
              }
              result = merged
          }

          /** Returns a new accumulator holding the same statistics string as this one. */
          override fun copy(): AccumulatorV2<String, String> =
                  SessionAccmulator().also { it.result = result }

          /**
           * Increments the count of the field named [p0] by one.
           *
           * `null`, an empty name, or a name that is not present in the statistics string
           * is ignored; previously an unknown name cleared the entire statistics string.
           */
          override fun add(p0: String?) {
              val field = p0 ?: return
              if (field.isEmpty()) return

              val current = result.getFieldFromConcatString(DELIMITER, field)
              // Unknown field: keep the existing statistics instead of wiping them.
              if (current.isEmpty()) return

              result = result.setFieldFromConcatString(DELIMITER, field, (current.toInt() + 1).toString())
          }

          /** Restores the all-zero statistics string. */
          override fun reset() {
              result = ZERO_VALUE
          }

          /** True when the statistics string equals the all-zero initial value. */
          override fun isZero(): Boolean = result == ZERO_VALUE

          private companion object {
              /** Separator between `FIELD=count` pairs in the statistics string. */
              const val DELIMITER = "|"

              /** Every statistics field, in the order it appears in the statistics string. */
              val FIELDS = arrayOf(
                      Constant.SESSION_COUNT,
                      Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,
                      Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,
                      Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,
                      Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,
                      Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60
              )

              /** `FIELD=0` for every entry of [FIELDS], joined by [DELIMITER]. */
              val ZERO_VALUE: String = FIELDS.joinToString(DELIMITER) { "$it=0" }
          }
      }
      
      方法介绍

      value方法:获取累加器中的值

           merge方法:该方法特别重要,一定要写对。这个方法用于把各个task的累加器合并到当前累加器中(下面介绍执行流程时将要用到),应当把对方各字段的值与自身对应字段的值相加。

            isZero方法:判断累加器当前的值是否为初始值(零值)

            reset方法:重置累加器中的值

            copy方法:拷贝累加器(新实例带有当前累加器的值)
            add方法:在task中累加一个输入值(本例中把输入对应字段的计数加1)

    spark中累加器的执行流程:

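              首先有几个task,spark engine就拷贝几个累加器发送给各个task(不注册的;实际调用的是copyAndReset,即先copy再reset,并要求拷贝结果的isZero为true),然后在各个task中进行累加(注意在此过程中,被最初注册的累加器的值是不变的),执行最后将调用merge方法,把各个task的结果累加器依次合并到被注册的累加器中。合并第一个task时,被注册的累加器还是初始值;但之后每次合并时,它已经包含了前面task的结果,所以merge必须把双方对应字段的值相加,而不能只取other的值。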

  • 相关阅读:
    java 数据结构(八):Iterator接口与foreach循环
    java 数据结构(七):Collection接口
    java 数据结构(六):数组与集合
    java中equals(),hashcode()和==的区别
    Android中的IPC方式
    Android-如何防止apk被反编译
    Android-管理应用的内存(转)
    102. Binary Tree Level Order Traversal
    对于开发团队管理的理解
    TCP三次握手和四次挥手的全过程
  • 原文地址:https://www.cnblogs.com/zhangweilun/p/6684776.html
Copyright © 2011-2022 走看看