zoukankan      html  css  js  c++  java
  • 关于Spark中的多任务并发处理(Concurrency)

    Spark中的多任务处理

    Spark的一个非常常见的用例是并行运行许多作业。 构建作业DAG后,Spark将这些任务分配到多个Executor上并行处理。
    但这并不能帮助我们在同一个Spark应用程序中同时运行两个完全独立的作业,例如同时从多个数据源读取数据并将它们写到对应的存储,或同时处理多个文件等。

    每个spark应用程序都需要一个SparkSession(Context)来配置和执行操作。 SparkSession对象是线程安全的,可以根据需要传递给你的Spark应用程序。

    一个顺序作业的例子

    假设我们有一个spark 2.x应用程序,负责将几个数据写入到HDFS中。

    import org.apache.spark.sql.SparkSession
    
    object FancyApp {
      def appMain(args: Array[String]) = {
        // configure spark
        val spark = SparkSession
            .builder
            .appName("parjobs")
            .getOrCreate()
    
        val df = spark.sparkContext.parallelize(1 to 100).toDF
        doFancyDistinct(df, "hdfs:///dis.parquet")
        doFancySum(df, "hdfs:///sum.parquet")
      }
    
      def doFancyDistinct(df: DataFrame, outPath: String) = df.distinct.write.parquet(outPath)
      
      def doFancySum(df: DataFrame, outPath: String) = df.agg(sum("value")).write.parquet(outPath)
    
    }
    

    这个程序看起来没有什么问题,Spark将按顺序执行两个动作。但这两个动作是独立, 我们可以同时执行它们。

    一个有缺陷的并发作业的例子

    如果你快速的在网上搜索一下 “scala异步编程”,你就会被引到Scala Future这个解决方案中。
    例如以下为一个并行处理RDD的例子:

    
    import scala.concurrent._
    import ExecutionContext.Implicits.global
    
    def pipeline(f: String, n: Int) = {
        sqlContext
            .read
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .load(f)
            .repartition(n)
            .groupBy(...)
            .agg(...)
            .cache // Cache so we can force computation later
    }
    val n: Int = 2 
    val files: Array[String] = ['/tmp/test1.csv','/tmp/test2.csv']
    
    val rdds = files.map(f => pipeline(f, n))
    
    def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
        df.rdd.foreach(_ => ()) // Force computation
        df
    }
    
    val result = Future.sequence(
       rdds.map(rdd => pipelineToFuture(rdd)).toList
    )
    

    我们只要根据搜索到的文档中提供的例子修改一下,就会得到以下类似内容:

    import org.apache.spark.sql.SparkSession
    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    
    object FancyApp {
      def appMain(args: Array[String]) = {
        // configure spark
        val spark = SparkSession
            .builder
            .appName("parjobs")
            .getOrCreate()
    
        val df = spark.sparkContext.parallelize(1 to 100).toDF
        val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
        val taskB = doFancySum(df, "hdfs:///sum.parquet")
        // Now wait for the tasks to finish before exiting the app
        Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
      }
    
      def doFancyDistinct(df: DataFrame, outPath: String) = Future { df.distinct.write.parquet(outPath) }
    
      def doFancySum(df: DataFrame, outPath: String) = Future { df.agg(sum("value")).write.parquet(outPath) }
    }
    

    ExecutionContext是用于管理并行操作的Context。 实际的线程模型可以由开发者明确提供,也可以使用全局默认值(这是一个 ForkJoinPool ),就像我们在上面的代码中使用的一样:

    import scala.concurrent.ExecutionContext.Implicits.global
    

    使用Global execution context 的问题在于它并不知道我们是在群集上启动Spark作业。 默认情况下,Global execution context 提供与运行代码的系统中的处理器相同数量的线程。 在我们的Spark应用程序中,它将与Driver上的处理器相同数量的线程。

    一个优化过的并发作业的例子

    我们需要控制我们的线程策略,更一般化地编写我们的程序,以便可以在不同的线程模型中重用它们。

    例如以下是我们从重写的函数,它将允许我们精确控制execution context 来管理调用函数时提供的线程数。 例子中添加的隐式参数将允许调用的代码指定运行函数时使用哪个ExecutionContext。

    def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
      df.distinct.write.parquet(outPath)
    }
    

    现在让我们提出一个比默认的Global execution context更好的策略。我们希望能够指定我们想要的并行度。

    import org.apache.spark.sql.SparkSession
    import import java.util.concurrent.Executors
    import scala.concurrent._
    import scala.concurrent.duration._
    
    object FancyApp {
      def appMain(args: Array[String]) = {
        // configure spark
        val spark = SparkSession
            .builder
            .appName("parjobs")
            .getOrCreate()
    
        // Set number of threads via a configuration property
        val pool = Executors.newFixedThreadPool(5)
        // create the implicit ExecutionContext based on our thread pool
        implicit val xc = ExecutionContext.fromExecutorService(pool)
        val df = spark.sparkContext.parallelize(1 to 100).toDF
        val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
        val taskB = doFancySum(df, "hdfs:///sum.parquet")
        // Now wait for the tasks to finish before exiting the app
        Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
      }
    
      def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
        df.distinct.write.parquet(outPath)
      }
    
      def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
        df.agg(sum("value")).write.parquet(outPath) 
      }
    }
    

    在这个例子中,我们定义了Execution context变量xc,含有五个线程。

    参考资料

    Spark Parallel Job Execution
    How to run concurrent jobs(actions) in Apache Spark using single spark context
    Processing multiple files as independent RDD’s in parallel

  • 相关阅读:
    online ddl与pt-osc详解
    几个重点问题回顾
    死锁及常见死锁模型
    InnoDB中锁的算法(3)
    一个幻读模型引出的记录可见性判断
    jupyter notebook的使用
    l线程池抓取lianjia
    lagou数据爬取
    爬虫代理的设置
    linux如何安装和启动mongdb
  • 原文地址:https://www.cnblogs.com/lestatzhang/p/10611291.html
Copyright © 2011-2022 走看看