1、前序
2、spark程序的序列化
2.1 spark程序序列化描述
对于分布式计算来说,数据后期先需要进行序列化,然后发送给对应的其他计算节点,然后反序列化获取对象数据,最后进行任务执行。
2.2 程序序列化流程
- 1、在Driver端先把对象数据进行序列化
- 2、然后把序列化后的数据发送到worker节点上的executor进程中
- 3、然后在exectuor进程中把数据进行反序列化得到该对象数据
- 4、最后开始执行任务
2.3 如何解决spark程序中的序列化问题
(1)如果函数中使用了在外部构建的该类对象,该类要实现序列化
(2)如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化
(3)对于不能序列化的成员变量使用“@transient”标注,告诉编译器不需要序列化
(4)也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。
class A extends Serializable{
变量1
变量2
变量3
变量4
}
把需要实现序列化的变量单独封装在一个class中,减少数据的网络传输
class B extends Serializable{
变量1
变量2
}
(5)可以把对象的创建直接在该函数中构建这样避免需要序列化
3、spark的共享变量
3.1 广播变量
spark程序允许用户在Driver端把一份公共数据下发到每一个参与计算的worker节点,后期每一个worker节点上运行了大量的task,这些task都共享同一份数据。不需要每一个task都去加载这一份数据。这样一来是可以减少内存开销。
-
怎么使用广播变量
- 1、在Driver端通过sparkContext对象 sc.broadcast(数据) 方法内部需要一份数据,
- val broadcastValue = sc.broadcast(数据)
- 就相当于把数据从Driver端下发到每一个参与计算的worker节点
- 2、后期可以在executor端获取该数据
- 通过broadcastValue广播变量调用value属性获取对应值
- broadcastValue.value
- 通过broadcastValue广播变量调用value属性获取对应值
- 1、在Driver端通过sparkContext对象 sc.broadcast(数据) 方法内部需要一份数据,
-
注意点
1、能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。 2、 广播变量只能在Driver端定义,不能在Executor端定义。 3、 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。 4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。 5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
3.2 累加器
-
1、累加器的描述
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。可以使用累加器来进行全局的计数。
-
2、如何使用累加器
(1)通过在driver中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值initialValue 的类型。 (2)Spark闭包(函数序列化)里的executor代码可以使用累加器的add方法(scala)或者 +=(java)增加累加器的值。 (3)driver程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue() )来访问累加器的值。 注意:工作节点上的任务不能访问累加器的值
-
3、使用累加器需要注意的问题
看了上面的分析,大家都有这种印象了,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。 事实上,还是有解决方案的,我们可以通过cache,persist,将要使用到的RDD进行缓存,后续的其他job操作中,可以直接使用其结果,不需要再次重新算,最终避免导致累加器再次执行。 修改后的代码: val accum= sc.accumulator(0, "Error Accumulator") val data = sc.parallelize(1 to 10) //用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1 val newData = data.map{x => { if(x%2 == 0){ accum += 1 0 }else 1 }} //对该rdd设置缓存 newData.cache //使用action操作触发执行 newData.count //此时accum的值为5,是我们要的结果 accum.value //继续操作,查看刚才变动的数据,foreach也是action操作 newData.foreach(println) //上个步骤没有进行累计器操作,因为当前newData这个rdd的结果是从缓存中获取得到的,也就没有再次执行map方法里面的逻辑。这个时候累加器的值就没有问题。 accum.value