[root@node1 aas]# pwd
/root/aas
[root@node1 aas]# wget http://archive.apache.org/dist/spark/spark-1.2.1/spark-1.2.1-bin-hadoop2.4.tgz
[root@node1 aas]# tar zxvf spark-1.2.1-bin-hadoop2.4.tgz
Copy conf/spark-env.sh.template to conf/spark-env.sh and add the following settings:
HADOOP_CONF_DIR=/etc/hadoop/conf/
SPARK_EXECUTOR_INSTANCES=3
SPARK_EXECUTOR_CORES=4
SPARK_EXECUTOR_MEMORY=4G
SPARK_DRIVER_MEMORY=4G
SPARK_YARN_APP_NAME=AAS
Copy conf/log4j.properties.template to conf/log4j.properties and change the log level to WARN:
log4j.rootCategory=WARN, console
Start spark-shell:
[root@node1 spark-1.2.1-bin-hadoop2.4]# ./bin/spark-shell --master yarn-client
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_09-icedtea)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/03 14:54:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.

scala> :help
All commands can be abbreviated, e.g. :he instead of :help.
Those marked with a * have more detailed help, e.g. :help imports.

:cp <path>                 add a jar or directory to the classpath
:help [command]            print this summary or command-specific help
:history [num]             show the history (optional num is commands to show)
:h? <string>               search the history
:imports [name name ...]   show import history, identifying sources of names
:implicits [-v]            show the implicits in scope
:javap <path|class>        disassemble a file or class name
:load <path>               load and interpret a Scala file
:paste                     enter paste mode: all input up to ctrl-D compiled together
:quit                      exit the repl
:replay                    reset execution and replay all previous commands
:reset                     reset the repl to its initial state, forgetting all session entries
:sh <command line>         run a shell command (result is implicitly => List[String])
:silent                    disable/enable automatic printing of results
:fallback                  disable/enable advanced repl changes, these fix some issues but may introduce others. This mode will be removed once these fixes stablize
:type [-v] <expr>          display the type of an expression without evaluating it
:warnings                  show the suppressed warnings from the most recent line which had any

scala>
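The sample data referenced on page 10 of the book cannot be downloaded from within China without a proxy. It is also available on Baidu Pan: http://pan.baidu.com/s/1pJvjHA7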
[root@node1 linkage]# unzip donation.zip
Archive: donation.zip
 extracting: block_10.zip
 extracting: block_1.zip
 extracting: block_2.zip
 extracting: block_3.zip
 extracting: block_4.zip
 extracting: block_5.zip
 extracting: block_6.zip
 extracting: block_7.zip
 extracting: block_8.zip
 extracting: block_9.zip
  inflating: documentation
  inflating: frequencies.csv
[root@node1 linkage]# ll
total 110280
-rw-r--r-- 1 root root  5643837 Mar  9  2011 block_10.zip
-rw-r--r-- 1 root root  5643935 Mar  9  2011 block_1.zip
-rw-r--r-- 1 root root  5642577 Mar  9  2011 block_2.zip
-rw-r--r-- 1 root root  5644247 Mar  9  2011 block_3.zip
-rw-r--r-- 1 root root  5644264 Mar  9  2011 block_4.zip
-rw-r--r-- 1 root root  5645826 Mar  9  2011 block_5.zip
-rw-r--r-- 1 root root  5645291 Mar  9  2011 block_6.zip
-rw-r--r-- 1 root root  5645235 Mar  9  2011 block_7.zip
-rw-r--r-- 1 root root  5646395 Mar  9  2011 block_8.zip
-rw-r--r-- 1 root root  5643134 Mar  9  2011 block_9.zip
-rwxrw-rw- 1 root root     4516 Mar 10  2011 documentation
-rw-r--r-- 1 root root 56448373 Dec  3 14:22 donation.zip
-rw-r--r-- 1 root root      272 Mar  9  2011 frequencies.csv
Unzip each block:
[root@node1 linkage]# unzip block_1.zip
Archive: block_1.zip
  inflating: block_1.csv
[root@node1 linkage]# unzip block_2.zip
Archive: block_2.zip
  inflating: block_2.csv
[root@node1 linkage]# unzip block_3.zip
Archive: block_3.zip
  inflating: block_3.csv
[root@node1 linkage]# unzip block_4.zip
Archive: block_4.zip
  inflating: block_4.csv
[root@node1 linkage]# unzip block_5.zip
Archive: block_5.zip
  inflating: block_5.csv
[root@node1 linkage]# unzip block_6.zip
Archive: block_6.zip
  inflating: block_6.csv
[root@node1 linkage]# unzip block_7.zip
Archive: block_7.zip
  inflating: block_7.csv
[root@node1 linkage]# unzip block_8.zip
Archive: block_8.zip
  inflating: block_8.csv
[root@node1 linkage]# unzip block_9.zip
Archive: block_9.zip
  inflating: block_9.csv
[root@node1 linkage]# unzip block_10.zip
Archive: block_10.zip
  inflating: block_10.csv
[root@node1 linkage]# ll
total 366672
-rw-r--r-- 1 root root 26255957 Mar  9  2011 block_10.csv
-rw-r--r-- 1 root root  5643837 Mar  9  2011 block_10.zip
-rw-r--r-- 1 root root 26248574 Mar  9  2011 block_1.csv
-rw-r--r-- 1 root root  5643935 Mar  9  2011 block_1.zip
-rw-r--r-- 1 root root 26241784 Mar  9  2011 block_2.csv
-rw-r--r-- 1 root root  5642577 Mar  9  2011 block_2.zip
-rw-r--r-- 1 root root 26253247 Mar  9  2011 block_3.csv
-rw-r--r-- 1 root root  5644247 Mar  9  2011 block_3.zip
-rw-r--r-- 1 root root 26247471 Mar  9  2011 block_4.csv
-rw-r--r-- 1 root root  5644264 Mar  9  2011 block_4.zip
-rw-r--r-- 1 root root 26249424 Mar  9  2011 block_5.csv
-rw-r--r-- 1 root root  5645826 Mar  9  2011 block_5.zip
-rw-r--r-- 1 root root 26256126 Mar  9  2011 block_6.csv
-rw-r--r-- 1 root root  5645291 Mar  9  2011 block_6.zip
-rw-r--r-- 1 root root 26261911 Mar  9  2011 block_7.csv
-rw-r--r-- 1 root root  5645235 Mar  9  2011 block_7.zip
-rw-r--r-- 1 root root 26253911 Mar  9  2011 block_8.csv
-rw-r--r-- 1 root root  5646395 Mar  9  2011 block_8.zip
-rw-r--r-- 1 root root 26254012 Mar  9  2011 block_9.csv
-rw-r--r-- 1 root root  5643134 Mar  9  2011 block_9.zip
-rwxrw-rw- 1 root root     4516 Mar 10  2011 documentation
-rw-r--r-- 1 root root 56448373 Dec  3 14:22 donation.zip
-rw-r--r-- 1 root root      272 Mar  9  2011 frequencies.csv
Put the data on HDFS:
[root@node1 linkage]# hdfs dfs -mkdir linkage
[root@node1 linkage]# hdfs dfs -put block_*.csv linkage
Run the example code:
scala> val rawblocks = sc.textFile("/root/aas/ch02/linkage/frequencies.csv")
rawblocks: org.apache.spark.rdd.RDD[String] = /root/aas/ch02/linkage/frequencies.csv MappedRDD[1] at textFile at <console>:12
scala> rawblocks.first
res3: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
scala> val head = rawblocks.take(10)
head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)
scala> head.length
res4: Int = 10
scala> head.foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE
scala> def isHeader(line: String) = line.contains("id_1")
isHeader: (line: String)Boolean
scala> head.filter(isHeader).foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
scala> head.filterNot(isHeader).foreach(println)
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE
scala> head.filter(x => !isHeader(x)).length
res8: Int = 9
scala> val noheader = rawblocks.filter(x => !isHeader(x))
noheader: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at <console>:16
scala> noheader.first
res9: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
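The transcript above drops the header in three different ways: filterNot(isHeader), filter(x => !isHeader(x)), and, on the RDD, filter with a negated predicate. A minimal sketch on a small local Array shows that the forms are interchangeable on Scala collections. The sample lines are taken from the head output above, with the header shortened for readability; no Spark is needed to run it.

```scala
// Sample lines taken from the `head` output above (header shortened).
val sample = Array(
  "\"id_1\",\"id_2\",\"cmp_fname_c1\"",
  "37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE",
  "39086,47614,1,?,1,?,1,1,1,1,1,TRUE"
)

def isHeader(line: String): Boolean = line.contains("id_1")

// Three equivalent ways to keep only the data lines.
val viaFilterNot = sample.filterNot(isHeader)
val viaLambda = sample.filter(x => !isHeader(x))
val viaPlaceholder = sample.filter(!isHeader(_))

println(viaFilterNot.length)
```

As far as I know, the RDD API in this Spark version offers filter but no filterNot, which would explain why the RDD version negates the predicate inside filter.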
scala> def toDouble(s: String) = {
| if ("?".equals(s)) Double.NaN else s.toDouble
| }
toDouble: (s: String)Double
scala>
scala> def parse(line: String) = {
| val pieces = line.split(',')
| val id1 = pieces(0).toInt
| val id2 = pieces(1).toInt
| val scores = pieces.slice(2, 11).map(toDouble)
| val matched = pieces(11).toBoolean
| (id1, id2, scores, matched)
| }
parse: (line: String)(Int, Int, Array[Double], Boolean)
scala> val line = head(5)
line: String = 36950,42116,1,?,1,1,1,1,1,1,1,TRUE
scala> val tup = parse(line)
tup: (Int, Int, Array[Double], Boolean) = (36950,42116,Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0),true)
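The parsed record is a plain Scala tuple, so its fields can be read either by position or by pattern matching. The sketch below repeats toDouble and parse so that it runs on its own (outside the spark-shell session) and uses the same input line as above.

```scala
def toDouble(s: String): Double =
  if ("?".equals(s)) Double.NaN else s.toDouble

def parse(line: String): (Int, Int, Array[Double], Boolean) = {
  val pieces = line.split(',')
  val parsedScores = pieces.slice(2, 11).map(toDouble)
  (pieces(0).toInt, pieces(1).toInt, parsedScores, pieces(11).toBoolean)
}

val tup = parse("36950,42116,1,?,1,1,1,1,1,1,1,TRUE")

// Positional accessors are 1-based: _1, _2, ...
val firstId = tup._1
// productArity reports how many fields the tuple has.
val fieldCount = tup.productArity
// Pattern matching gives the fields readable names.
val (id1, id2, scores, matched) = tup

println(s"$id1 $id2 ${scores.mkString(",")} $matched")
```

Positional access becomes hard to read as the number of fields grows, which is one motivation for switching to a case class with named fields.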
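scala> case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)
defined class MatchData
scala> def parse(line: String) = {
| val pieces = line.split(',')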
| val id1 = pieces(0).toInt
| val id2 = pieces(1).toInt
| val scores = pieces.slice(2, 11).map(toDouble)
| val matched = pieces(11).toBoolean
| MatchData(id1, id2, scores, matched)
| }
parse: (line: String)MatchData
scala> val md = parse(line)
md: MatchData = MatchData(36950,42116,[D@3c935226,true)
scala> val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
mds: Array[MatchData] = Array(MatchData(37291,53113,[D@3bc5ac5,true), MatchData(39086,47614,[D@42eb3d6d,true), MatchData(70031,70237,[D@620de16d,true), MatchData(84795,97439,[D@7d4aed65,true), MatchData(36950,42116,[D@4227c226,true), MatchData(42413,48491,[D@403b6eb8,true), MatchData(25965,64753,[D@7de212f9,true), MatchData(49451,90407,[D@54bda00,true), MatchData(39932,40902,[D@36d538b7,true))
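The [D@... fragments in the output above are the JVM's default toString for an Array[Double], not the scores themselves. A minimal sketch of one way to print a record readably follows; the helper name show is hypothetical and not part of the original session.

```scala
case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)

// Hypothetical helper (not in the original transcript): renders the
// scores array with mkString instead of the JVM's default Array toString.
def show(md: MatchData): String =
  "MatchData(" + md.id1 + "," + md.id2 + ",[" + md.scores.mkString(", ") + "]," + md.matched + ")"

val md = MatchData(36950, 42116, Array(1.0, Double.NaN, 1.0), true)
println(show(md))  // prints MatchData(36950,42116,[1.0, NaN, 1.0],true)
```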
scala> val parsed = noheader.map(line => parse(line))
parsed: org.apache.spark.rdd.RDD[MatchData] = MappedRDD[5] at map at <console>:28
scala> parsed.cache()
res10: parsed.type = MappedRDD[5] at map at <console>:28
Call parsed.count() twice:
scala> parsed.count()
res11: Long = 5749132
scala> parsed.count()
res12: Long = 5749132
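The second call is noticeably faster, because the first call has already loaded the parsed RDD into memory.
The Storage page at http://node1:8088/proxy/application_1448538943757_0008/storage/ shows that the cache occupies 683.4 MB. The next step is to change the StorageLevel to MEMORY_ONLY_SER and compare the memory footprint against this 683.4 MB.
First call unpersist(); the Storage page then shows no cached data.
Then call parsed.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER):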
scala> parsed.unpersist()
res13: parsed.type = MappedRDD[5] at map at <console>:28
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel
scala> parsed.persist(StorageLevel.MEMORY_ONLY_SER)
res18: parsed.type = MappedRDD[5] at map at <console>:28
scala> parsed.count()
res19: Long = 5749132
scala> parsed.count()
With MEMORY_ONLY_SER the cache occupies 580.1 MB, about 100 MB (103.3 MB) less than the 683.4 MB used by cache(), whose default storage level is MEMORY_ONLY.
2.8 Aggregations
scala> val grouped = mds.groupBy(md => md.matched)
grouped: scala.collection.immutable.Map[Boolean,Array[MatchData]] = Map(true -> Array(MatchData(37291,53113,[D@3bc5ac5,true), MatchData(39086,47614,[D@42eb3d6d,true), MatchData(70031,70237,[D@620de16d,true), MatchData(84795,97439,[D@7d4aed65,true), MatchData(36950,42116,[D@4227c226,true), MatchData(42413,48491,[D@403b6eb8,true), MatchData(25965,64753,[D@7de212f9,true), MatchData(49451,90407,[D@54bda00,true), MatchData(39932,40902,[D@36d538b7,true)))
scala> grouped.mapValues(x => x.size).foreach(println)
(true,9)
2.9 Creating Histograms
scala> val matchCounts = parsed.map(md => md.matched).countByValue()
matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)
scala> val matchCountsSeq = matchCounts.toSeq
matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))
scala> matchCountsSeq.sortBy(_._1).foreach(println)
(false,5728201)
(true,20931)
scala> matchCountsSeq.sortBy(_._2).reverse.foreach(println)
(false,5728201)
(true,20931)
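countByValue returns an ordinary Scala Map to the driver, so the sorting above is plain collection code. The following sketch imitates the same steps on a small local Seq; the sample flags are made up for illustration, and groupBy is used here as a local stand-in for countByValue.

```scala
// Local stand-in for the RDD: a handful of match flags (made-up data).
val flags = Seq(true, false, false, true, false)

// Count occurrences of each distinct value, like countByValue.
val counts: Map[Boolean, Long] =
  flags.groupBy(identity).map { case (k, vs) => (k, vs.size.toLong) }

// Sort by count, largest first, as in sortBy(_._2).reverse.
val byCountDesc = counts.toSeq.sortBy(_._2).reverse
byCountDesc.foreach(println)
```

Sorting by _1 orders by the key (false before true), while sorting by _2 orders by the count; in the transcript both happen to produce the same order because the non-matches are also the larger group.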
2.10 Summary Statistics for Continuous Variables
scala> parsed.map(md => md.scores(0)).stats()
res27: org.apache.spark.util.StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN)
scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN
scala> parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
res28: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)
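The first stats() call returns NaN everywhere because any arithmetic that touches a NaN yields NaN. A minimal local sketch of the same effect, using a made-up array of scores rather than the real data:

```scala
// Made-up scores with one missing value encoded as NaN.
val scores = Array(1.0, Double.NaN, 0.5, 0.0)

// Any arithmetic involving NaN yields NaN, so the naive mean is NaN.
val naiveMean = scores.sum / scores.length

// NaN is not equal to itself, so it has to be detected with isNaN.
val present = scores.filter(d => !java.lang.Double.isNaN(d))
val mean = present.sum / present.length

println(s"$naiveMean $mean")
```

This is also why a comparison such as d == Double.NaN cannot be used as the filter: it is false for every value, including NaN itself.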
2.11 Creating Reusable Code for Computing Summary Statistics
Create a new file at /root/aas/ch02/StatsWithMissing.scala with the following code:
import org.apache.spark.util.StatCounter

class NAStatCounter extends Serializable {
  // Statistics over the non-missing values
  val stats: StatCounter = new StatCounter()
  // Number of missing (NaN) values seen so far
  var missing: Long = 0

  def add(x: Double): NAStatCounter = {
    if (java.lang.Double.isNaN(x)) {
      missing += 1
    } else {
      stats.merge(x)
    }
    this
  }

  def merge(other: NAStatCounter): NAStatCounter = {
    stats.merge(other.stats)
    missing += other.missing
    this
  }

  override def toString = {
    "stats: " + stats.toString + " NaN: " + missing
  }
}

object NAStatCounter extends Serializable {
  def apply(x: Double) = new NAStatCounter().add(x)
}
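Note: two calls in this code are easy to confuse. stats.merge(x) is an ordinary method call: StatCounter.merge is overloaded and accepts a single Double as well as another StatCounter. The apply method on the companion object serves a different purpose: it allows NAStatCounter(x) to be written as shorthand for NAStatCounter.apply(x), which creates a new counter and adds x to it.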
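To make the role of a companion object's apply method concrete, here is a minimal sketch with a hypothetical Tally class. It is not part of StatsWithMissing.scala and only mirrors the add/apply shape of NAStatCounter.

```scala
// Hypothetical class for illustration; not part of the original file.
class Tally {
  var total: Double = 0.0
  def add(x: Double): Tally = { total += x; this }
}

object Tally {
  // Lets callers write Tally(x) instead of new Tally().add(x).
  def apply(x: Double): Tally = new Tally().add(x)
}

val viaSugar = Tally(2.5)          // the compiler rewrites this to Tally.apply(2.5)
val viaApply = Tally.apply(2.5)
val chained = Tally(1.0).add(2.0)  // add returns this, so calls can be chained
println(s"${viaSugar.total} ${viaApply.total} ${chained.total}")
```

When trying this in the Scala REPL rather than as a script, the class and its companion object need to be entered together (for example with :paste) so that they are compiled as companions.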
scala> val nasRDD = parsed.map(md => { md.scores.map(d => NAStatCounter(d)) })
nasRDD: org.apache.spark.rdd.RDD[Array[NAStatCounter]] = MappedRDD[18] at map at <console>:38
scala> val nas1 = Array(1.0, Double.NaN).map(d => NAStatCounter(d))
nas1: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1)
scala> val nas2 = Array(Double.NaN, 2.0).map(d => NAStatCounter(d))
nas2: Array[NAStatCounter] = Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0)
scala> val merged = nas1.zip(nas2).map(p => p._1.merge(p._2))
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
scala> val reduced = nasRDD.reduce((n1, n2) => {
| n1.zip(n2).map { case (a, b) => a.merge(b) }
| })
reduced: Array[NAStatCounter] = Array(stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007, stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434, stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668, stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0....
scala> reduced.foreach(println)
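The statsWithMissing function called in section 2.12 is never defined in this transcript. The sketch below is an assumption about its shape, not the original code: it runs on local collections instead of an RDD, and it uses a simplified stand-in counter (MiniNACounter, a hypothetical name) that tracks only count, sum and missing values rather than delegating to Spark's StatCounter. The idea it illustrates is the same zip-and-merge pattern as above: build one array of counters per partition, then merge the arrays element by element.

```scala
// Simplified stand-in for NAStatCounter (hypothetical): tracks only count,
// sum and missing values; the real class delegates to Spark's StatCounter.
class MiniNACounter extends Serializable {
  var count: Long = 0
  var sum: Double = 0.0
  var missing: Long = 0

  def add(x: Double): MiniNACounter = {
    if (java.lang.Double.isNaN(x)) missing += 1
    else { count += 1; sum += x }
    this
  }

  def merge(other: MiniNACounter): MiniNACounter = {
    count += other.count
    sum += other.sum
    missing += other.missing
    this
  }

  def mean: Double = if (count == 0) Double.NaN else sum / count
}

// Assumed shape of statsWithMissing, applied to local "partitions":
// one array of counters per partition, then a pairwise element-wise merge.
def statsWithMissing(partitions: Seq[Seq[Array[Double]]]): Array[MiniNACounter] = {
  val perPartition = partitions.filter(_.nonEmpty).map { rows =>
    val counters = rows.head.map(d => new MiniNACounter().add(d))
    rows.tail.foreach { row =>
      counters.zip(row).foreach { case (c, d) => c.add(d) }
    }
    counters
  }
  perPartition.reduce { (n1, n2) =>
    n1.zip(n2).map { case (a, b) => a.merge(b) }
  }
}

// Two made-up partitions with two columns each.
val stats = statsWithMissing(Seq(
  Seq(Array(1.0, Double.NaN), Array(0.5, 2.0)),
  Seq(Array(Double.NaN, 4.0))
))
stats.foreach(c => println(s"count=${c.count} mean=${c.mean} missing=${c.missing}"))
```

On a real RDD, the per-partition step would typically be expressed with mapPartitions and the final merge with reduce. Reusing one counter array per partition avoids allocating a new counter for every value of every record, which is what the nasRDD version above does.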
2.12 Simple Variable Selection and Scoring
scala> val statsm = statsWithMissing(parsed.filter(_.matched).map(_.scores))
statsm: Array[NAStatCounter] = Array(stats: (count: 20922, mean: 0.997316, stdev: 0.036506, max: 1.000000, min: 0.000000) NaN: 9, stats: (count: 1333, mean: 0.989890, stdev: 0.082489, max: 1.000000, min: 0.000000) NaN: 19598, stats: (count: 20931, mean: 0.997015, stdev: 0.043118, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 475, mean: 0.969370, stdev: 0.153291, max: 1.000000, min: 0.000000) NaN: 20456, stats: (count: 20931, mean: 0.987292, stdev: 0.112013, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 20925, mean: 0.997085, stdev: 0.053914, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.997945, stdev: 0.045285, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.996129, stdev: 0.062097, max: 1.000000, min: 0.000000) NaN: 6, stats: (cou...
scala> val statsn = statsWithMissing(parsed.filter(!_.matched).map(_.scores))
statsn: Array[NAStatCounter] = Array(stats: (count: 5727203, mean: 0.711863, stdev: 0.389081, max: 1.000000, min: 0.000000) NaN: 998, stats: (count: 102365, mean: 0.898847, stdev: 0.272720, max: 1.000000, min: 0.000000) NaN: 5625836, stats: (count: 5728201, mean: 0.313138, stdev: 0.332281, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 1989, mean: 0.162955, stdev: 0.192975, max: 1.000000, min: 0.000000) NaN: 5726212, stats: (count: 5728201, mean: 0.954883, stdev: 0.207560, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5727412, mean: 0.221643, stdev: 0.415352, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.486995, stdev: 0.499831, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.219923, stdev: 0.414194, max: 1.000000, min: 0.00...
scala> statsm.zip(statsn).map { case(m, n) =>
| (m.missing + n.missing, m.stats.mean - n.stats.mean)
| }.foreach(println)
(1007,0.285452905746686)
(5645434,0.09104268062279908)
(0,0.6838772482597568)
(5746668,0.8064147192926266)
(0,0.03240818525033473)
(795,0.7754423117834042)
(795,0.5109496938298719)
(795,0.7762059675300523)
(12843,0.9563812499852178)
scala> def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else d
naz: (d: Double)Double
scala> case class Scored(md: MatchData, score: Double)
defined class Scored
scala> val ct = parsed.map(md => {
| val score = Array(2, 5, 6, 7, 8).map(i => naz(md.scores(i))).sum
| Scored(md, score)
| })
ct: org.apache.spark.rdd.RDD[Scored] = MappedRDD[27] at map at <console>:40
scala> ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue()
res34: scala.collection.Map[Boolean,Long] = Map(true -> 20871, false -> 637)
scala> ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue()
res35: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 596414)
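To see what the two thresholds trade off, the sketch below scores a single record locally and then turns the countByValue results above into ratios. The terms recall and precision are my framing, not the original author's, and naz is written here with isNaN instead of Double.NaN.equals, which should behave the same way for this purpose.

```scala
// Replace a missing score (NaN) with 0.0 so it does not poison the sum.
def naz(d: Double): Double = if (d.isNaN) 0.0 else d

// Sum the five selected features (indices 2, 5, 6, 7, 8), as in the transcript.
def score(scores: Array[Double]): Double =
  Array(2, 5, 6, 7, 8).map(i => naz(scores(i))).sum

// The nine scores of record 36950/42116 from the parse example.
val scores = Array(1.0, Double.NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
val total = score(scores)

// Counts copied from the countByValue results above.
val totalMatches = 20931.0
val recallAt4 = 20871 / totalMatches         // share of true matches kept at score >= 4.0
val precisionAt4 = 20871.0 / (20871 + 637)   // share of kept records that are true matches
val precisionAt2 = 20931.0 / (20931 + 596414)

println(s"score=$total recall@4=$recallAt4 precision@4=$precisionAt4 precision@2=$precisionAt2")
```

With the threshold at 4.0, 60 of the 20931 true matches are lost but only 637 non-matches slip through. Lowering it to 2.0 recovers every match at the cost of 596414 false positives, so true matches become a small fraction of the records kept.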