zoukankan      html  css  js  c++  java
  • SparkCore系列(三)广播变量和累加器

    一:广播变量

    //广播变量其实就是将一个变量传播到每个excetor,实现excetor共享同一个只读变量.

    //其中有一个难题就是,动态广播变量.我在实验过程中只是实现了不同job的广播变量更改,对于有N分钟修改一次还没有试验出更好的方法

                 import org.apache.spark.api.java.JavaSparkContext
                 import org.apache.spark.SparkContext
                 import org.apache.spark.broadcast.Broadcast
                 import org.apache.spark.sql.SparkSession
                 import org.apache.spark.util.CollectionAccumulator

                 def main(args: Array[String]): Unit = {
                              val sparkSession = SparkSession.builder()//.master("local")
                                           .appName("WordCount").getOrCreate()
                              val sc = new JavaSparkContext(sparkSession.sparkContext).sc

                              var runStatus = true;
                              var broadcast:Broadcast[Long] = sc.broadcast(System.currentTimeMillis())
                              class ThreadExample(sc : SparkContext) extends Thread{
                                           override def run(): Unit ={
                                                        while(runStatus){
                                                                     val time = System.currentTimeMillis()
                                                                     broadcast.unpersist()
                                                                     broadcast = sc.broadcast(time)
                                                                     Thread.sleep(1000)
                                                        }
                                           }             
                              }
                              new ThreadExample(sc).start()
                              var firstrdd = sc.textFile("").map(x=>broadcast)
                              firstrdd.saveAsTextFile("")
                              var secondrdd = sc.textFile("").map(x=>broadcast)
                              firstrdd.saveAsTextFile("")

                              runStatus = false
                 }

    二:累加器

                //累加器实际上就是共享变量,实现多个excetor对同一份变量的多次操作

                val sparkSession = SparkSession.builder()//.master("local")
                            .appName("WordCount").getOrCreate()
                val sc = new JavaSparkContext(sparkSession.sparkContext).sc

                // 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator// 内置的累加器有三种,LongAccumulator、DoubleAccumulator、                                    CollectionAccumulator

                // LongAccumulator: 数值型累加
                val longAccumulator = sc.longAccumulator("long-account")
                // DoubleAccumulator: 小数型累加
                val doubleAccumulator = sc.doubleAccumulator("double-account")
                // CollectionAccumulator:集合累加
                val collectionAccumulator:CollectionAccumulator[Int] = sc.collectionAccumulator("collection-account")

                var firstrdd = sc.textFile("s3://transsion-bigdata-warehouse/ods/athena_10410001/dt=20191210/hour=00/*")
                            .map(x=>{
                                        longAccumulator.value;
                                        doubleAccumulator.add(1)
                                        collectionAccumulator.add(1)
                                        x
                            })
                firstrdd.saveAsTextFile("s3://transsion-athena/test/push_msg/first")

                print("longAccumulator:"+longAccumulator.count)
                print("doubleAccumulator:"+doubleAccumulator.count)
                print("collectionAccumulator:"+collectionAccumulator.value.size())
                //longAccumulator:0 doubleAccumulator:2817336 collectionAccumulator:2817336

  • 相关阅读:
    Fortran编译器之一GUN Fortran安装(Windows XP)
    c++动态绑定的技术实现
    c++标准库比较
    java array
    java常用的基础容器
    mac sublime text 3 add ctags plugin
    git fetch
    查看远程分支的log
    git删除分支
    detached HEAD state
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/12046722.html
Copyright © 2011-2022 走看看