大数据实验(七)Spark单机安装及WordCount(TopKey)
前置环境
Ubuntu 16.04
hadoop 2.7.3
Spark 3.0.0-preview2
scala 2.12.10 (对应Spark版本)
maven 3+
spark-core_2.12 (maven中开发Spark所用的依赖)
一、Spark安装
001、下载Spark
在Apache官网找到合适的版本进行下载,下载了Spark 3.0.0-preview2。
002、安装
将Spark 3.0.0-preview2解压到/usr/local下,改名为spark
003、配置环境变量
sudo vim ~/.bashrc
加入位置
# Spark Environment
export SPARK_HOME=/usr/local/spark
export PATH=${SPARK_HOME}/bin:$PATH
source ~/.bashrc
二、测试及提交
在/usr/local下建立一个用于测试的123.txt文件。
001、local模式运行spark
#本地模式,使用两个核
spark-shell --master local[2]
可以看到信息如下:
002、使用交互式环境测试WordCount
#读取文件
scala> val file = spark.sparkContext.textFile("file:///usr/local/123.txt")
#逻辑处理
scala> val wordCounts = file.flatMap(line => line.split(",")).map((word => (word, 1))).reduceByKey(_ + _)
# 得到结果
scala> val res= wordCounts.collect
#遍历打印结果
scala> res.foreach(println)
003、在IDEA中编写TopKey程序并提交到Spark本地运行
实际开发流程,使用ide编写程序,然后提交到Spark环境。
spark-submit 提交命令,可以提交到各种模式。
1、开发准备
1、使用IDE开发工具开发程序,打成jar包,提交给Spark运行。
使用maven打包、开发需要以下依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
2、从以下文件中找出词频,并取出Top5
java hadoop spark
spark hadoop java hello
hive mysql
java hadoop spark
spark hadoop java hello
hive mysql
hadoop Spark hive
Spark Flink hadoop
java scala hadoop
SparkHadoopJava
2、Scala程序
在IDEA中编写程序
//测试环境中,全部写死参数
object TopKey {
def main(args: Array[String]): Unit = {
//指定AppName和运行模式
val conf:SparkConf=new SparkConf().setAppName("Top5")
.setMaster("local[3]")
val sparkContext = new SparkContext(conf)
//此处使用的是本地文件,实际生产中使用参数替代
val lines: RDD[String] = sparkContext.textFile("/SparkDemo.txt")
val text=lines.flatMap(_.split("\s+")).map((_,1)).reduceByKey((_+_))
//将kv交换,然后排序,取出前5,循环输出
text.map(_.swap).sortByKey(ascending = false).take(5).foreach(println)
}
}
//在测试环境中,直接输出,得出结果
(7,hadoop)
(5,java)
(4,spark)
(3,hive)
(2,Spark)
3、打包并提交
实际开发中的模式,参数不写死,提交的时候指定参数。
打成Jar包后,把jar包提交到Hdfs中,Jar打包后为Top5.jar.
object TopKey {
def main(args: Array[String]): Unit = {
//提交时候指定参数
val conf:SparkConf=new SparkConf().setAppName(s"${args(0)}")
//.setMaster("local[3]")
val sparkContext = new SparkContext(conf)
val lines: RDD[String] = sparkContext.textFile(s"${args(1)}")
val text=lines.flatMap(_.split("\s+")).map((_,1)).reduceByKey((_+_))
//这里的参数5也可以指定
text.map(_.swap).sortByKey(ascending = false).take(5).foreach(println)
}
}
将数据都传到hdfs中。
hdfs dfs -mkdir /user/spark
hdfs dfs -put SparkDemo.txt /user/spark
hdfs dfs -put Top5.jar /user/spark
4、提交程序
#使用spark-submit提交程序,这里提交到本地2核运行,主类是com.TopKey
spark-submit --master local[2] --class com.TopKey
hdfs://hdfs1:9000/user/spark/Top5.jar #之后是Jar包地址
Top5 hdfs://hdfs1:9000/user/spark/SparkDemo.txt #这里是参数