zoukankan      html  css  js  c++  java
  • SparkCore系列(三)广播变量和累加器

    一:广播变量

    //广播变量其实就是将一个变量传播到每个excetor,实现excetor共享同一个只读变量.

    //其中有一个难题就是,动态广播变量.我在实验过程中只是实现了不同job的广播变量更改,对于有N分钟修改一次还没有试验出更好的方法

                 import org.apache.spark.api.java.JavaSparkContext
                 import org.apache.spark.SparkContext
                 import org.apache.spark.broadcast.Broadcast
                 import org.apache.spark.sql.SparkSession
                 import org.apache.spark.util.CollectionAccumulator

                 def main(args: Array[String]): Unit = {
                              val sparkSession = SparkSession.builder()//.master("local")
                                           .appName("WordCount").getOrCreate()
                              val sc = new JavaSparkContext(sparkSession.sparkContext).sc

                              var runStatus = true;
                              var broadcast:Broadcast[Long] = sc.broadcast(System.currentTimeMillis())
                              class ThreadExample(sc : SparkContext) extends Thread{
                                           override def run(): Unit ={
                                                        while(runStatus){
                                                                     val time = System.currentTimeMillis()
                                                                     broadcast.unpersist()
                                                                     broadcast = sc.broadcast(time)
                                                                     Thread.sleep(1000)
                                                        }
                                           }             
                              }
                              new ThreadExample(sc).start()
                              var firstrdd = sc.textFile("").map(x=>broadcast)
                              firstrdd.saveAsTextFile("")
                              var secondrdd = sc.textFile("").map(x=>broadcast)
                              firstrdd.saveAsTextFile("")

                              runStatus = false
                 }

    二:累加器

                //累加器实际上就是共享变量,实现多个excetor对同一份变量的多次操作

                val sparkSession = SparkSession.builder()//.master("local")
                            .appName("WordCount").getOrCreate()
                val sc = new JavaSparkContext(sparkSession.sparkContext).sc

                // 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator// 内置的累加器有三种,LongAccumulator、DoubleAccumulator、                                    CollectionAccumulator

                // LongAccumulator: 数值型累加
                val longAccumulator = sc.longAccumulator("long-account")
                // DoubleAccumulator: 小数型累加
                val doubleAccumulator = sc.doubleAccumulator("double-account")
                // CollectionAccumulator:集合累加
                val collectionAccumulator:CollectionAccumulator[Int] = sc.collectionAccumulator("collection-account")

                var firstrdd = sc.textFile("s3://transsion-bigdata-warehouse/ods/athena_10410001/dt=20191210/hour=00/*")
                            .map(x=>{
                                        longAccumulator.value;
                                        doubleAccumulator.add(1)
                                        collectionAccumulator.add(1)
                                        x
                            })
                firstrdd.saveAsTextFile("s3://transsion-athena/test/push_msg/first")

                print("longAccumulator:"+longAccumulator.count)
                print("doubleAccumulator:"+doubleAccumulator.count)
                print("collectionAccumulator:"+collectionAccumulator.value.size())
                //longAccumulator:0 doubleAccumulator:2817336 collectionAccumulator:2817336

  • 相关阅读:
    区别@ControllerAdvice 和@RestControllerAdvice
    Cannot determine embedded database driver class for database type NONE
    使用HttpClient 发送 GET、POST、PUT、Delete请求及文件上传
    Markdown语法笔记
    Property 'sqlSessionFactory' or 'sqlSessionTemplate' are required
    Mysql 查看连接数,状态 最大并发数(赞)
    OncePerRequestFilter的作用
    java连接MySql数据库 zeroDateTimeBehavior
    Intellij IDEA 安装lombok及使用详解
    ps -ef |grep xxx 输出的具体含义
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/12046722.html
Copyright © 2011-2022 走看看