zoukankan      html  css  js  c++  java
  • Spark中的多线程并发处理

    Spark中的多任务处理

    Spark的一个非常常见的用例是并行运行许多作业。 构建作业DAG后,Spark将这些任务分配到多个Executor上并行处理。
    但这并不能帮助我们在同一个Spark应用程序中同时运行两个完全独立的作业,例如同时从多个数据源读取数据并将它们写到对应的存储,或同时处理多个文件等。

    每个spark应用程序都需要一个SparkSession(Context)来配置和执行操作。 SparkSession对象是线程安全的,可以根据需要传递给你的Spark应用程序。

    顺序执行的例子

    import org.apache.spark.sql.SparkSession
    
    object FancyApp {
      def def appMain(args: Array[String]) = {
        // configure spark
        val spark = SparkSession
            .builder
            .appName("parjobs")
            .getOrCreate()
    
        val df = spark.sparkContext.parallelize(1 to 100).toDF
        doFancyDistinct(df, "hdfs:///dis.parquet")
        doFancySum(df, "hdfs:///sum.parquet")
      }
    
      def doFancyDistinct(df: DataFrame, outPath: String) = df.distinct.write.parquet(outPath)
    
    
      def doFancySum(df: DataFrame, outPath: String) = df.agg(sum("value")).write.parquet(outPath)
    
    }

    优化后的例子

    import org.apache.spark.sql.SparkSession
    import import java.util.concurrent.Executors
    import scala.concurrent._
    import scala.concurrent.duration._
    
    object FancyApp {
      def def appMain(args: Array[String]) = {
        // configure spark
        val spark = SparkSession
            .builder
            .appName("parjobs")
            .getOrCreate()
    
        // Set number of threads via a configuration property
        val pool = Executors.newFixedThreadPool(5)
        // create the implicit ExecutionContext based on our thread pool
        implicit val xc = ExecutionContext.fromExecutorService(pool)
        val df = spark.sparkContext.parallelize(1 to 100).toDF
        val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
        val taskB = doFancySum(df, "hdfs:///sum.parquet")
        // Now wait for the tasks to finish before exiting the app
        Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
      }
    
      def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
        df.distinct.write.parquet(outPath)
      }
    
      def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
        df.agg(sum("value")).write.parquet(outPath) 
      }
    }

    java 实现例子

        val executors = Executors.newFixedThreadPool(threadPoolNum)
        val completionService = new ExecutorCompletionService[String](executors)
        for ((branch_id, dataList) <- summary) {
          logInfo(s"************** applicationId is ${applicationId} about Multi-threading starting: file is ${branch_id}")
          completionService.submit(new Callable[String] {
            override def call(): String = {
              new VerificationTest(spark, branch_id, dataList, separator).runJob()
              branch_id
            }
          })
        }
  • 相关阅读:
    思达BI软件Style Intelligence实例教程—股票K线图
    思达报表工具Style Report基础教程—参数化查询
    思达报表工具Style Report基础教程—公式表
    思达报表工具Style Report基础教程—交叉表
    思达报表工具Style Report基础教程—分组表
    思达报表工具Style Report基础教程—创建一个带条件的查询
    思达报表工具Style Report基础教程—查询
    思达报表工具Style Report基础教程—参数表单
    .net 详解异步编程
    使用NPOI导入导出Excel
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/10856477.html
Copyright © 2011-2022 走看看