zoukankan      html  css  js  c++  java
  • 031 广播变量与累加器

    1.广播变量机制

      将传递给task的值,变成传递给executor。

      为什么可以共用,因为task是executor下的线程。

      只读的变量,在task中不允许修改

      

    2.累加器介绍

      在只写的变量,在task中只允许被修改,不允许读的操作。

      但是在driver中就只能读操作。

      

     3.程序

      需求一:对应于MR中的累加器,累积计算次数

      需求二:将累加器做成共享变量来使用。避免了shuffle过程,提高了效率。

      1 package com.ibeifeng.senior.accumulator
      2 
      3 import org.apache.spark.{AccumulableParam, SparkConf, SparkContext}
      4 
      5 import scala.collection.mutable
      6 import scala.util.Random
      7 
      8 /**
      9   * Spark累加器
     10   * Created by ibf on 02/15.
     11   */
     12 object AccumulatorDemo {
     13   def main(args: Array[String]): Unit = {
     14     val conf = new SparkConf()
     15       //.setMaster("local[*]") // local模式下默认不进行失败重启机制
     16       .setMaster("local[*,4]") // 开启local模式的失败重启机制,重启次数4-1=3次
     17       .setAppName("accumulator")
     18     val sc = SparkContext.getOrCreate(conf)
     19 
     20     // ===============================
     21     val rdd = sc.parallelize(Array(
     22       "hadoop,spark,hbase",
     23       "spark,hbase,hadoop",
     24       "",
     25       "spark,hive,hue",
     26       "spark,hadoop",
     27       "spark,,hadoop,hive",
     28       "spark,hbase,hive",
     29       "hadoop,hbase,hive",
     30       "hive,hbase,spark,hadoop",
     31       "hive,hbase,hadoop,hue"
     32     ), 5)
     33 
     34     // 需求一:实现WordCount程序,同时统计输入的记录数量以及最终输出结果的数量
     35     val inputRecords = sc.accumulator(0, "Input Record Size")
     36     val outputRecords = sc.accumulator(0, "Output Record Size")
     37     rdd.flatMap(line => {
     38       // 累计数量
     39       inputRecords += 1
     40       val nline = if (line == null) "" else line
     41       // 进行数据分割、过滤、数据转换
     42       nline.split(",")
     43         .map(word => (word.trim, 1)) // 数据转换
     44         .filter(_._1.nonEmpty) // word非空,进行数据过滤
     45     })
     46       .reduceByKey(_ + _)
     47       .foreachPartition(iter => {
     48         iter.foreach(record => {
     49           // 累计数据
     50           outputRecords += 1
     51           println(record)
     52         })
     53       })
     54 
     55     println(s"Input Size:${inputRecords.value}")
     56     println(s"Ouput Size:${outputRecords.value}")
     57 



    58 // 需求二:假设wordcount的最终结果可以在driver/executor节点的内存中保存下,要求不通过reduceByKey相关API实现wordcount程序 59 /** 60 * 1. 每个分区进行wordcount的统计,将结果保存到累加器中 61 * 2. 当分区全部执行完后,各个分区的累加器数据进行聚合操作 62 */ 63 val mapAccumulable = sc.accumulable(mutable.Map[String, Int]())(MapAccumulableParam)//MapAccumulableParam是强制转换 64 try 65 rdd.foreachPartition(iter => { 66 val index = Random.nextInt(2) // index的取值范围[0,1] 67 iter.foreach(line => { 68 val r = 1 / index 69 print(r) 70 val nline = if (line == null) "" else line 71 // 进行数据分割、过滤、数据转换 72 nline.split(",") 73 .filter(_.trim.nonEmpty) // 过滤空单词 74 .map(word => { 75 mapAccumulable += word // 统计word出现的次数 76 }) 77 }) 78 }) 79 catch { 80 case e: Exception => println(s"异常:${e.getMessage}") 81 } 82 println("result================") 83 mapAccumulable.value.foreach(println) 84 85 Thread.sleep(100000) 86 } 87 } 88 89 90 object MapAccumulableParam extends AccumulableParam[mutable.Map[String, Int], String] { 91 /** 92 * 添加一个string的元素到累加器中 93 * 94 * @param r 95 * @param t 96 * @return 97 */ 98 override def addAccumulator(r: mutable.Map[String, Int], t: String): mutable.Map[String, Int] = { 99 r += t -> (1 + r.getOrElse(t, 0)) 100 } 101 102 /** 103 * 合并两个数据 104 * 105 * @param r1 106 * @param r2 107 * @return 108 */ 109 override def addInPlace(r1: mutable.Map[String, Int], r2: mutable.Map[String, Int]): mutable.Map[String, Int] = { 110 r2.foldLeft(r1)((a, b) => { 111 a += b._1 -> (a.getOrElse(b._1, 0) + b._2) 112 }) 113 } 114 115 /** 116 * 返回初始值 117 * 118 * @param initialValue 119 * @return 120 */ 121 override def zero(initialValue: mutable.Map[String, Int]): mutable.Map[String, Int] = initialValue 122 }
  • 相关阅读:
    03人脉搜索:学会这一招,就能轻松找到90%的人的联系方式
    02 资源搜索-全面、快速查找全网你想要的任何信息、情报
    01信息搜索:全面、快速查找全网你想要的任何信息、情报.
    ansible笔记(12):handlers的用法
    ansible笔记(11):初识ansible playbook(二)
    ansible笔记(10):初识ansible playbook
    ansible笔记(9):常用模块之包管理模块
    ansible笔记(8):常用模块之系统类模块(二)
    ansible笔记(7):常用模块之系统类模块
    ansible笔记(6):常用模块之命令类模块
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6542166.html
Copyright © 2011-2022 走看看