代码改变世界
[登录 · 注册]
  • spark RDD pipe 调用外部脚本
  • pipe(command, [envVars])
    对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    scala> val rdd = sc.makeRDD(List("wangguo","yangxiu","xiaozhou","kangkang"),3)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at makeRDD at <console>:24

    scala> rdd.pipe("/opt/test/spark/pipe.sh").collect
    res4: Array[String] = Array(wangcen, wangguohehe, wangcen, yangxiuhehe, wangcen, xiaozhouhehe, kangkanghehe)

    scala> val rdd = sc.makeRDD(List("wangguo","yangxiu","xiaozhou","kangkang"),4)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at makeRDD at <console>:24

    scala> rdd.pipe("/opt/test/spark/pipe.sh").collect
    res5: Array[String] = Array(wangcen, wangguohehe, wangcen, yangxiuhehe, wangcen, xiaozhouhehe, wangcen, kangkanghehe)

    使用Spark Pipe来给你的既有分析任务提速

    有同学问我,怎么用Spark来调用外部程序,我想到了pipe可以做这个事情。文章封面图就是PySpark的实现方案,其中就用到了pipe这个机制。

    同学的需求和问题如下:

    1. 他有2万个文件,每个10G,放在HDFS上了,总量200TB的数据需要分析。
    2. 分析程序本身已经写好了,程序接受一个参数:文件路径
    3. 如何用spark完成集群整个分析任务?

    我灵机一动 想到了Spark Pipe 应该可以完成。查了一下资料,归纳和整理如下:

    总的来说就是Spark有一个pipe的编程接口,用的是Unix的标准输入和输出,类似于 Unix的 | 管道,例如: ls | grep ^d

    第一步:创建RDD

    这一个步骤主要是罗列输入的任务,即,包含哪些文件。

    // 此处文件的List可以从另一个HDFS上的文件读取过来
    val data = List("hdfs://xxx/xxx/xxx/1.txt","hdfs://xxx/xxx/xxx/2.txt",...)
    val dataRDD = sc.makeRDD(data) //sc 是你的 SparkContext

    第二步:创建一个Shell脚本启动分析任务

    我们已经有了RDD了,那么接下来写一个启动launch.sh脚本来启动我们的分析程序

    #!/bin/sh
    echo "Running launch.sh shell script..."
    while read LINE; do
       echo "启动分析任务, 待分析文件路径为: ${LINE}"
       bash hdfs://xxx/xxx/xx/analysis_program.sh ${LINE}
    done

    第三步:RDD对接到启动脚本

    下面的步骤就是整合步骤了

    val scriptPath = "hdfs://xxx/xxx/launch.sh"
    val pipeRDD = dataRDD.pipe(scriptPath)
    pipeRDD.collect()

     

    总结一下,

    1. dataRDD里面包含了我们要分析的文件列表,这个列表会被分发到spark集群,
    2. 然后spark的工作节点会分别启动一个launch.sh脚本,接受文件列表作为输入参数,
    3. launch.sh脚本的循环体用这些文件列表启动具体的分析任务

    这样之后的好处是:

    1. 既有程序analysis_program.sh 不需要任何修改,做到了重用,这是最大的好处
    2. 使用集群来做分析,速度比以前更快了(线性提升)
    3. 提高了机器的利用率(以前可能是一台机器分析)

     

    附:如何用ansible 搭建一个standalone的spark集群 

  • 上一篇:默克尔树(merkle tree)——就是hash树,比特币区块链里用于校验完整性的
    下一篇:AIDE(高级入侵检测环境)——就是讲文件的hash值存到db中,然后比较是否被篡改过
  • 【推广】 阿里云小站-上云优惠聚集地(新老客户同享)更有每天限时秒杀!
    【推广】 云服务器低至0.95折 1核2G ECS云服务器8.1元/月
    【推广】 阿里云老用户升级四重礼遇享6.5折限时折扣!
  • 原文:https://www.cnblogs.com/bonelee/p/13089297.html
走看看 - 开发者的网上家园