zoukankan      html  css  js  c++  java
  • (二)Spark

    1、spark自定义分区

    2、spark中的共享变量

    3、spark程序的序列化问题

    4、spark中的application/job/stage/task之间的关系

    5、spark on yarn原理和机制

    6、spark的资源分配方式

    1 spark自定义分区

    1.1 自定义分区说明

    • 在对rdd数据进行分区时,默认使用的是HashPartitioner
    • 该函数对key进行哈希,然后对分区总数取模,取模结果相同的机会被分到同一个partition中

    HashPartitioner分区逻辑:

      key.hashcode % 分区总数 = 分区号

    • 如果嫌HashPartitioner功能单一,可以自定义partitioner

    1.2 自定义partitioner

    实现自定义partitioner大致分为3个步骤

    1)继承org.apache.spark.Partitioner

    2)重写numPartitions方法

    3)重写getPartition方法

    1.3 案例

    需求:要根据rdd的key的长度进行分区,相同key的长度进入到同一分区

    //自定义分区
    class MyPartitioner(num:Int) extends Partitioner{
      //指定rdd的总的分区数
      override def numPartitions:Int={
        num
      }
      //消息按照key的某种规则进入到指定的分区号中
      override def getPartition(key:Any):Int={
        //这里的key就是单词
        val length:Int = key.toString.length
        length match{
          case 4 => 0
          case 5 => 1
          case 6 => 2
          case _ => 0
        }
      }
    }

    2 spark的共享变量

    2.1 spark的广播变量(broadcast variable)

    • spark中分布式执行的代码需要传递到各个Executor的Task上运行。对于一些只读、固定的数据

    (比如从DB中读出的数据),每次都需要Driver广播到各个Task上,这样效率低下。

    • 广播变量允许将变量只广播给各个Executor。该Executor上的各个Task再从所在节点的BlockManager

    获取变量,而不是从Driver获取变量,以减少通信的成本,减少内存的占用,从而提升了效率。

    广播变量使用注意事项

    1)不能将一个RDD使用广播变量广播出去

    2)广播变量只能在Driver端定义,不能在Executor端定义

    3)在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值

    4)如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task

      就有多少Driver端的变量副本

    5)如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver

      端的变量副本

    2.2 spark的累机器(Accumulator)

    • 累机器(Accumulator)是spark中提供的一种分布式的变量机制,其原理类似于MapReduce,

    即分布式的改变,然后聚合这些改变

    • 累机器的一个常见用途是在调试时对作用执行过程中的事件进行计数。可以使用累机器来进行

    全局的计数

    2.2.1 累机器原理

    2.2.2 累机器的使用

    1)通过在driver中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累机器。

    返回值为org.ahche.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。

    2)spark闭包(函数序列化)里的Executor代码可以使用累机器的add方法增加累加器的值

    3)driver程序可以调用累机器的value属性来访问累加器的值

    //创建Accumulator 并初始化为0
    val accumulator = sc.accumulator(0)
    val result = rdd1.map(s=>{
      accumulator.add(1)
      s
    })
    result.collect
    println("words lines is:"+accumulator.value)

    3 spark程序的序列化问题

    3.1 transformation操作为什么需要序列化

    • spark是分布式执行引擎,其核心抽象是弹性分布式数据集RDD,其代表了分布在不同节点的数据。

    Spark的计算时在executor上分布式执行的,故用户开发的关于RDD的map、flatMap、

    reduceByKey等transformation操作(闭包)有如下执行过程:

    1)代码中对象在driver本地序列化

    2)对象序列化后传输到远程executor节点

    3)远程executor节点反序列化对象

    4)最终远程节点执行

    • 故对象在执行中需要序列化通过网络传输,则必须经过序列化过程

    3.2 spark的任务序列化异常

    • 在编写spark程序中,由于在map,foreachPartition等算子内部使用了外部定义的变量和函数,

    从而引发Task未序列化问题。

    • 然而spark算子在计算过程中使用外部变量在许多情形下确实在所难免,比如filter算子根据外部

    指定的条件进行过滤,map根据相应的配置进行变换。

    • 经常会出现"org.apache.spark.SparkException:Task not serializable" 这个错误
      • 其原因就在于这些算子使用了外部的变量,但是这个变量不能序列化
      • 当前类使用了"extends Serializable"声明支持序列化,但是由于某些字段不支持序列化,仍然
      • 会导致整个序列化时出现问题,最终导致出现Task未序列化问题。

    3.3 spark中解决序列化的办法

    1)如果函数中使用了该类对象,该类要实现序列化

      类 extends Serializable

    2)如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要

      实现序列化

    3)对于不能序列化的成员变量使用"@transient"标注,告诉编译器不需要序列化

    4)也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输

      提高效率

    5)可以把对象的创建直接在该函数中构建这样避免需要序列化

    4 application、job、stage、task之间的关系

    5 spark on yarn 

    可以把spark程序提交到yarn中去运行,此时spark任务所需要的计算资源由yarn中的老大

    ResourceManager去分配

    ****************每个月总有那么几天upset ***********************************************************************

  • 相关阅读:
    Linux命令——getfacl、setfacl
    Linux命令——groups
    Linux命令——gdisk、fdisk、partprobe
    Linux命令——parted
    Linux命令——blkid
    Linux命令——chattr、lsattr
    Linux命令——od
    Linux命令——basename、dirname
    Linux命令——chgrp、chown、chmod
    Linux命令——pidof
  • 原文地址:https://www.cnblogs.com/hanchaoyue/p/13357986.html
Copyright © 2011-2022 走看看