zoukankan      html  css  js  c++  java
  • Spark 共享变量之——Accumulator(累加器)

    一、简介

    为了方便的统计和管理一些共同信息,Spark中定义了两种共享变量——Broadcast(广播变量)和 Accumulator(累加器),可以方便的把一些变量或数据共享给集群各个节点,今天来看看Accumulator。

    Accumulator是由Driver端总体维护的,读取当前值也是在Driver端,各个Task在其所在的Executor上也维护了Accumulator变量,但只是局部性累加操作,运行完会到Driver端去合并累加结果。Accumulator有两个性质:

    1、只可累加,合并即累加;

    2、不改变Spark作业Lazy执行的特点,也即没有action操作触发job的情况下累加器的值有可能是初始值。

    二、Accumulator的分类(Spark2.x):

    1、Spark自带类型的累加器

         (1)LongAccumulator(long类型的累加器累加整型)

         (2)DoubleAccumulator(double类型的累加器累加浮点型)

        (3)CollectionAccumulator(集合类型累加器累加集合元素)

    创建方式如下:

    LongAccumulator longAccumulator = sc.sc().longAccumulator("longAccumulator");//其中longAccumulator为该累加器在web UI上的名称

    2、自定义累加器 ——累加器类需继承AccumulatorV2抽象类

    需要实现其中add()方法、Merge()方法、value()方法等必要和非必要方法;

    以下我实现了一个字符串拼接的自定义累加器:

    package com.renyang.sparkproject.spark.session;
    
    import com.renyang.sparkproject.constant.Constants;
    import com.renyang.sparkproject.util.StringUtils;
    import org.apache.spark.util.AccumulatorV2;
    
    public class SessionAggrStatAccumulatorV2 extends AccumulatorV2<String, String> {
        private static final long serialVersionUID = 6311074555136039130L;
    
        private String data = "session_count=0|1s_3s=0|4s_6s=3|7s_9s=0|10s_30s=0|30s_60s=0|1m_3m=0|3m_10m=0|10m_30m=0|30m=0|1_3=0|4_6=1|7_9=0|10_30=0|30_60=0|60=0";
    
        private String zero = data;
    
        @Override
        public boolean isZero() {
            return data.equals(zero);
        }
    
        @Override
        public AccumulatorV2<String, String> copy() {
            return new SessionAggrStatAccumulatorV2();
        }
    
        @Override
        public void reset() {
            data = zero;
        }
    
        public void add(String v) {
            data = add(data, v);
        }
    
        @Override
        public void merge(AccumulatorV2<String, String> other) {
            SessionAggrStatAccumulatorV2 o =(SessionAggrStatAccumulatorV2)other;
            String[] words = data.split("\|");
            String[] owords = o.data.split("\|");
            for (int i = 0; i < words.length; i++) {
                for (int j = 0; j < owords.length; j++) {
                    if (words[i].split("=")[0].equals(owords[j].split("=")[0])){
                        int value = Integer.valueOf(words[i].split("=")[1]) +Integer.valueOf(owords[j].split("=")[1]);
                        String ns = StringUtils.setFieldInConcatString(data, "\|", owords[j].split("=")[0], String.valueOf(value));
                        //每次合并完,更新str
                        data = ns;
                    }
                }
            }
        }
    
        @Override
        public String value() {
            return data;
        }
    
        /**
         * session统计计算逻辑
         * @param v1 连接串
         * @param v2 范围区间
         * @return 更新以后的连接串
         */
        private String add(String v1, String v2) {
            // 校验:v1为空的话,直接返回v2
            if(StringUtils.isEmpty(v1)) {
                return v2;
            }
    
            // 使用StringUtils工具类,从v1中,提取v2对应的值,并累加1
            String oldValue = StringUtils.getFieldFromConcatString(v1, "\|", v2);
            if(oldValue != null) {
                // 将范围区间原有的值,累加1
                int newValue = Integer.valueOf(oldValue) + 1;
                // 使用StringUtils工具类,将v1中,v2对应的值,设置成新的累加后的值
                return StringUtils.setFieldInConcatString(v1, "\|", v2, String.valueOf(newValue));
            }
    
            return v1;
        }
    }

     三、Accumulator的运行逻辑

    1、Driver端负责定义和注册累加器

    累加器在Driver端被定义并初始化,同时需要注册入SparkContext,这样才能将累加器变量分发到集群各个节点,等到各个Task运行完之后会回收累加器结果进行Driver端合并,这个合并的过程是根据Task执行情况而定,只要有完成的Task就会更新累加器变量。

    2、Executor端

    Executor接收到Task之后,不但会反序列化RDD和Function,还会反序列化Accumulator,当Executor执行完Task之后,会将结果随同Accumulator一起返回给Driver端。

  • 相关阅读:
    蚂蚁问题
    LinuxC安装gcc
    怎样在VC里面使用graphics.h绘图
    C语言之固定格式输出当前时间
    C语言之猜数字游戏
    C语言之新同学年龄
    C语言之ASCII码
    C语言之辗转相除法
    C语言之自守数
    C语言之一数三平方
  • 原文地址:https://www.cnblogs.com/renyang/p/12606725.html
Copyright © 2011-2022 走看看