案例实操
Spark Shell 仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在 IDE
中编制程序,然后打成 jar 包,然后提交到集群,最常用的是创建一个 Maven 项目,利用
Maven 来管理 jar 包的依赖。
1 编写 WordCount 程序
1)创建一个 Maven 项目 WordCount 并导入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.lxl</groupId> <artifactId>spark02</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <modules> <module>sparkCore</module> </modules> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
2)编写代码
package com.lxl import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { /* 1.创建配置信息 */ val conf = new SparkConf().setAppName("wc") /* 2.创建sparkcontext */ val sc = new SparkContext(conf) /* 3.处理 */ //读取数据 val lines = sc.textFile(args(0)) //传入路径 //压平 flatMap val words = lines.flatMap(_.split(" ")) //map(word,1) val k2v = words.map((_, 1)) //resuceBykey(word, x) val result = k2v.reduceByKey(_ + _) //输出,展示 //result.collect() //保存数据到文件 result.saveAsTextFile(args(1)) //传入的保存文件的目录 //关闭连接 sc.stop() } }
3)打包插件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>spark01</artifactId> <groupId>com.atlxl</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>sparkCore</artifactId> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <archive> <manifest> <mainClass>WordCount</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
4)打包到集群测试
先将 jar 包拷贝到 spark 的家目录下,改名为wordcount。
[lxl@hadoop102 spark]$ mv sparkCore-1.0-SNAPSHOT.jar wordcount.jar
[lxl@hadoop102 spark]$ bin/spark-submit --class com.lxl.WordCount --master spark://hadoop102:7077 --executor-memory 1G --total-executor-cores 2 ./wordcount.jar hdfs://hadoop102:9000/fruit.tsv hdfs://hadoop102:9000/out
2 本地调试
本地 Spark 程序调试需要使用 local 提交模式,即将本机当做运行环
境,Master 和 Worker 都为本机。运行时直接加断点调试即可。如下:
创建 SparkConf 的时候设置额外属性,表明本地执行:
val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
完整代码:(只需将 WordCount 第二步的代码稍作修改即可)
package com.lxl import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { /* 1.创建配置信息 */ val conf = new SparkConf().setAppName("wc").setMaster("local[*]") /* 2.创建sparkcontext */ val sc = new SparkContext(conf) /* 3.处理 */ //读取数据 val lines = sc.textFile("C:\Users\67001\Desktop\word.txt") //本地路径 //压平 flatMap val words = lines.flatMap(_.split(" ")) //map(word,1) val k2v = words.map((_, 1)) //resuceBykey(word, x) val result = k2v.reduceByKey(_ + _) //输出,展示 // result.collect() //保存数据 // result.saveAsTextFile(args(1)) //打印到控制台 result.foreach(println) //关闭连接 sc.stop() } }
3 远程调试
通过 IDEA 进行远程调试,主要是将 IDEA 作为 Driver 来提交应用程序,
配置过程如下:
修改 sparkConf,添加最终运行的 Jar 包、Driver 程序的地址,
并设置 Master 的提交地址:
val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077") .setJars(List("D:\Workspace\IDEA_work\Spark_Work\spark02\sparkCore\target\sparkCore-1.0-SNAPSHOT.jar"))
完整代码:
package com.lxl import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { /* 1.创建配置信息 */ val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077") .setJars(List("D:\Workspace\IDEA_work\Spark_Work\spark02\sparkCore\target\sparkCore-1.0-SNAPSHOT.jar")) /* 2.创建sparkcontext */ val sc = new SparkContext(conf) /* 3.处理 */ //读取数据 val lines = sc.textFile("hdfs://hadoop102:9000/fruit.tsv") //HDFS路径 //压平 flatMap val words = lines.flatMap(_.split(" ")) //map(word,1) val k2v = words.map((_, 1)) //resuceBykey(word, x) val result = k2v.reduceByKey(_ + _) //输出,展示 // result.collect() //保存数据 result.saveAsTextFile("hdfs://hadoop102:9000/out1") //保存到HDFS的路径 //打印到控制台 // result.foreach(println) //关闭连接 sc.stop() } }