zoukankan      html  css  js  c++  java
  • Spark编程进阶篇

                    Spark编程进阶篇

                                         作者:尹正杰

    版权声明:原创作品,谢绝转载!否则将追究法律责任。

     

    一.spark三大数据结构

      Spark有三大数据结构,分别为RDD,广播变量和累加器。
        RDD:
          RDD全称为"Resilient Distributed Dataset",叫做弹性分布式数据集,是Spark中最基本的数据抽象。
          简单的理解RDD就是包含数据的计算。很多时候数据需要一定的访问规则,这个时候使用RDD来做就不太合适啦。
    
        广播变量:
          此处可以简单理解为分布式只读共享变量。
    
        累加器:
          此处可以简单理解为分布式只写共享变量。
    
      博主推荐阅读:
        https://www.cnblogs.com/yinzhengjie2020/p/13155362.html

    二.广播变量

    1>.广播变量概述

      广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。

      比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。 在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。   使用广播变量的过程如下:     (
    1)通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。 任何可序列化的类型都可以这么实现。     (2)通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。     (3)变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

    2>.广播变量应用案例

    package com.yinzhengjie.bigdata.spark.rdd
    
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object BroadcastVariable {
      def main(args: Array[String]): Unit = {
        //创建spark配置信息
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
    
        //创建SparkContext
        val sc = new SparkContext(sparkConf)
    
        val listRDD:RDD[(Int,String)] = sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
    
        /**
          *     如果想要将2个RDD基于Key进行关联,使用join算子(设计到笛卡尔积,即会增加数据)是可以实现的;
          *
          *     但效率较低,因为使用join算子有一个弊端就是涉及到shuffle过程
          */
    //    val listRDD2:RDD[(Int,Int)] = sc.makeRDD(List((1,1),(2,2),(3,3)))
    //    val joinRDD:RDD[(Int,(String,Int))] = listRDD.join(listRDD2)
    //    joinRDD.foreach(println)
    
        val list = List((1,1),(2,2),(3,3))
    
        //将list变量构建为广播变量(使用广播变量减少数据的传输,即无需每个Executor都传输一个list变量,而是spark集群共享同一个只读变量)
        val boradcast:Broadcast[List[(Int,Int)]] = sc.broadcast(list)
    
        /**
          *   使用map算子来替代join算子的好处就是map不涉及到shuffle过程,因此我们说广播变量是一种调优策略。
          */
        val mapRDD:RDD[(Int,(String,Any))] = listRDD.map{
          case (key,value) => {
            var v2:Any = null
            //使用广播变量
            for (t <- boradcast.value){
              if (key == t._1){
                v2 = t._2
              }
            }
            (key,(value,v2))
          }
        }
        mapRDD.foreach(println)
    
        //释放资源
        sc.stop()
      }
    }

    三.累加器

    1>.累加器概述

      累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用map()函数或者用filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。
    
      如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
    
      累加器的用法如下所示。
        通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器;
        返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型;
        Spark闭包里的执行器代码可以使用累加器的"+=”方法(在Java中是add)增加累加器的值;
        驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。
    
      自定义累加器
        自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。
        实现自定义类型累加器需要继承AccumulatorV2并至少覆写相应的方法,累加器可以用于在程序运行过程中收集一些文本类信息,最终以Set[String]的形式返回。
    
      温馨提示:
        工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。
        对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动操作中。转化操作中累加器可能会发生不止一次更新。

    2>.系统累加器 

    package com.yinzhengjie.bigdata.spark.rdd
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.LongAccumulator
    import org.apache.spark.{SparkConf, SparkContext}
    
    object ShareData {
      def main(args: Array[String]): Unit = {
        //创建spark配置信息
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
    
        //创建SparkContext
        val sc = new SparkContext(sparkConf)
    
        val dataRDD:RDD[Int] = sc.parallelize(List(10,20,30,40,60,70,80,90),3)
    
        /**
          *   使用reduce算子计算dataRDD各个元素之和,其工作原理如下:
          *     1>.将Driver中的dataRDD中3个不同分区数据发送给不同的Executor;
          *     2>.各个Executor计算相应分区的数据之和;
          *     3>.最后,每个Executor的数据再相加得到最终的数据。
          */
    //    val sum1 = dataRDD.reduce(_+_)
    //    println("sum1 = " + sum1)
    
        /**
          *   我们使用foreach算子直接遍历dataRDD中的每个元素使他们相加,但打印sum2的结果为"0",其原因如下:
          *     1>.将Driver中的dataRDD中3个不同分区数据发送给不同的Executor;
          *     2>.sum2属于Driver中的变量,因此他要序列化sum2变量并其传递给各个Executor;
          *     3>.各个Executor使用Driver传递过来的变量进行计算,最终各个Exector算出sum2的结果,但此时各个Executor并没有将计算结果发送给Driver;
          *
          *    综上所述,各个Executor均有对应的sum2数据,但并没有将计算结果发送给Driver进行累加,因此我们看到的sum2始终是"0".
          *
          */
    //    var sum2:Int = 0
    //    dataRDD.foreach(data => sum2 += data)
    //    println("sum2 = " + sum2)
    
        /**
          *   使用累加器来共享变量,用来累加数据,其工作原理如下:
          *     1>.将Driver中的dataRDD中3个不同分区数据发送给不同的Executor;
          *     2>.sum3属于Driver中的变量,因此他要序列化sum3变量并其传递给各个Executor;
          *     3>.各个Executor使用Driver传递过来的变量进行计算,最终各个Executor算出sum3的结果;
          *     4>.最后spark发现sum3是一个累加器(Accumulator)变量,因此会将各个Executor的sum3的结果返回给Driver,由Driver将最终结果进行累加。
          */
        val sum3:LongAccumulator = sc.longAccumulator  //创建累加器对象sum3
    
        dataRDD.foreach{
          case data => {
            sum3.add(data)   //将每一个元素和累加器相加
          }
        }
        println("sum3 = " + sum3.value)
    
        //释放资源
        sc.stop()
      }
    }

    3>.自定义累加器

    package com.yinzhengjie.bigdata.spark.rdd
    
    import java.util
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.AccumulatorV2
    
    /**
      *   自定义累加器步骤如下:
      *     1>.继承AccumulatorV2;
      *     2>.实现抽象方法;
      *     3>.创建累加器
      */
    class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {
    
      //定义一个集合,用于返回结果
      val list = new util.ArrayList[String]()
    
      //当前的累加器是否为初始化状态
      override def isZero: Boolean = {
        list.isEmpty
      }
    
      //复制累加器对象,根据你得需求自行实现
      override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
        new WordAccumulator()
      }
    
      //重置累加器对象
      override def reset(): Unit = {
        list.clear()
      }
    
      //往累加器中增加数据
      override def add(v: String): Unit = {
        if (v.contains("o")){
          list.add(v)   //判断字符串是否包含字母"o",如果为真则写入累加器
        }
      }
    
      //合并累加器,即Driverd端将各个Executor返回的list集合的数据进行累加操作
      override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
        list.addAll(other.value)
      }
    
      //获取累加器的结果,我们直接将定义的list集合返回
      override def value: util.ArrayList[String] = {
        list
      }
    }
    
    
    object CustomAccumulator {
      def main(args: Array[String]): Unit = {
        //创建spark配置信息
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
    
        //创建SparkContext
        val sc = new SparkContext(sparkConf)
    
        val dataRDD:RDD[String] = sc.makeRDD(Array("Hadoop","Spark","Flume","HBase","Sqoop","Flink","Storm","Hive"),3)
    
        //创建咱们自定义的累加器
        val wordAccumulator = new WordAccumulator
        //注册自定义累加器,目的是让Spark知晓
        sc.register(wordAccumulator)
        //使用咱们的累加器变量
        dataRDD.foreach{
          case data => {
            wordAccumulator.add(data)   //将每一个元素和累加器相加
          }
        }
    
        //获取累加器的值
        println("words = " + wordAccumulator.value)
    
        //释放资源
        sc.stop()
      }
    }
  • 相关阅读:
    oracle数据库导出与导入
    Mysql导入表信息[Err] 1067
    Golang--不定参数类型
    (转)Docker容器的重启策略及docker run的--restart选项详解
    (转)Golang--使用iota(常量计数器)
    Golang--匿名变量
    Golang--Hello World
    Ubuntu Server16.04 配置网卡
    U盘安装ubuntu 16.04 遇到 gfxboot.c32:not a COM32R image boot 的解决方法
    ipfs私链服务
  • 原文地址:https://www.cnblogs.com/yinzhengjie2020/p/13174139.html
Copyright © 2011-2022 走看看