zoukankan      html  css  js  c++  java
  • Spark高级数据分析-第2章 用Scala和Spark进行数据分析

    2.4 小试牛刀:Spark shell和SparkContext

    本章使用的资料来自加州大学欧文分校机器学习资料库(UC Irvine Machine Learning Repository),这个资料库为研究和教学提供了大量非常好的数据源,

    这些数据源非常有意义,并且是免费的。由于网络原因,无法从原始地址下载数据集,这里可以从以下链接获取:

    https://pan.baidu.com/s/1dENp41V 

    http://pan.baidu.com/s/1c29fBVy

    获取数据集以后,可以使用FileZilla等FTP工具上传到Hadoop集群(作者实验环境是VMware下的Hadoop集群),然后解压缩:

    $ unzip donation.zip 
    $ unzip 'block_*.zip'

    Hadoop 集群的 HDFS 上为块数据创建一个目录,然后将数据集文件
    复制到 HDFS 上:
    $ hadoop fs -mkdir linkage
    $ hadoop fs -put block_*.csv linkage (说明:当前目录为存放block_*.csv文件的目录)

    作者的 Hadoop 集群(Hadoop 2.7.2 + Spark 2.1.0 + Scala 2.12.1)支持 YARN,通过为 Spark master 设定
    yarn-client 参数值,就可以在集群上启动 Spark 作业:
    $ spark-shell --master yarn --deploy-mode client

    创建RDD:

    scala> var varblocks = sc.textFile("hdfs:///linkage")
    varblocks: org.apache.spark.rdd.RDD[String] = hdfs:///linkage MapPartitionsRDD[1] at textFile at <console>:24

    2.5 把数据从集群上获取到客户端

    使用 RDD 的 first 方法,该方法向客户端返回 RDD 的第一个元素:

    scala> rawblocks.first
    res15: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match

    如果知道 RDD 只包含少量记录,可以用 collect 方法向客户返回一个包含所有 RDD 内容的数组的数组。由于不知道当前数据集有多大,所以就不尝试了。
    还可以用 take 方法,这个方法在 first collect 之间做了一些折衷,可以向客户端返回
    一个包含指定数量记录的数组。使用 take 方法获取记录关联数据集的前 10 行记录:

    scala> val head = rawblocks.take(10)
    head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)

    为了更容易读懂数组的内容,我们可以用 foreach 方法并结合 println 来打印
    出数组中的每个值,并且每一行打印一个值:

    scala> head.foreach(println)
    "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
    37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
    39086,47614,1,?,1,?,1,1,1,1,1,TRUE
    70031,70237,1,?,1,?,1,1,1,1,1,TRUE
    84795,97439,1,?,1,?,1,1,1,1,1,TRUE
    36950,42116,1,?,1,1,1,1,1,1,1,TRUE
    42413,48491,1,?,1,?,1,1,1,1,1,TRUE
    25965,64753,1,?,1,?,1,1,1,1,1,TRUE
    49451,90407,1,?,1,?,1,1,1,1,0,TRUE
    39932,40902,1,?,1,?,1,1,1,1,1,TRUE

    CSV文件有一个标题行需要过滤掉, 以免影响后续分析。我们可以将标题行中出现的 "id_1"
    符串作为过滤条件, 编写一个简单的 Scala 函数来测试一行记录中是否包含该字符串,代码如下:

    def isHeader(line: String) = line.contains("id_1")
    isHeader: (line: String)Boolean

     

    我们其实想要的是所有非标题行。为了完成这个目标,Scala 可以提供 2 种方法。第一种时利用 Array 类的 filterNot 方法:

    scala> head.filterNot(isHeader).length
    res17: Int = 9


    还可以利用 Scala 对匿名函数的支持,在 filter 函数里面对 isHeader 函数取非:

    scala> head.filter(x => !isHeader(x)).length
    res18: Int = 9

    2.6 把代码从客户端发送到集群

    用于过滤集群上整个数据集的语法和过滤本地机器上的 head 数组的语法一模一样。

    scala> val noheader = rawblocks.filter(x => !isHeader(x))
    noheader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[45] at filter at <console>:29

  • 相关阅读:
    java编程基础--方法
    MySQL中使用LIMIT进行分页的方法
    Java编程基础--数据类型
    Java开发入门
    SpringBoot实战项目(十七)--使用拦截器实现系统日志功能
    SpringBoot实战项目(十六)--拦截器配置及登录拦截
    SpringBoot实战项目(十五)--修改密码及登录退出功能实现
    SpringBoot实战项目(十四)--登录功能之登录表单验证
    PHP setcookie 网络函数
    PHP mysqli_kill MySQLi 函数
  • 原文地址:https://www.cnblogs.com/followyourdream/p/6822603.html
Copyright © 2011-2022 走看看