zoukankan      html  css  js  c++  java
  • Sparkcore学习(三)

    数据读取与保存

    Text文件

    • 基本语法
      1. 数据读取:textFile(String)
      2. 数据保存:saveAsTextFile(String)

    Sequence文件

    • SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass] (path)。

    • 语法

      1. 保存数据为SequenceFile

        dataRDD.saveAsSequenceFile("output")

      2. 读取SequenceFile文件

      ​ sc.sequenceFile[Int,Int] ("output").collect().foreach(println)

    Object对象文件

    • 对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v] (path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
    • 语法:
      1. 保存数据:dataRDD.saveAsObjectFile("output")
      2. 读取数据:sc.objectFile[(Int)] ("output").collect().foreach(println)

    文件系统类数据读取与保存

    • Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。

    累加器

    • 累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)
    • 累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
    • 原因: 累加器可以在一定程度上减少shuffle操作
    • 场景: 用于聚合结果不是太大的场景,不然可能导致Driver内存溢出
    • 原理: 首先在每个分区中聚合,然后将每个分区的聚合结果发给Driver汇总

    系统累加器

    累加器使用

    1. 累加器定义(SparkContext.accumulator(initialValue)方法)

      val sum: LongAccumulator = sc.longAccumulator("sum")

    2. 累加器添加数据(累加器.add方法)

      sum.add(count)

    3. 累加器获取数据(累加器.value)

      sum.value

    4. 注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。从这些任务的角度来看,累加器是一个只写变量。

    5. 累加器放在行动算子中:放在foreach()这样的行动操作中,这样无论计算失败还是计算重复累加器都相对可靠。转化操作中累加器可能会发生不止一次更新。

    自定义累加器

    广播变量

    • 原因: spark算子里面的代码是在executor的task执行的,spark算子外面的代码是在driver执行
    • 场景:
      1. spark算子中使用了driver的数据的时候,并且该数据的大小还有点大,此时可以将数据广播出去减少数据
      2. 大表 join 小表的时候,将小表广播出去减少shuffle操作
    • 使用:
      1. 广播数据: val bc = sc.broadcast(数据)【大表join小表的时候需要使用collect收集小表数据然后才能广播。即不能广播RDD,RDD中没有数据,只有计算逻辑】
      2. task使用广播数据: bc.value

    场景一代码演示

    //spark算子中使用了driver的数据的时候,并且该数据的大小还有点大,此时可以将数据广播出去减少数据
    def main1(args: Array[String]): Unit = {
    
      import org.apache.spark.{SparkConf, SparkContext}
      val sc = new SparkContext( new SparkConf().setMaster("local[4]").setAppName("test") )
    
      val rdd = sc.parallelize(List("jd","pdd","tm","atguigu"))
      //
      val map = Map[String,String]("jd"->"www.jd.com","pdd"->"www.pdd.com","tm"->"www.tm.com","atguigu"->"www.atguigu.com")
    
      //广播数据
      val bc = sc.broadcast(map)
    
      val rdd2 = rdd.map(x=> {
        val m = bc.value
        m.getOrElse(x,"")
        //map.getOrElse(x,"")
      })
    
      println(rdd2.collect().toList)
    
      Thread.sleep(1000000)
    }
    

    场景二代码演示

    //大表 join 小表的时候,将小表广播出去减少shuffle操作
    def main(args: Array[String]): Unit = {
    
       import org.apache.spark.{SparkConf, SparkContext}
      val sc = new SparkContext( new SparkConf().setMaster("local[4]").setAppName("test") )
    
      //获取没有农贸市场的省份
      //读取数据
      //全国省份信息
      val allprovincerdd = sc.textFile("datas/allprovince.txt")
      //全国部分省份菜市场信息
      val productRdd = sc.textFile("datas/product.txt")
        /*大概长这样,数据量挺大
        生菜	2.00	2018/1/1	山西汾阳市晋阳农副产品批发市场	山西	汾阳
        芹菜	2.40	2018/1/1	山西汾阳市晋阳农副产品批发市场	山西	汾阳
        菜花	3.80	2018/1/1	北京朝阳区大洋路综合市场	北京	朝阳
        生姜	10.00	2018/1/1	北京朝阳区大洋路综合市场	北京	朝阳
        */
      //过滤,因为有些数据不太干净,缺失了部分,在此不考虑这些缺失数据
      val productFilterRdd = productRdd.filter(line=> line.split("\t").length==6)
      //列裁剪,获取需要的省份数据
      val provinceRdd = productFilterRdd.map(line=>{
        val arr = line.split("\t")
        arr(4)
      })
      //对切分后的数据进行去重
      val disProvinceRdd = provinceRdd.distinct()
      //将处理后的数据转换成KV的形式
      val provinceKVRDD = disProvinceRdd.map(x=>(x,""))
      //将所有省份的数据转换成KV的形式
      val allProvinceKVRDD = allprovincerdd.map(x=>(x,""))
    
      //广播经过处理得到的全国所有菜市场省份数据
      val provinceList = provinceKVRDD.collect
      val bc = sc.broadcast( provinceList )
    
      //val allRdd = allProvinceKVRDD.leftOuterJoin(provinceKVRDD)
    
      //val resRdd = allRdd.filter{
      //  case (province,( leftvalue,rightValue )) => rightValue.isEmpty
      //}
    
      val resRdd = allProvinceKVRDD.filter{
        case (province,_) =>
          ! bc.value.map(_._1).contains( province )
      }
    
      resRdd.foreach(println)
    
      Thread.sleep(10000000)
    
    }
    
    作者:Ya
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    【Lintcode】112.Remove Duplicates from Sorted List
    【Lintcode】087.Remove Node in Binary Search Tree
    【Lintcode】011.Search Range in Binary Search Tree
    【Lintcode】095.Validate Binary Search Tree
    【Lintcode】069.Binary Tree Level Order Traversal
    【Lintcode】088.Lowest Common Ancestor
    【Lintcode】094.Binary Tree Maximum Path Sum
    【算法总结】二叉树
    库(静态库和动态库)
    从尾到头打印链表
  • 原文地址:https://www.cnblogs.com/1463490Ya/p/15519998.html
Copyright © 2011-2022 走看看