zoukankan      html  css  js  c++  java
  • sparksql系列(九) spark多job提交,spark多目录处理

    在生产环境中遇到了这种情况:spark程序需要处理输入是多个目录,输出也是多个目录。但是处理的逻辑都是相同的。
    使用方法经历多次修改,最终成功完成任务。其中涉及到spark多job提交和spark多个目录同时处理,在此记录一下。
    程序中所有异常处理,建议都在函数里面处理好,不要直接写一些处理代码。这样的话直接调用函数就行。

    方法一:for直接上

    典型的:减少使用资源,拉长运行时间

    代码

    递进程序

    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()

    val dataArray = "10450013、3593084 、13568083、32456733、78783125、5689865、23459806".split("、")
    for(index <- dataArray){
    val path = "/data/"+index
    sparkSession.read.json(path).//逻辑代码
    }

    优点

    1.使用最少的资源

    2.前后有依赖的任务适合

    缺点

    1.运行时间最长

    方法二:多个job同时运行

    使用多倍的资源,减少运行的时间。这种方法在sparkUI界面上看是同时有多个job在运行的。

    代码

    递进程序

    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()

    val dataArray = "10450013、3593084 、13568083、32456733、78783125、5689865、23459806".split("、")
    val executorService = Executors.newFixedThreadPool(5)

    优点

    1.运行时间是方法一的1/N。

    缺点

    1.使用的资源是方法一的N倍。

    2.没有办法确定运行结束的顺序

    3.前后有依赖的任务不适合

    方法三:partationby函数

    生成的目录是根据

    partitionBy("colmn")的值确定的,也可以写多个partitionBy("colmn"),这样就生成了多级目录

    代码

    val dataArray = "10450013、3593084 、13568083、32456733、78783125、5689865、23459806".split("、")

    sparkSession.read.json(dataArray.map(x=>("/data/"+x)):_*)
    .write.partitionBy("colmn").csv("")

    使用范围

    最好能确定多个文件之间的大小差不多,要不然容易文件倾斜。

  • 相关阅读:
    HNOI 2006 BZOJ 1195 最短母串
    BZOJ 3029 守卫者的挑战
    Codeforces 401D Roman and Numbers
    ZJOI2010 数字计数
    BZOJ 3329 Xorequ
    Codeforces 235 C
    SPOJ 8222 Substrings
    BZOJ 1396 识别子串
    (模板)归并排序
    poj3122 Pie (二分)
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/13196602.html
Copyright © 2011-2022 走看看