zoukankan      html  css  js  c++  java
  • Spark程序设计

     一、Spark编程模型

    Spark常规步骤:

    1. 创建SparkContext对象:连接集群,提供创建RDD和广播变量等接口
    2. 输入数据:从Hadoop等外部数据源或Scala数据集创建RDD
      • 创建RDD-Scala集合/本地文件
        • 并行度3,创建3个partition
      •  HDFS

      •  文件被切分成block分布在多个节点上,通过textFile读入机器内存,转为RDD的partition对象。action触发之后,才真物理上去执行。

      • 总结
      • 外部的数据,比如HDFS上从外部来,变为RDD,不断转换也是RDD,
      • 内部的数据,比如一些变量,是Scala集合或容器,存在Driver当前APP单机的数据集,包括Broadcast变量,在单机有副本。而外部数据,最后通过collect等返回的数据,也返回到Driver单机数据
      • RDD上的数据,被每一台节点上spark的block manager管理的,里面存着。Driver节点上则知道全局RDD到底有哪些partition,究竟被每台机器怎么管理
      • broadcast变量声明初始化在Driver,可以分发到不同的节点上,节点上就有副本,供Task共享
    3. 处理:RDD上执行操作API
      • 核心API 按语义分类
    4. 输出:Action触发执行,保存结果到外部数据源或回收到Driver

    其他[Optional]

    1. “共享变量”:Driver分发的全局变量:广播变量等
      • 声明初始化共享变量
      • 闭包传递,每个Task里都会有该变量,是和任务打包一起分发到节点上的
      • 使用broadcast,则每个Excutor只有一个,Task共享该变量,减少了数据传输,提高空间使用率

           2.Checkpoint:检查点备份

           3.Cache:缓存复用数据

           4.采样(sample):小数据集验证

           5.调试:take,foreach等

    二、Spark优化

     通过程序或配置参数控制:

    • 控制任务并行度
    • 降低单条数据处理开销
    • 数据倾斜问题
    • RDD缓存复用
    • 操作符的选择
    • 为作业设置合理的资源

    1.控制任务并行度

    问题与影响

    • 数目过小:运行过慢,容易OOM
    • 数目过大:产生过多小任务,启动和调度开销较大

    推荐数量

    • 每个CPU core对应2~3个Task

    控制并行任务(Task)数量

    • Map任务并行度
      • 输入的Stage的Task并行度
      • 默认值:输入文件的数据块数量一致(HDFS block)
      • 通过API控制sc.textFile("input.txt", 100)
    • Reduce任务并行度
      • 默认值:使用parent RDD的partition数量
      • 通过设置配置spark.default.parallelism更改默认配置
      • 通过API控制groupByKey,reduceByKey等提供了相关参数
        • rdd.reduceByKey(_+_, 100)

    2.降低单条记录开销

     mapPartitions

    3.数据倾斜或任务倾斜问题

    问题:某个任务T负载过大,造成拖慢整体Stage进度或Task出现异常

    原因:任务T数据过多或任务T所在节点有问题

    解决:

    • 数据:选择合适的Partition Key
    • 调度:spark.speculation设置为true
    • 节点:剔除所有问题节点

    Stage0中下面Task数据量过大

    可以将该Task根据随机数再切分为两个Task,三个Task处理的数据量基本就一致了

    可以自定义方法解决该问题

     4.缓存重复使用RDD

     两个action

    5.操作符的选择

    6.为作业设置合理资源

    7.监控与诊断

    8.Spark参数配置方式

    监控实例

    作业提交到了Yarn

    打开resourcemanager的端口,可看到所有正在运行或运行结束的Application

     点开正在运行的UserClick

     点击TracingUrl,进入Spark的监控界面

     如果是standalone模式,直接打开4040端口即可

    三、案例

     电影受众分析

    1.电影受众分析背景

     2.电影受众分析数据

    用户数据:用户ID,性别,年龄,职业, 编码

    电影数据:电影ID,电影名,风格

    评价数据:用户ID,电影ID,评分,时间戳

    3.电影受众分析任务

    • 看过“Sixteen Candles”用户年龄和性别分布
      • 电影受众分析数据:过滤
      • 连接
      • 分布:聚合运算,年龄和性别为key统计数量

    创建Object

    package org.sparkcourse.movie
    
    import org.apache.spark._
    
    
    object MovieUser{
      def main(args: Array[String]): Unit = {
        //创建SparkContext
        val master = if(args.length > 0) args(0).toString else "local"
        val datapath = if(args.length > 1) args(1).toString else "data/ml-1m"
        val conf = new SparkConf().setMaster(master).setAppName("MovieUser")
        val sc = new SparkContext(conf)
        //输入数据
        val usersRdd = sc.textFile(datapath+"/users.dat")
        val ratingsRdd = sc.textFile(datapath+"/ratings.dat")
        //抽取数据的属性,过滤符合条件的电影
        //RDD[(userId, (gender, age))]
        val users = usersRdd.map(_.split("::")).map(x=>{
          (x(0), (x(1), x(2)))
        })
        //RDD[(userID, movieID)] split返回数组
        val rating = ratingsRdd.map(_.split("::")).map(x =>
          (x(0), x(1))).filter(x => x._2.equals("2144"))
        //join两个数据集
        val userRating = rating.join(users)
        userRating.take(num=1).foreach(println(_))
        //统计分析
        val userDistribution = userRating.map(x=>{
          (x._2._2, 1)
        }).reduceByKey(_+_)
          .foreach(println(_))
        sc.stop()
    
      }
    }

    users  (userID, (gender, age))

    rating  (userID, movieID)

    userRating (userID, (movieID, (gender, age))) 例如,(4425, (2144, (M, 35)))

    userDistribution ((gender, age), 1) 求和

    • 年龄段20-30的男性年轻人,最喜欢看哪10部电影
      • 年龄段:过滤
      • 最喜欢10部电影:聚合,排序
    package org.sparkcourse.movie
    
    import org.apache.spark._
    
    import scala.collection.immutable.HashSet
    
    object PopularMovie {
      def main(args: Array[String]): Unit = {
    
    
        val conf = new SparkConf().setMaster("local").setAppName("PopularMovie")
        val sc = new SparkContext(conf)
        //输入数据
        val usersRdd = sc.textFile("data/ml-1m/users.dat")
        val ratingsRdd =  sc.textFile("data/ml-1m/ratings.dat")
        //抽取数据和过滤
        val users = usersRdd.map(_.split("::")).map(x=>{
          (x(0), x(2)) //userid, age
        }).filter(x=>x._2.toInt>=20&&x._2.toInt<=30)
          .map(_._1)
          .collect() // 变为了Driver上单机的变量,用广播变量发出去
        val userSet = HashSet() ++ users
        val broadcastUserSet = sc.broadcast(userSet)
        //聚合和排序
        val topKMovies = ratingsRdd.map(_.split("::"))
          .map(x=>{(x(0), x(1))}) //userid, movieid
          .filter(x => {
          broadcastUserSet.value.contains(x._1)
        }).map(x=>{
          (x._2, 1)
        }).reduceByKey(_+_)
          .map(x =>(x._2, x._1))
          .sortByKey(false) // ascending=false
          .map(x=>{(x._2, x._1)})
          .take(3)
          .foreach(println(_))
      }
    }
    • 最受欢迎的前3部电影
    package org.sparkcourse.movie
    
    import org.apache.spark._
    
    object TopKMovie {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("TopKMovie")
        val sc = new SparkContext(conf)
    
        // 输入
        val ratingsRdd = sc.textFile("data/ml-1m/ratings.dat")
    
        // 数据抽取
        val ratings = ratingsRdd.map(_.split("::"))
          .map(x => {
            (x(0), x(1), x(2)) // userid, movieid, rating
          })
    
        // 分析
        val topKScoreMostMovie = ratings.map(x => {
          (x._2, (x._3.toInt, 1)) // (movieid, (rating, 1))
        }).reduceByKey((v1, v2) => { 
          (v1._1 + v2._1, v1._2 + v2._2) // {movieid: (rating之和,数量之和)}
        }).map(x => {
          (x._2._1.toFloat / x._2._2.toFloat, x._1) // (平均分, movieid)
        }).sortByKey(false)
            .take(3)
            .foreach(println(_))
        sc.stop()
      }
    }

     reduceByKey((v1, v2) => { (v1._1 + v2._1, v1._2 + v2._2)})

    v1和v2都是value

    即对于(movieid, (rating, 1))来说,是(rating, 1)

    reducByKey对相同的键的进行value的操作

    reduceByKey(binary_function)
    reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

    那么讲到这里,差不多函数功能已经明了了,而reduceByKey的是如何运行的呢?下面这张图就清楚了揭示了其原理:

     

    它会在数据搬移以前,即在reduce之前就进行了reduce操作。

    可以实现同样功能的还有GroupByKey函数,但是,groupbykey函数并不能提前进行reduce,也就是说,上面的处理过程会翻译成这样:

     

    所以在处理大规模应用的时候,应该使用reduceByKey函数。

  • 相关阅读:
    [C++]怎么将.h和.cpp文件分别放在不同的目录
    [C++]VAssistX文件头添加注释功能设置
    教程-Delphi调用百度地图API(XE8+WIN7)
    WebBrowser的各种使用方法(未完待续)(XE8+WIN7)
    Android教程-夜神模拟器连接IDE更新让Delphi发现你的手机或夜神模拟器
    首尔甜城常用电话
    Android问题-DelphiXE5开发Andriod连接Webservice乱码问题
    点乘的使用
    [转]脏读,不可重复读,幻读的理解
    Unity3D刚体不同力的测试(ForceMode,AddForce,RelativeAddForce)
  • 原文地址:https://www.cnblogs.com/aidata/p/11541657.html
Copyright © 2011-2022 走看看