  • AAS Chapter 2 Code

    [root@node1 aas]# pwd
    /root/aas
    [root@node1 aas]# wget http://archive.apache.org/dist/spark/spark-1.2.1/spark-1.2.1-bin-hadoop2.4.tgz 
    [root@node1 aas]# tar zxvf spark-1.2.1-bin-hadoop2.4.tgz 

    Copy spark-env.sh.template in the conf directory to conf/spark-env.sh and add the following:

    HADOOP_CONF_DIR=/etc/hadoop/conf/
    SPARK_EXECUTOR_INSTANCES=3
    SPARK_EXECUTOR_CORES=4
    SPARK_EXECUTOR_MEMORY=4G
    SPARK_DRIVER_MEMORY=4G
    SPARK_YARN_APP_NAME=AAS

    Likewise, copy conf/log4j.properties.template to conf/log4j.properties and change the root log level to WARN:

    log4j.rootCategory=WARN, console

    Start spark-shell:

    [root@node1 spark-1.2.1-bin-hadoop2.4]# ./bin/spark-shell --master yarn-client
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _ / _ / _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_   version 1.2.1
          /_/
    
    Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_09-icedtea)
    Type in expressions to have them evaluated.
    Type :help for more information.
    15/12/03 14:54:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context available as sc.
    
    scala> :help
    All commands can be abbreviated, e.g. :he instead of :help.
    Those marked with a * have more detailed help, e.g. :help imports.
    
    :cp <path>                 add a jar or directory to the classpath
    :help [command]            print this summary or command-specific help
    :history [num]             show the history (optional num is commands to show)
    :h? <string>               search the history
    :imports [name name ...]   show import history, identifying sources of names
    :implicits [-v]            show the implicits in scope
    :javap <path|class>        disassemble a file or class name
    :load <path>               load and interpret a Scala file
    :paste                     enter paste mode: all input up to ctrl-D compiled together
    :quit                      exit the repl
    :replay                    reset execution and replay all previous commands
    :reset                     reset the repl to its initial state, forgetting all session entries
    :sh <command line>         run a shell command (result is implicitly => List[String])
    :silent                    disable/enable automatic printing of results
    :fallback                  disable/enable advanced repl changes, these fix some issues but may introduce others. This mode will be removed once these fixes stablize
    :type [-v] <expr>          display the type of an expression without evaluating it
    :warnings                  show the suppressed warnings from the most recent line which had any
    
    scala> 

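    The sample data used on p. 10 of the book cannot be downloaded from mainland China without a proxy. Download it from Baidu Netdisk instead: http://pan.baidu.com/s/1pJvjHA7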

    [root@node1 linkage]# unzip donation.zip 
    Archive:  donation.zip
     extracting: block_10.zip            
     extracting: block_1.zip             
     extracting: block_2.zip             
     extracting: block_3.zip             
     extracting: block_4.zip             
     extracting: block_5.zip             
     extracting: block_6.zip             
     extracting: block_7.zip             
     extracting: block_8.zip             
     extracting: block_9.zip             
      inflating: documentation           
      inflating: frequencies.csv         
    [root@node1 linkage]# ll
    total 110280
    -rw-r--r-- 1 root root  5643837 Mar  9  2011 block_10.zip
    -rw-r--r-- 1 root root  5643935 Mar  9  2011 block_1.zip
    -rw-r--r-- 1 root root  5642577 Mar  9  2011 block_2.zip
    -rw-r--r-- 1 root root  5644247 Mar  9  2011 block_3.zip
    -rw-r--r-- 1 root root  5644264 Mar  9  2011 block_4.zip
    -rw-r--r-- 1 root root  5645826 Mar  9  2011 block_5.zip
    -rw-r--r-- 1 root root  5645291 Mar  9  2011 block_6.zip
    -rw-r--r-- 1 root root  5645235 Mar  9  2011 block_7.zip
    -rw-r--r-- 1 root root  5646395 Mar  9  2011 block_8.zip
    -rw-r--r-- 1 root root  5643134 Mar  9  2011 block_9.zip
    -rwxrw-rw- 1 root root     4516 Mar 10  2011 documentation
    -rw-r--r-- 1 root root 56448373 Dec  3 14:22 donation.zip
    -rw-r--r-- 1 root root      272 Mar  9  2011 frequencies.csv

    Unzip each of the block files:

    [root@node1 linkage]# unzip block_1.zip
    Archive:  block_1.zip
      inflating: block_1.csv             
    [root@node1 linkage]# unzip block_2.zip
    Archive:  block_2.zip
      inflating: block_2.csv             
    [root@node1 linkage]# unzip block_3.zip
    Archive:  block_3.zip
      inflating: block_3.csv             
    [root@node1 linkage]# unzip block_4.zip
    Archive:  block_4.zip
      inflating: block_4.csv             
    [root@node1 linkage]# unzip block_5.zip
    Archive:  block_5.zip
      inflating: block_5.csv             
    [root@node1 linkage]# unzip block_6.zip
    Archive:  block_6.zip
      inflating: block_6.csv             
    [root@node1 linkage]# unzip block_7.zip
    Archive:  block_7.zip
      inflating: block_7.csv             
    [root@node1 linkage]# unzip block_8.zip
    Archive:  block_8.zip
      inflating: block_8.csv             
    [root@node1 linkage]# unzip block_9.zip
    Archive:  block_9.zip
      inflating: block_9.csv  
    [root@node1 linkage]# unzip block_10.zip
    Archive:  block_10.zip
      inflating: block_10.csv              
    [root@node1 linkage]# ll
    total 366672
    -rw-r--r-- 1 root root 26255957 Mar  9  2011 block_10.csv
    -rw-r--r-- 1 root root  5643837 Mar  9  2011 block_10.zip
    -rw-r--r-- 1 root root 26248574 Mar  9  2011 block_1.csv
    -rw-r--r-- 1 root root  5643935 Mar  9  2011 block_1.zip
    -rw-r--r-- 1 root root 26241784 Mar  9  2011 block_2.csv
    -rw-r--r-- 1 root root  5642577 Mar  9  2011 block_2.zip
    -rw-r--r-- 1 root root 26253247 Mar  9  2011 block_3.csv
    -rw-r--r-- 1 root root  5644247 Mar  9  2011 block_3.zip
    -rw-r--r-- 1 root root 26247471 Mar  9  2011 block_4.csv
    -rw-r--r-- 1 root root  5644264 Mar  9  2011 block_4.zip
    -rw-r--r-- 1 root root 26249424 Mar  9  2011 block_5.csv
    -rw-r--r-- 1 root root  5645826 Mar  9  2011 block_5.zip
    -rw-r--r-- 1 root root 26256126 Mar  9  2011 block_6.csv
    -rw-r--r-- 1 root root  5645291 Mar  9  2011 block_6.zip
    -rw-r--r-- 1 root root 26261911 Mar  9  2011 block_7.csv
    -rw-r--r-- 1 root root  5645235 Mar  9  2011 block_7.zip
    -rw-r--r-- 1 root root 26253911 Mar  9  2011 block_8.csv
    -rw-r--r-- 1 root root  5646395 Mar  9  2011 block_8.zip
    -rw-r--r-- 1 root root 26254012 Mar  9  2011 block_9.csv
    -rw-r--r-- 1 root root  5643134 Mar  9  2011 block_9.zip
    -rwxrw-rw- 1 root root     4516 Mar 10  2011 documentation
    -rw-r--r-- 1 root root 56448373 Dec  3 14:22 donation.zip
    -rw-r--r-- 1 root root      272 Mar  9  2011 frequencies.csv

    Put the data on HDFS:

    [root@node1 linkage]# hdfs dfs -mkdir linkage
    [root@node1 linkage]# hdfs dfs -put block_*.csv linkage

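    Run the example code. Make sure rawblocks reads the block_*.csv files uploaded to HDFS above (for example, the linkage directory): the records and the count of 5,749,132 shown below come from the block files, so replace the frequencies.csv path echoed in this transcript with the location of that data.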

    scala> val rawblocks = sc.textFile("/root/aas/ch02/linkage/frequencies.csv")
    rawblocks: org.apache.spark.rdd.RDD[String] = /root/aas/ch02/linkage/frequencies.csv MappedRDD[1] at textFile at <console>:12

    scala> rawblocks.first
    res3: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

    scala> val head = rawblocks.take(10)
    head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)

    scala> head.length
    res4: Int = 10

    scala> head.foreach(println)
    "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
    37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
    39086,47614,1,?,1,?,1,1,1,1,1,TRUE
    70031,70237,1,?,1,?,1,1,1,1,1,TRUE
    84795,97439,1,?,1,?,1,1,1,1,1,TRUE
    36950,42116,1,?,1,1,1,1,1,1,1,TRUE
    42413,48491,1,?,1,?,1,1,1,1,1,TRUE
    25965,64753,1,?,1,?,1,1,1,1,1,TRUE
    49451,90407,1,?,1,?,1,1,1,1,0,TRUE
    39932,40902,1,?,1,?,1,1,1,1,1,TRUE

    scala> def isHeader(line: String) = line.contains("id_1")
    isHeader: (line: String)Boolean

    scala> head.filter(isHeader).foreach(println)
    "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

    scala> head.filterNot(isHeader).foreach(println)
    37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
    39086,47614,1,?,1,?,1,1,1,1,1,TRUE
    70031,70237,1,?,1,?,1,1,1,1,1,TRUE
    84795,97439,1,?,1,?,1,1,1,1,1,TRUE
    36950,42116,1,?,1,1,1,1,1,1,1,TRUE
    42413,48491,1,?,1,?,1,1,1,1,1,TRUE
    25965,64753,1,?,1,?,1,1,1,1,1,TRUE
    49451,90407,1,?,1,?,1,1,1,1,0,TRUE
    39932,40902,1,?,1,?,1,1,1,1,1,TRUE

    scala> head.filter(x => !isHeader(x)).length
    res8: Int = 9

    scala> val noheader = rawblocks.filter(x => !isHeader(x))
    noheader: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at <console>:16

    scala> noheader.first
    res9: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE

    scala> def toDouble(s: String) = {
    | if ("?".equals(s)) Double.NaN else s.toDouble
    | }
    toDouble: (s: String)Double

    scala> def parse(line: String) = {
    | val pieces = line.split(',')
    | val id1 = pieces(0).toInt
    | val id2 = pieces(1).toInt
    | val scores = pieces.slice(2, 11).map(toDouble)
    | val matched = pieces(11).toBoolean
    | (id1, id2, scores, matched)
    | }
    parse: (line: String)(Int, Int, Array[Double], Boolean)

    scala> val line = head(5)
    line: String = 36950,42116,1,?,1,1,1,1,1,1,1,TRUE

    scala> val tup = parse(line)
    tup: (Int, Int, Array[Double], Boolean) = (36950,42116,Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0),true)
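    The redefined parse that follows returns a MatchData value, but this transcript never shows the MatchData definition. A minimal self-contained sketch of the parsing step without Spark, assuming the case class has exactly the four fields parse fills in (id1, id2, scores, matched); the object name LinkageParser is hypothetical:

```scala
// Assumed definition: the transcript uses MatchData without showing it.
case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)

object LinkageParser {
  // "?" marks a missing comparison score; represent it as NaN.
  def toDouble(s: String): Double =
    if ("?".equals(s)) Double.NaN else s.toDouble

  // Columns: id_1, id_2, nine cmp_* scores (indices 2..10), is_match (index 11).
  def parse(line: String): MatchData = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2, 11).map(toDouble)
    val matched = pieces(11).toBoolean // "TRUE"/"FALSE", case-insensitive
    MatchData(id1, id2, scores, matched)
  }

  def main(args: Array[String]): Unit = {
    val md = parse("36950,42116,1,?,1,1,1,1,1,1,1,TRUE")
    // mkString prints the score values instead of an array hash.
    println(s"${md.id1} ${md.id2} ${md.scores.mkString(",")} ${md.matched}")
  }
}
```

    Printing md directly shows the scores as something like [D@3c935226 because Array[Double] uses the JVM's default toString; md.scores.mkString(",") shows the actual values.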

    scala> def parse(line: String) = {
    | val pieces = line.split(',')
    | val id1 = pieces(0).toInt
    | val id2 = pieces(1).toInt
    | val scores = pieces.slice(2, 11).map(toDouble)
    | val matched = pieces(11).toBoolean
    | MatchData(id1, id2, scores, matched)
    | }
    parse: (line: String)MatchData

    scala> val md = parse(line)
    md: MatchData = MatchData(36950,42116,[D@3c935226,true)

    scala> val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
    mds: Array[MatchData] = Array(MatchData(37291,53113,[D@3bc5ac5,true), MatchData(39086,47614,[D@42eb3d6d,true), MatchData(70031,70237,[D@620de16d,true), MatchData(84795,97439,[D@7d4aed65,true), MatchData(36950,42116,[D@4227c226,true), MatchData(42413,48491,[D@403b6eb8,true), MatchData(25965,64753,[D@7de212f9,true), MatchData(49451,90407,[D@54bda00,true), MatchData(39932,40902,[D@36d538b7,true))

    scala> val parsed = noheader.map(line => parse(line))
    parsed: org.apache.spark.rdd.RDD[MatchData] = MappedRDD[5] at map at <console>:28

    scala> parsed.cache()
    res10: parsed.type = MappedRDD[5] at map at <console>:28

    Call parsed.count() twice:

    scala> parsed.count()
    res11: Long = 5749132                                                                                                                                                                                    
    
    scala> parsed.count()
    res12: Long = 5749132

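    The second call is noticeably faster. cache() is lazy, so the first count() both computes parsed and stores its partitions in memory; the second count() reads the cached partitions instead of re-reading and re-parsing the files.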

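    The Storage page of the Spark UI (http://node1:8088/proxy/application_1448538943757_0008/storage/) shows the cached RDD occupying 683.4 MB. For comparison, switch the storage level to MEMORY_ONLY_SER, which keeps each partition as serialized bytes.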

    First call unpersist(); the Storage page then no longer lists any cached data.

    Then call parsed.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER):

    scala> parsed.unpersist()
    res13: parsed.type = MappedRDD[5] at map at <console>:28
    
    scala> import org.apache.spark.storage.StorageLevel
    import org.apache.spark.storage.StorageLevel
    
    scala> parsed.persist(StorageLevel.MEMORY_ONLY_SER)
    res18: parsed.type = MappedRDD[5] at map at <console>:28
    
    scala> parsed.count()
    res19: Long = 5749132                                                                                                                                                                                    
    
    scala> parsed.count()

    With MEMORY_ONLY_SER the cache occupies 580.1 MB, about 100 MB less than the 683.4 MB used with MEMORY_ONLY, the level that cache() uses.

    2.8 Aggregations

    scala> val grouped = mds.groupBy(md => md.matched)
    grouped: scala.collection.immutable.Map[Boolean,Array[MatchData]] = Map(true -> Array(MatchData(37291,53113,[D@3bc5ac5,true), MatchData(39086,47614,[D@42eb3d6d,true), MatchData(70031,70237,[D@620de16d,true), MatchData(84795,97439,[D@7d4aed65,true), MatchData(36950,42116,[D@4227c226,true), MatchData(42413,48491,[D@403b6eb8,true), MatchData(25965,64753,[D@7de212f9,true), MatchData(49451,90407,[D@54bda00,true), MatchData(39932,40902,[D@36d538b7,true)))
    
    scala> grouped.mapValues(x => x.size).foreach(println)
    (true,9)

    2.9 Creating Histograms

    scala> val matchCounts = parsed.map(md => md.matched).countByValue()
    matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)
    
    scala> val matchCountsSeq = matchCounts.toSeq
    matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))
    scala> matchCountsSeq.sortBy(_._1).foreach(println)
    (false,5728201)
    (true,20931)

    scala> matchCountsSeq.sortBy(_._2).reverse.foreach(println)
    (false,5728201)
    (true,20931)
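    countByValue is an action that returns the counts to the driver as a local Map, so it suits columns with few distinct values, such as is_match. On an ordinary Scala collection the same histogram can be sketched with groupBy; the flags below are made-up sample data, not the linkage records, and MatchHistogram is a hypothetical name:

```scala
object MatchHistogram {
  // Local analogue of RDD.countByValue: value -> number of occurrences.
  def countByValue[T](xs: Seq[T]): Map[T, Long] =
    xs.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample of is_match flags.
    val flags = Seq(true, false, false, true, false)
    val counts = countByValue(flags)
    // Largest count first, as with sortBy(_._2).reverse in the transcript.
    counts.toSeq.sortBy(_._2).reverse.foreach(println)
  }
}
```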

    2.10 Summary Statistics for Continuous Variables

    scala> parsed.map(md => md.scores(0)).stats()
    res27: org.apache.spark.util.StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN)

    scala> import java.lang.Double.isNaN
    import java.lang.Double.isNaN

    scala> parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
    res28: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)
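    The first stats() call reports mean, stdev, max and min as NaN because arithmetic involving the NaN entries (the "?" values) propagates NaN; filtering with isNaN first gives usable numbers. For illustration only, a plain-Scala sketch of the same NaN-aware summary over a local Seq of made-up values. It reports the population standard deviation, as StatCounter's stdev does, but it is not Spark's implementation, and the names NaNAwareStats and Summary are hypothetical:

```scala
object NaNAwareStats {
  // Count, mean, population standard deviation, max and min of the non-missing values.
  final case class Summary(count: Long, mean: Double, stdev: Double, max: Double, min: Double)

  // Drop NaN (missing) values first, then summarize what is left.
  def summarize(xs: Seq[Double]): Summary = {
    val present = xs.filterNot(_.isNaN)
    val n = present.size
    if (n == 0) {
      // Nothing left to summarize: report NaN statistics.
      Summary(0L, Double.NaN, Double.NaN, Double.NaN, Double.NaN)
    } else {
      val mean = present.sum / n
      // Population variance: divide by n rather than n - 1.
      val variance = present.map(x => (x - mean) * (x - mean)).sum / n
      Summary(n.toLong, mean, math.sqrt(variance), present.max, present.min)
    }
  }

  def main(args: Array[String]): Unit =
    println(summarize(Seq(1.0, Double.NaN, 0.0, 1.0))) // made-up sample
}
```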

    2.11 Creating Reusable Code for Computing Summary Statistics

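    Write a new file, /root/aas/ch02/StatsWithMissing.scala, containing the code below, then load it into the shell with :load /root/aas/ch02/StatsWithMissing.scala: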

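    import org.apache.spark.util.StatCounter

    // Summary statistics for the non-missing values plus a count of NaN (missing) values.
    class NAStatCounter extends Serializable {
      val stats: StatCounter = new StatCounter()
      var missing: Long = 0

      // Add one observation: NaN counts as missing, anything else goes into stats.
      def add(x: Double): NAStatCounter = {
        if (java.lang.Double.isNaN(x)) {
          missing += 1
        } else {
          stats.merge(x)
        }
        this
      }

      // Combine with another counter, e.g. one built from a different partition.
      def merge(other: NAStatCounter): NAStatCounter = {
        stats.merge(other.stats)
        missing += other.missing
        this
      }

      override def toString = {
        "stats: " + stats.toString + " NaN: " + missing
      }
    }

    // Factory: NAStatCounter(x) creates a counter that already holds x.
    object NAStatCounter extends Serializable {
      def apply(x: Double) = new NAStatCounter().add(x)
    }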

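    Note: inside add, stats.merge(x) calls StatCounter's merge(value: Double) overload, which folds a single value into the running statistics, while stats.merge(other.stats) in the merge method uses the overload that combines two StatCounters. The apply method on the object is what lets later code write NAStatCounter(d) as shorthand for new NAStatCounter().add(d).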

    scala> val nasRDD = parsed.map(md => {md.scores.map(d=> NAStatCounter(d))})
    nasRDD: org.apache.spark.rdd.RDD[Array[NAStatCounter]] = MappedRDD[18] at map at <console>:38
    
    scala> val nas1 = Array(1.0, Double.NaN).map(d => NAStatCounter(d))
    nas1: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1)
    
    scala> val nas2 = Array(Double.NaN, 2.0).map(d => NAStatCounter(d))
    nas2: Array[NAStatCounter] = Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0)
    
    scala> val merged = nas1.zip(nas2).map(p => p._1.merge(p._2))
    merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
    
    scala> val reduced = nasRDD.reduce((n1, n2) => {
         |   n1.zip(n2).map { case (a, b) => a.merge(b) }
         | })
    reduced: Array[NAStatCounter] = Array(stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007, stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434, stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668, stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0....
    scala> reduced.foreach(println)
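    The 2.12 transcript calls a statsWithMissing helper whose definition does not appear here. Judging from its use, it turns an RDD[Array[Double]] into one NAStatCounter per column; a natural way to do that is to fold rows into per-column counters within each partition, then merge the per-partition results. The sketch below simulates that two-level aggregation with plain Scala collections, as an assumption rather than the author's code: each inner Seq stands in for one RDD partition, and the hypothetical ColStats keeps only a count, a sum and a NaN count instead of a full StatCounter. In Spark the per-partition pass would be mapPartitions and the final step reduce.

```scala
// Hypothetical, simplified stand-in for NAStatCounter: non-NaN count, running sum, NaN count.
final case class ColStats(count: Long, sum: Double, missing: Long) {
  def add(x: Double): ColStats =
    if (x.isNaN) copy(missing = missing + 1) else copy(count = count + 1, sum = sum + x)
  def merge(other: ColStats): ColStats =
    ColStats(count + other.count, sum + other.sum, missing + other.missing)
  def mean: Double = if (count == 0) Double.NaN else sum / count
}

object StatsWithMissingSketch {
  // Each inner Seq plays the role of one RDD partition (local simulation, no Spark).
  def statsWithMissing(partitions: Seq[Seq[Array[Double]]], width: Int): Array[ColStats] = {
    val zero = Array.fill(width)(ColStats(0L, 0.0, 0L))
    // Per-partition pass: fold each row into the column accumulators.
    val perPartition = partitions.map { rows =>
      rows.foldLeft(zero) { (acc, row) =>
        acc.zip(row).map { case (s, x) => s.add(x) }
      }
    }
    // Cross-partition pass: merge the column accumulators pairwise.
    perPartition.foldLeft(zero) { (a, b) =>
      a.zip(b).map { case (x, y) => x.merge(y) }
    }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      Seq(Array(1.0, Double.NaN)),
      Seq(Array(Double.NaN, 2.0)),
      Seq.empty[Array[Double]] // an empty partition contributes nothing
    )
    statsWithMissing(parts, width = 2).foreach(println)
  }
}
```

    Starting every partition from an all-zero accumulator means an empty partition needs no special handling; the two sample rows reproduce the nas1/nas2 merge shown above (one value and one NaN per column).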

    2.12 Simple Variable Selection and Scoring

    scala> val statsm = statsWithMissing(parsed.filter(_.matched).map(_.scores))
    statsm: Array[NAStatCounter] = Array(stats: (count: 20922, mean: 0.997316, stdev: 0.036506, max: 1.000000, min: 0.000000) NaN: 9, stats: (count: 1333, mean: 0.989890, stdev: 0.082489, max: 1.000000, min: 0.000000) NaN: 19598, stats: (count: 20931, mean: 0.997015, stdev: 0.043118, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 475, mean: 0.969370, stdev: 0.153291, max: 1.000000, min: 0.000000) NaN: 20456, stats: (count: 20931, mean: 0.987292, stdev: 0.112013, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 20925, mean: 0.997085, stdev: 0.053914, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.997945, stdev: 0.045285, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.996129, stdev: 0.062097, max: 1.000000, min: 0.000000) NaN: 6, stats: (cou...
    scala> val statsn = statsWithMissing(parsed.filter(!_.matched).map(_.scores))
    statsn: Array[NAStatCounter] = Array(stats: (count: 5727203, mean: 0.711863, stdev: 0.389081, max: 1.000000, min: 0.000000) NaN: 998, stats: (count: 102365, mean: 0.898847, stdev: 0.272720, max: 1.000000, min: 0.000000) NaN: 5625836, stats: (count: 5728201, mean: 0.313138, stdev: 0.332281, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 1989, mean: 0.162955, stdev: 0.192975, max: 1.000000, min: 0.000000) NaN: 5726212, stats: (count: 5728201, mean: 0.954883, stdev: 0.207560, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5727412, mean: 0.221643, stdev: 0.415352, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.486995, stdev: 0.499831, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.219923, stdev: 0.414194, max: 1.000000, min: 0.00...
    scala> statsm.zip(statsn).map { case(m, n) =>
         |  (m.missing + n.missing, m.stats.mean - n.stats.mean)
         | }.foreach(println)
    (1007,0.285452905746686)
    (5645434,0.09104268062279908)
    (0,0.6838772482597568)
    (5746668,0.8064147192926266)
    (0,0.03240818525033473)
    (795,0.7754423117834042)
    (795,0.5109496938298719)
    (795,0.7762059675300523)
    (12843,0.9563812499852178)
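    The scoring step that follows sums features 2, 5, 6, 7 and 8. One way to read that choice from the (missing count, mean difference) pairs above: keep features that are rarely missing and whose mean differs strongly between matches and non-matches. A sketch using the printed pairs, with cutoffs (fewer than 20,000 missing values, mean difference above 0.5) that are illustrative assumptions chosen for this data, not thresholds stated in the source; FeatureSelection is a hypothetical name:

```scala
object FeatureSelection {
  // (missing count, mean(matched) - mean(unmatched)) per feature, copied from the output above.
  val summary: Seq[(Long, Double)] = Seq(
    (1007L, 0.285452905746686),
    (5645434L, 0.09104268062279908),
    (0L, 0.6838772482597568),
    (5746668L, 0.8064147192926266),
    (0L, 0.03240818525033473),
    (795L, 0.7754423117834042),
    (795L, 0.5109496938298719),
    (795L, 0.7762059675300523),
    (12843L, 0.9563812499852178)
  )

  // Indices of features that are rarely missing and separate the classes well.
  def select(stats: Seq[(Long, Double)], maxMissing: Long, minDelta: Double): Seq[Int] =
    stats.zipWithIndex.collect {
      case ((missing, delta), i) if missing < maxMissing && delta > minDelta => i
    }

  def main(args: Array[String]): Unit =
    // Hypothetical cutoffs, for illustration only.
    println(select(summary, maxMissing = 20000L, minDelta = 0.5).mkString(", "))
}
```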
    
    scala> def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else d
    naz: (d: Double)Double
    
    scala> case class Scored(md: MatchData, score: Double)
    defined class Scored
    
    scala> val ct = parsed.map(md => {
         |  val score = Array(2, 5, 6, 7, 8).map(i => naz(md.scores(i))).sum
         |  Scored(md, score)
         | })
    ct: org.apache.spark.rdd.RDD[Scored] = MappedRDD[27] at map at <console>:40
    scala> ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue()
    res34: scala.collection.Map[Boolean,Long] = Map(true -> 20871, false -> 637)                                                                                                                             
    
    scala> ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue()
    res35: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 596414)                                                                                                                          
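    The scoring model sums the five selected features, treating a missing value as 0 via naz, and then counts matches and non-matches at or above a threshold. The same rule can be sketched on individual records without Spark; the two sample lines come from the head output above, and the object SimpleScore with its score helper is hypothetical. naz here uses d.isNaN, which behaves like the transcript's Double.NaN.equals(d) because boxed Double equality treats NaN as equal to itself.

```scala
object SimpleScore {
  // Feature indices used by the scoring step in the transcript.
  val selected: Seq[Int] = Seq(2, 5, 6, 7, 8)

  // Treat a missing score (NaN) as 0.0.
  def naz(d: Double): Double = if (d.isNaN) 0.0 else d

  // Parse the nine cmp_* columns ("?" = missing) and sum the selected ones.
  def score(line: String): Double = {
    val scores = line.split(',').slice(2, 11).map(s => if (s == "?") Double.NaN else s.toDouble)
    selected.map(i => naz(scores(i))).sum
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE",
      "36950,42116,1,?,1,1,1,1,1,1,1,TRUE"
    )
    val threshold = 4.0
    lines.foreach { l =>
      val s = score(l)
      println(s"$s ${if (s >= threshold) "predicted match" else "predicted non-match"}")
    }
  }
}
```

    Raising the threshold from 2.0 to 4.0 is what cuts the false positives from 596,414 to 637 in the transcript while losing only 60 of the 20,931 true matches.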
  • Original post: https://www.cnblogs.com/littlesuccess/p/5016579.html