Spark 学习笔记 (一): 初探Spark 程序设计之RDD
本文主要介绍Spark基本数据结构RDD的原理和使用,以及搭建了基于Docker的Spark集群开发测试环境,最后给出了几个实际程序例子,算是Saprk入了门:)
一、 RDD
RDD是Spark中最核心的概念
1.初识RDD --- Resilient Distributed Datasets 弹性分布式数据集
-
数据集:RDD是数据集合的抽象,分布在集群中的只读对象的集合
- 一个RDD由多个Partition构成,也就是一个RDD被分区成Partiton存在不同节点上
- 一个Partition可以存储在此磁盘或内存中
- 通过并行transform操作进行一个RDD到另一个RDD的转换
-
分布式:
- Partition 是分布式存储的
- 数据的计算是多个节点协同计算得到的
-
弹性:RDD 可以在不改变内部存储数据记录的前提下,去调整并行计算计算单元的划分结构,弹性这一特性,也是为并行计算服务的
-
容错性:分布式的一般问题是需要具有容错性,那么RDD本身是具有容错性的,
RDD 内部的数据集合在逻辑上和物理上被划分成多个小子集合Partition,这样的每一个子集合我们将其称为分区,分区的个数会决定并行计算的粒度,而每一个分区数值的计算都是在一个单独的任务Task中进行,因此并行任务的个数,也是由 RDD分区的个数决定的
Partition -> Task
2. Spark运行模式
先放个图,看下Spark整体程序是怎么执行的
-
整个集群分为 Master 节点和 Worker 节点,相当于 Hadoop 的 Master 和 Slave 节点
-
Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点
-
Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 executors
-
Driver 官方解释是 “The process running the main() function of the application and creating the SparkContext”。 Application 就是用户自己写的 Spark 程序(driver program)
-
每个 Worker 上存在一个或者多个 ExecutorBackend 进程。每个进程包含一个 Executor 对象,该对象持有一个线程池,每个线程可以执行一个 task。
3.Spark程序设计
-
Scala基础
- 用函数式编程的方式可以很方便处理集合:
var list = List(1, 2, 3) list.foreach(println)
Spark 的RDD,封装了各种类似于Scala集合的算子
map、filter、reduce
等,且都是分布式执行的
-
Spark程序设计基本流程
-
- 创建SparkContext对象:定义了Spark执行环境和配置参数;注意每个Spark程序有且仅有一个SparkContext
-
- 创建RDD:从Scala集合或者在Hadoops数据集上创建
- (1) 从Scala集合映射成RDD:
sc.parallelize()
创建,第二个参数是Partition数目
val slices = 10 //Partition数目,即并行的task数目启动10个map task进行处理 val n = 100000 * slices val count = sc.parallelize(1 to n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 (x*x + y*y < 1) 1 else 0 }.reduce(_ + _)
-
(2) 将本地文件/HDFS文件映射成RDD:
- 文本文件:
sc.textFile("tile.txt") //将本地文本文件加载成RDD sc.textFile(“hdfs://nn:9000/path/file”) //hdfs文件或目录
- sequenceFile文件:
sc.sequenceFile(“file.txt”) //将本地二进制文件加载成RDD
- 使用任意自定义的Hadoop InputFormat
sc.hadoopFile(path, inputFmt, keyClass, valClass)
- 读取HDFS创建RDD:
inputRdd = sc.textFile(“hdfs:///data/input”) inputRdd = sc.textFile(“hdfs://namenode:8020/data/input”)
HDFS的datanode的Block和Spark数据的partiton是一一映射的,也和task一一映射,也就是下图所示的就会启动5个task
- 读取HBase创建RDD:
import org.apache.spark._ import org.apache.hadoop.hbase.mapreduce.TableInputFormat //创建SparkContext val sparkConf = new SparkConf() val sc = new SparkContext(conf ) // 设置hbase configuration val hbaseConf = HBaseConfiguration.create() hbaseConf.addResource(new Path(“hbase-site.xml")) hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName) //创建hbase RDD val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result]) //获取总行数 al count = hBaseRDD.count()
-
- 在RDD上进行转换transformation和action
-
Transformation:将一个RDD通过一种规则,映射成另一种RDD;
Action: 返回结果或者保存结果,只有action才会触发程序的执行,注意Spark中遇到action的时候计算才会去分布式执行在 Spark 中,所有的转换(transformations)都是惰性(lazy)的,它们不会马上计算它们的结果。相反的,它们仅仅记录转换操作是应用到哪些基础数据集(例如一个文件)上的。转换仅仅在这个时候计算:当动作(action) 需要一个结果返回给驱动程序的时候。这个设计能够让 Spark 运行得更加高效。例如,我们可以实现:通过 map 创建一个新数据集在 reduce 中使用,并且仅仅返回 reduce 的结果给 driver,而不是整个大的映射过的数据集。
- 常见的操作集合:
- 常见的操作集合:
-
Transformation函数例子:
//创建RDD val listRdd = sc.parallelize(List(1, 2, 3), 3) // 将RDD传入函数,生成新的RDD val squares = listRdd.map(x => x*x) // {1, 4, 9} // 对RDD中元素进行过滤,生成新的RDD val even = squares.filter(_ % 2 == 0) // {4} // 将一个元素映射成多个,生成新的RDD nums.flatMap(x => 1 to x) // => {1, 1, 2, 1, 2, 3}
- Action函数例子:
//创建新的RDD val nums = sc.parallelize(List(1, 2, 3), 2) // 将RDD保存为本地集合(返回到driver端) nums.collect() // => Array(1, 2, 3) // 返回前K个元素 nums.take(2) // => Array(1, 2) // 计算元素总数 nums.count() // => 3 // 合并集合元素 nums.reduce(_ + _) // => 6 // 将RDD写到HDFS中 nums.saveAsTextFile(“hdfs://nn:8020/output”) nums.saveAsSequenceFile(“hdfs://nn:8020/output”)
- Key/Value类型的RDD的操作
val pets = sc.parallelize( List((“cat”, 1), (“dog”, 1), (“cat”, 2))) pets.reduceByKey(_ + _) // => {(cat, 3), (dog, 1)} pets.groupByKey() // => {(cat, Seq(1, 2)), (dog, Seq(1)} pets.sortByKey() // => {(cat, 1), (cat, 2), (dog, 1)}
reduceByKey自动在map端进行本地combine
- 控制ReduceTasks数目:有一个参数执行并发度:
words.reduceByKey(_ + _, 5)
用户也可以通过修改spark.default.parallelism设置默认并行度
默认并行度为最初的RDD partition数目
留一个思考问题:那么这些操作都是怎么分布式执行的呢?
-
- 返回结果:保存到HFDS或者Hive或者HBase
- 将结果保存的HBase:
-
4. 其他RDD操作
-
Sample()
从数据集采样 -
union()
合并多个RDD -
cartesian
求笛卡尔积 -
共享变量:Accumulators和Broadcast Variables
一般来说上述的操作都是对数据在远端worker node上拷贝的数据进行操作,对数据的效果并不会回传
-
Accumulator
(累加器,计数器)-
类似于MapReduce中的counter,将数据从一个节点发送到其他各个节点上去;
-
通常用于监控,调试,记录符合某类特征的数据数目等
import SparkContext._ val total_counter = sc.accumulator(0L, "total_counter") val counter0 = sc.accumulator(0L, "counter0") val counter1 = sc.accumulator(0L, "counter1") val count = sc.parallelize(1 to n, slices).map { i => total_counter += 1 val x = random * 2 - 1 val y = random * 2 – 1 if (x*x + y*y < 1) { counter1 += 1 } else { counter0 += 1 } if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _)
-
-
广播变量
- 广播机制 : 高效分发大对象,比如字典(map),集合(set)等,每个executor一份, 而不是每个task一份;
引用自Spark doc里的介绍:
Spark 的 action(动作)操作是通过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是通过分布式的 “shuffle” 操作进行拆分的。Spark 会自动广播出每个 stage(阶段)内任务所需要的公共数据。这种情况下广播的数据使用序列化的形式进行缓存,并在每个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,使用广播变量会有比较好的效果。
-
包括HttpBroadcast和TorrentBroadcast两种
-
HttpBroadcast与TorrentBroadcast
- 广播
- 广播
-
总结:如果一个变量非常大,每一个task计算逻辑都要用到这个变量,则应该将其广播出去,更高效
- 广播机制 : 高效分发大对象,比如字典(map),集合(set)等,每个executor一份, 而不是每个task一份;
-
5.Cache基本概念与使用
-
允许将RDD缓存到内存中或磁盘上,以便于重用,如果想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法.
Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。
-
Spark提供了多种缓存级别,以便于用户根据实际需求进行调整
-
如何选择存储级别?
-
trade-off: Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。
-
6.基于Docker的Spark开发测试环境搭建:
原项目可以直接在本机跑,默认是Spark的单机模式;
- (1)首先下载镜像并启动:
> docker pull sequenceiq/spark:1.5.1
> sudo docker run -it sequenceiq/spark:1.5.1 bash
遇到问题:在Docker中启动master的时候,ip是Docker 的地址,我的宿主机访问不到,原因是启动Docker 的时候没有端口映射,所以重新run一遍镜像:
docker run -p 127.0.0.1:8081:8080 -it sequenceiq/spark:1.5.1 bash
将宿主机的8081端口映射到Docker的8080端口
- (2)
cd /usr/local/spark
cp conf/spark-env.sh.template conf/spark-env.sh
vi conf/spark-env.sh
加入两行代码:
- (3) 启动Master和Slave
./sbin/start-master.sh
./sbin/start-slave.sh 172.17.0.109:7077
然后宿主机浏览器访问http://localhost:8081
就可以访问到Spark UI界面惹!
7.几个实际例子:
-
- 分布式估算Pi
/**
* 并行估算pi
* Area1 = x * x , Area2 = Pi * (x / 2) * (x / 2)
* Area1 / Area2 = 4 / pi
* 4 / pi = x / y => pi = 4 * y / x
*/
object SparkPi {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[1]");
val sc = new SparkContext(conf);
val slices = if (args.length > 0) args(0).toInt else 2;
val areaSqure = 100000 * slices;
//并行估算areaCircle的值:也就是撒areaSqure这么多个点,求落在圆内的多少个点,就近似等于圆的面积
val areaCircle = sc.parallelize(1 to areaSqure, slices).map{i =>
val x = new Random().nextInt() * 2 - 1
val y = new Random().nextInt() * 2 - 1
if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * areaCircle / areaSqure)
}
}
-
-
log query
任务:如何统计每个用户在每台机器(ip)上查询(query)的次数和返回结果累积大小(byte)?
-
/**
* 日志查询任务:统计每个用户在每台机器(ip)上查询(query)的次数和返回结果累积大小(byte)
* 分析:key: 每个用户在每台机器上的query ,value:次数和结果累积大小(byte)
*/
object LogQuery {
val apacheLogRegex =
"""^([d.]+) (S+) (S+) [([wd:/]+s[+-‐]d{4})] "(.+?)" (d{3}) ([d-‐]+) "([^"]+)" "([^"]+)".*""".r
def extractKey(line : String): (String, String, String) = {
apacheLogRegex.findFirstIn(line) match {
case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
if (user != ""-‐"") (ip, user, query)
else (null, null, null)
case _ => (null, null, null)
}
}
def extractStats(line: String): Stats = {
apacheLogRegex.findFirstIn(line) match {
case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
new Stats(1, bytes.toInt)
case _ => new Stats(1, 0)
}
}
class Stats(val count: Int, val numBytes: Int) extends Serializable {
def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
override def toString = "bytes=%s n=%s".format(numBytes, count)
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("LogQuery").setMaster("local[1]")
val sc = new SparkContext(conf)
val dataset = sc.textFile(args(0))
dataset.map(line => (extractKey(line), extractStats(line)))
.reduceByKey((a, b) => a.merge(b))
.collect().foreach {
case (user, query) => println("%s %s".format(user, query))
}
}
}
-
- 逻辑回归
找出一条最优的线,将所有点分成两部分
- 逻辑回归