1. Spark集群安装
1.1 机器部署
准备两台以上Linux服务器,安装好JDK
1.2 下载Spark安装包
提前到官网下载相应版本并上传spark-安装包到Linux上
解压安装包到指定位置
tar -zxvf spark-2.1.0-bin-hadoop2.6.tgz -C /usr/local
1.3 配置Spark
进入到Spark安装目录
cd /usr/local/spark-2.1.0-bin-hadoop2.6
进入conf目录并重命名并修改spark-env.sh.template文件
cd conf/ mv spark-env.sh.template spark-env.sh vi spark-env.sh
在该配置文件中添加如下配置
export JAVA_HOME=/usr/java/jdk1.8.0_111 #export SPARK_MASTER_IP=node1.edu360.cn #export SPARK_MASTER_PORT=7077
保存退出
重命名并修改slaves.template文件
mv slaves.template slaves vi slaves
在该文件中添加子节点所在的位置(Worker节点)
node2.edu360.cn
node3.edu360.cn
node4.edu360.cn
保存退出
将配置好的Spark拷贝到其他节点上
scp -r spark-2.1.0-bin-hadoop2.6/ node2.edu360.cn:/usr/local/ scp -r spark-2.1.0-bin-hadoop2.6/ node3.edu360.cn:/usr/local/ scp -r spark-2.1.0-bin-hadoop2.6/ node4.edu360.cn:/usr/local/
Spark集群配置完毕,目前是1个Master,3个Work,在node1.edu360.cn上启动Spark集群
/usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh
启动后执行jps命令,主节点上有Master进程,其他子节点上有Work进行,登录Spark管理界面查看集群状态(主节点):http://node1.edu360.cn:8080/
到此为止,Spark集群安装完毕,但是有一个很大的问题,那就是Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动至少两个Master节点来实现高可靠,配置方式比较简单:
Spark集群规划:node1,node2是Master;node3,node4,node5是Worker
安装配置zk集群,并启动zk集群
停止spark所有服务,修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_IP并添加如下配置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1,zk2,zk3 -Dspark.deploy.zookeeper.dir=/spark"
1.在node1节点上修改slaves配置文件内容指定worker节点
2.在node1上执行sbin/start-all.sh脚本,然后在node2上执行sbin/start-master.sh启动第二个Master
2. 执行Spark程序
2.1 执行第一个spark程序
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://node1.edu360.cn:7077 --executor-memory 1G --total-executor-cores 2 /usr/local/spark-2.1.0-bin-hadoop2.6/lib/spark-examples-2.1.0-hadoop2.6.0.jar 100
该算法是利用蒙特·卡罗算法求PI
2.2 启动Spark Shell
spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。
2.2.1 启动spark shell
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-shell --master spark://node1.edu360.cn:7077 --executor-memory 2g --total-executor-cores 2
参数说明:
--master spark://node1.edu360.cn:7077 指定Master的地址 --executor-memory 2g 指定每个worker可用内存为2G --total-executor-cores 2 指定整个集群使用的cup核数为2个
注意:如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可.
2.2.2 在spark shell中编写WordCount程序
1.首先启动hdfs
2.向hdfs上传一个文件到hdfs://node1.edu360.cn:9000/words.txt
3.在spark shell中用scala语言编写spark程序
sc.textFile("hdfs://node1.edu360.cn:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://node1.edu360.cn:9000/out")
4.使用hdfs命令查看结果
hdfs dfs -ls hdfs://node1.edu360.cn:9000/out/p*
说明:
sc是SparkContext对象,该对象时提交spark程序的入口
textFile(hdfs://node1.edu360.cn:9000/words.txt)是hdfs中读取数据
flatMap(_.split(" "))先map在压平
map((_,1))将单词和1构成元组
reduceByKey(_+_)按照key进行reduce,并将value累加
saveAsTextFile("hdfs://node1.edu360.cn:9000/out")将结果写入到hdfs中
1.3. 在IDEA中编写WordCount程序
spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。
1.创建一个项目
2.选择Maven项目,然后点击next
3.填写maven的GAV,然后点击next
4.填写项目名称,然后点击finish
5.创建好maven项目后,点击Enable Auto-Import
6.配置Maven的pom.xml

1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>learn.spark</groupId> 8 <artifactId>SparkDemo</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <properties> 12 <maven.compiler.source>1.8</maven.compiler.source> 13 <maven.compiler.target>1.8</maven.compiler.target> 14 <scala.version>2.11.8</scala.version> 15 <spark.version>2.2.0</spark.version> 16 <hadoop.version>2.6.5</hadoop.version> 17 <encoding>UTF-8</encoding> 18 </properties> 19 20 <dependencies> 21 <!-- 导入scala的依赖 --> 22 <dependency> 23 <groupId>org.scala-lang</groupId> 24 <artifactId>scala-library</artifactId> 25 <version>${scala.version}</version> 26 </dependency> 27 28 <!-- 导入spark的依赖 --> 29 <dependency> 30 <groupId>org.apache.spark</groupId> 31 <artifactId>spark-core_2.11</artifactId> 32 <version>${spark.version}</version> 33 </dependency> 34 35 <!-- 指定hadoop-client API的版本 --> 36 <dependency> 37 <groupId>org.apache.hadoop</groupId> 38 <artifactId>hadoop-client</artifactId> 39 <version>${hadoop.version}</version> 40 </dependency> 41 42 </dependencies> 43 44 <build> 45 <pluginManagement> 46 <plugins> 47 <!-- 编译scala的插件 --> 48 <plugin> 49 <groupId>net.alchim31.maven</groupId> 50 <artifactId>scala-maven-plugin</artifactId> 51 <version>3.2.2</version> 52 </plugin> 53 <!-- 编译java的插件 --> 54 <plugin> 55 <groupId>org.apache.maven.plugins</groupId> 56 <artifactId>maven-compiler-plugin</artifactId> 57 <version>3.5.1</version> 58 </plugin> 59 </plugins> 60 </pluginManagement> 61 <plugins> 62 <plugin> 63 <groupId>net.alchim31.maven</groupId> 64 <artifactId>scala-maven-plugin</artifactId> 65 <executions> 66 <execution> 67 <id>scala-compile-first</id> 68 <phase>process-resources</phase> 69 <goals> 70 <goal>add-source</goal> 71 <goal>compile</goal> 72 </goals> 73 </execution> 74 <execution> 75 <id>scala-test-compile</id> 76 <phase>process-test-resources</phase> 77 <goals> 78 <goal>testCompile</goal> 79 </goals> 80 </execution> 81 </executions> 82 </plugin> 83 84 <plugin> 85 <groupId>org.apache.maven.plugins</groupId> 86 <artifactId>maven-compiler-plugin</artifactId> 87 <executions> 88 <execution> 89 <phase>compile</phase> 90 <goals> 91 <goal>compile</goal> 92 </goals> 93 </execution> 94 </executions> 95 </plugin> 96 97 98 <!-- 打jar插件 --> 99 <plugin> 100 <groupId>org.apache.maven.plugins</groupId> 101 <artifactId>maven-shade-plugin</artifactId> 102 <version>2.4.3</version> 103 <executions> 104 <execution> 105 <phase>package</phase> 106 <goals> 107 <goal>shade</goal> 108 </goals> 109 <configuration> 110 <filters> 111 <filter> 112 <artifact>*:*</artifact> 113 <excludes> 114 <exclude>META-INF/*.SF</exclude> 115 <exclude>META-INF/*.DSA</exclude> 116 <exclude>META-INF/*.RSA</exclude> 117 </excludes> 118 </filter> 119 </filters> 120 </configuration> 121 </execution> 122 </executions> 123 </plugin> 124 </plugins> 125 </build> 126 127 </project>
7.新建一个scala class,类型为Object
8.编写spark程序

1 package spark.scala 2 3 import org.apache.spark.rdd.RDD 4 import org.apache.spark.{SparkConf, SparkContext} 5 6 object ScalaWordCount { 7 8 def main(args: Array[String]): Unit = { 9 // 创建spark配置,设置应用程序名字 10 //val conf = new SparkConf().setAppName("ScalaWordCount") 11 val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[*]") 12 // 创建spark执行入口 13 val sc = new SparkContext(conf) 14 // 指定以后从哪儿读取数据创建RDD(弹性分布式数据集) 15 // sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile(args(1)) 16 17 val line: RDD[String] = sc.textFile(args(0)) 18 // 切分压平 19 val words: RDD[String] = line.flatMap(_.split(" ")) 20 // 将单词和1组合 21 val wordAndOne: RDD[(String, Int)] = words.map((_, 1)) 22 // 按照key进行聚合 23 val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) 24 // 排序 25 val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) 26 // 将结果保存到HDFS中 27 sorted.saveAsTextFile(args(1)) 28 // 释放资源 29 sc.stop() 30 } 31 }
9.使用Maven打包:首先修改pom.xml中的main class
点击idea右侧的Maven Project选项;点击Lifecycle,选择clean和package,然后点击Run Maven Build
10.选择编译成功的jar包,并将该jar上传到Spark集群中的某个节点上
11.首先启动hdfs和Spark集群
启动hdfs
/usr/local/hadoop-2.6.5/sbin/start-dfs.sh
启动spark
/usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh
12.使用spark-submit命令提交Spark应用(注意参数的顺序)
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --class cn.itcast.spark.WordCount --master spark://node1.edu360.cn:7077 --executor-memory 2G --total-executor-cores 4 /root/spark-mvn-1.0-SNAPSHOT.jar hdfs://node1.edu360.cn:9000/words.txt hdfs://node1.edu360.cn:9000/out
查看程序执行结果
hdfs dfs -cat hdfs://node1.edu360.cn:9000/out/part-00000