zoukankan      html  css  js  c++  java
  • Spark 学习(六) Spark 的线程安全和序列化问题

    一,必备知识

      1.1 经典14问

      1.2 问题前提

    二,序列化问题

      2.1 Spark序列化出现情况

      2.2 Spark序列化问题解决

    三,线程安全问题

      3.1 Spark线程安全出现情况

      3.2 Spark线程安全问题解决

    正文

    一,必备知识

      1.1 经典14问

    1.SparkContext哪一端生成的?
        Driver端
    
    2.DAG是在哪一端被构建的?
        Driver端
        
    3.RDD是在哪一端生成的?
        Driver端
    
    4.广播变量是在哪一端调用的方法进行广播的?
        Driver端
    
    5.要广播的数据应该在哪一端先创建好再广播呢? 
        Driver端
    
    6.调用RDD的算子(Transformation和Action)是在哪一端调用的
        Driver端
        
    7.RDD在调用Transformation和Action时需要传入一个函数,函数是在哪一端声明和传入的?
        Driver端
    
    8.RDD在调用Transformation和Action时需要传入函数,请问传入的函数是在哪一端执行了函数的业务逻辑?
        Executor中的Task执行的
    
    9.自定义的分区器这个类是在哪一端实例化的?
        Driver端
    
    10.分区器中的getParitition方法在哪一端调用的呢?
        Executor中的Task中调用的
    
    11.Task是在哪一端生成的呢? 
        Driver端
    
    12.DAG是在哪一端构建好的并被切分成一到多个State的
        Driver端
    
    13.DAG是哪个类完成的切分Stage的功能?
        DAGScheduler
        
    14.DAGScheduler将切分好的Stage以什么样的形式给TaskScheduler
        TaskSet

      1.2 需求前提

      在上面的12问的7-8问中,函数的申明和调用分别在Driver和Execute中进行,这其中就会牵扯到序列化问题和线程安全问题。接下来会对其进行解释。

    二,序列化问题

      2.1 Spark序列化出现情况

      工具类:

    package cn.edu360.spark05
    
    // 随意定义一工具类
    class MyUtil {
        def get(msg: String): String ={
            msg+"aaa"
        }
    }

      Spark实现类:

    package cn.edu360.spark05
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object SequenceTest {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
            var sc = new SparkContext(conf)
            val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
            val words = lines.flatMap(_.split(" "))
            // 对类进行实例化
            val util = new MyUtil
            // 调用实例的方法
            val value: RDD[String] = words.map(word => util.get(word))
            value.collect()
            sc.stop()
        }
    }

      报错信息如下:

      

      

      上述报错信息就说明是MyUtil实例的序列化问题。该实例是在Driver端创建,通过网络发送到Worker的Executer端。但是这个实例并为序列化,所以会报这些错误。

      2.2 Spark序列化问题解决

      解决方案一:实现序列化接口

    package cn.edu360.spark05
    
    // 继承Serializable
    class MyUtil extends Serializable {
        def get(msg: String): String ={
            msg+"aaa"
        }
    }

      弊端:需要自己实现序列化接口,相对麻烦

      解决方案二:不实现序列化接口,在Executer进行MyUtil内进行实例化

    package cn.edu360.spark05
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object SequenceTest {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
            var sc = new SparkContext(conf)
            val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
            val words = lines.flatMap(_.split(" "))
            val value: RDD[String] = words.map(word => {
                // 在这里进行实例化,这里的操作是在Executer中
                val util = new MyUtil
                util.get(word)
            })
            val result: Array[String] = value.collect()
            print(result.toBuffer)
            sc.stop()
        }
    }

      弊端:每一次调用都需要创建一个新的实例,浪费资源,浪费内存。

      解决方案三:采用单例模式

      MyUtil类:

    package cn.edu360.spark05
    
    // 将class 改为 object的单例模式
    object MyUtil {
        def get(msg: String): String ={
            msg+"aaa"
        }
    }

      Spark实现类:

    package cn.edu360.spark05
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object SequenceTest {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
            var sc = new SparkContext(conf)
            val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
            val words = lines.flatMap(_.split(" "))
            val value: RDD[String] = words.map(word => {
                // 调用方法
                MyUtil.get(word)
            })
            val result: Array[String] = value.collect()
            print(result.toBuffer)
            sc.stop()
        }
    }

    三,线程安全问题

      3.1 Spark线程安全出现情况、

      有共享成员变量:

        1. 工具类使用object,说明工具类是单例的,有线程安全问题。在函数内部使用,是在Executer中被初始化,一个Executer中有一个实例,所以 就出现了线程安全问题。

        2. 工具类使用Class,说明是多例的,没有线程安全问题。每个task都会持有一份工具类的实例。

      没有共享成员变量:

        1. 工具类Object,没有线程安全问题

        2. 工具类使用class,实现序列化即可

      3.2 Spark线程安全问题解决

        工具类优先使用object,但尽可能不使用成员变量,若实在有这方面的需求,可以定义类的类型,或者把成员变量变成线程安全的成员变量,例如加锁等。

  • 相关阅读:
    团队作业---软件制作8
    团队作业---软件制作7
    团队绩效考核表
    团队报告
    团队作业---软件制作6
    团队作业---软件制作5
    团队作业---软件制作4
    团队作业---软件制作3
    团队作业---软件制作2
    第十周学习进度条
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/10994834.html
Copyright © 2011-2022 走看看