官网对Spark的介绍
http://spark.apache.org/
Apache Spark™ is a unified analytics engine for large-scale data processing
Lightning-fast cluster computing。
快如闪电的集群计算。
大规模快速通用的计算引擎。
速度: 比hadoop 100x,磁盘计算快10x
使用: java / Scala /R /python
提供80+算子(操作符),容易构建并行应用。
通用: 组合SQL ,流计算 + 复杂分析。
运行: Hadoop, Mesos, standalone, or in the cloud,local.
Spark的模块
Spark core //核心模块
Spark SQL //SQL
Spark Streaming //流计算
Spark MLlib //机器学习
Spark graph //图计算
DAG //direct acycle graph,有向无环图。
Spark的安装
1.下载spark-2.2.1-bin-hadoop2.7.tgz
..
2.解压
..
3.环境变量
[/etc/profile]
SPARK_HOME=/soft/spark
PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
[source]
$>source /etc/profile
4.验证spark
$>cd /soft/spark
$>./spark-shell
5.webui
http://localhost:4040/
Spark的初体验
0.sc
SparkContext,Spark程序的入口点,封装了整个spark运行环境的信息。
1.进入spark-shell
$>spark-shell
$scala>sc
[SparkContext]
Spark程序的入口点,封装了整个spark运行环境的信息。
[RDD]
resilient distributed dataset,弹性分布式数据集。等价于集合。
1 spark实现word count
------------------------
//加载文本文件,以换行符方式切割文本.Array(hello world2,hello world2 ,...)
val rdd1 = sc.textFile("/home/test.txt");
//单词统计1
$scala>val rdd1 = sc.textFile("/home/centos/test.txt")
$scala>val rdd2 = rdd1.flatMap(line=>line.split(" "))
$scala>val rdd3 = rdd2.map(word = > (word,1))
$scala>val rdd4 = rdd3.reduceByKey(_ + _)
$scala>rdd4.collect
//单词统计2
sc.textFile("/home/test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect
//统计所有含有wor字样到单词个数。filter
//过滤单词
sc.textFile("/home/centos/test.txt").flatMap(_.split(" ")).filter(_.contains("wor")).map((_,1)).reduceByKey(_ + _).collect
2 编程实现wordcount
依赖
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
开发scala程序
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by Administrator on 2017/4/20.
*/
object WordCountDemo {
def main(args: Array[String]): Unit = {
//创建Spark配置对象
val conf = new SparkConf();
conf.setAppName("WordCountSpark")
//设置master属性
conf.setMaster("local") ;
//通过conf创建sc
val sc = new SparkContext(conf);
//加载文本文件
val rdd1 = sc.textFile("d:/scala/test.txt");
//压扁
val rdd2 = rdd1.flatMap(line => line.split(" ")) ;
//映射w => (w,1)
val rdd3 = rdd2.map((_,1))
val rdd4 = rdd3.reduceByKey(_ + _)
val r = rdd4.collect()
r.foreach(println)
}
}
提交作业到spark集群运行
1.导出jar包
2.spark-submit提交命令运行job
//Scala版本
$>spark-submit --master local --name MyWordCount --class com.it18zhang.spark.scala.WordCountScala SparkDemo1-1.0-SNAPSHOT.jar /home/centos/test.txt
//java版
$>spark-submit --master local --name MyWordCount --class com.it18zhang.spark.java.WordCountJava SparkDemo1-1.0-SNAPSHOT.jar /home/centos/test.txt
集群模式
1.local
nothing!
spark-shell --master local; //默认
2.standalone
独立。
a)复制spark目录到其他主机
b)配置其他主机的所有环境变量
[/etc/profile]
SPARK_HOME
PATH
c)配置master节点的slaves
[/soft/spark/conf/slaves]
s202
s203
s204
d)启动spark集群
/soft/spark/sbin/start-all.sh
e)查看进程
$>xcall.jps jps
master //s201
worker //s202
worker //s203
worker //s204
e)webui
http://s201:8080/
提交作业jar到完全分布式spark集群
--------------------------------
1.需要启动hadoop集群(只需要hdfs)
$>start-dfs.sh
2.put文件到hdfs.
3.运行spark-submit
$>spark-submit
--master spark://s201:7077
--name MyWordCount
--class com.it18zhang.spark.scala.WordCountScala
SparkDemo1-1.0-SNAPSHOT.jar
hdfs://s201:8020/user/centos/test.txt
脚本分析
-----------------------
[start-all.sh]
sbin/spark-config.sh
sbin/spark-master.sh //启动master进程
sbin/spark-slaves.sh //启动worker进程
[start-master.sh]
sbin/spark-config.sh
org.apache.spark.deploy.master.Master
spark-daemon.sh start org.apache.spark.deploy.master.Master --host --port --webui-port ...
[spark-slaves.sh]
sbin/spark-config.sh
slaves.sh //conf/slaves
[slaves.sh]
for conf/slaves{
ssh host start-slave.sh ...
}
[start-slave.sh]
CLASS="org.apache.spark.deploy.worker.Worker"
sbin/spark-config.sh
for (( .. )) ; do
start_instance $(( 1 + $i )) "$@"
done
$>cd /soft/spark/sbin
$>./stop-all.sh //停掉整个spark集群.
$>./start-master.sh //停掉整个spark集群.
$>./start-master.sh //启动master节点
$>./start-slaves.sh //启动所有worker节点