pipe(command, [envVars])
对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD
1
|
|
使用Spark Pipe来给你的既有分析任务提速
专业:计算机。
有同学问我,怎么用Spark来调用外部程序,我想到了pipe可以做这个事情。文章封面图就是PySpark的实现方案,其中就用到了pipe这个机制。
同学的需求和问题如下:
- 他有2万个文件,每个10G,放在HDFS上了,总量200TB的数据需要分析。
- 分析程序本身已经写好了,程序接受一个参数:文件路径
- 如何用spark完成集群整个分析任务?
我灵机一动 想到了Spark Pipe 应该可以完成。查了一下资料,归纳和整理如下:
总的来说就是Spark有一个pipe的编程接口,用的是Unix的标准输入和输出,类似于 Unix的 |
管道,例如: ls | grep ^d
第一步:创建RDD
这一个步骤主要是罗列输入的任务,即,包含哪些文件。
// 此处文件的List可以从另一个HDFS上的文件读取过来
val data = List("hdfs://xxx/xxx/xxx/1.txt","hdfs://xxx/xxx/xxx/2.txt",...)
val dataRDD = sc.makeRDD(data) //sc 是你的 SparkContext
第二步:创建一个Shell脚本启动分析任务
我们已经有了RDD了,那么接下来写一个启动launch.sh
脚本来启动我们的分析程序
#!/bin/sh
echo "Running launch.sh shell script..."
while read LINE; do
echo "启动分析任务, 待分析文件路径为: ${LINE}"
bash hdfs://xxx/xxx/xx/analysis_program.sh ${LINE}
done
第三步:RDD对接到启动脚本
下面的步骤就是整合步骤了
val scriptPath = "hdfs://xxx/xxx/launch.sh"
val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.collect()
总结一下,
dataRDD
里面包含了我们要分析的文件列表,这个列表会被分发到spark集群,- 然后spark的工作节点会分别启动一个
launch.sh
脚本,接受文件列表作为输入参数, - 在
launch.sh
脚本的循环体用这些文件列表启动具体的分析任务
这样之后的好处是:
- 既有程序
analysis_program.sh
不需要任何修改,做到了重用,这是最大的好处 - 使用集群来做分析,速度比以前更快了(线性提升)
- 提高了机器的利用率(以前可能是一台机器分析)
附:如何用ansible 搭建一个standalone的spark集群 https://github.com/lresende/ansible-spark-cluster#deploying-spark-standalone