zoukankan      html  css  js  c++  java
  • Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)

    本博文的主要内容是:

    1、rdd基本操作实战

    2、transformation和action流程图

    3、典型的transformation和action

    RDD有3种操作:

    1、  Trandformation      对数据状态的转换,即所谓算子的转换

    2、  Action    触发作业,即所谓得结果的

    3、  Contoller  对性能、效率和容错方面的支持,如cache、persist、checkpoint

    Contoller包括cache、persist、checkpoint。

     

    /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }

    传入类型是T,返回类型是U。

     

    元素之间,为什么reduce操作,要符合结合律和交换律?
    答:因为,交换律,不知,哪个数据先过来。所以,必须符合交换律。
    在交换律基础上,想要reduce操作,必须要符合结合律。

    /**

    * Reduces the elements of this RDD using the specified commutative and
    * associative binary operator.
    */
    def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
    Some(iter.reduceLeft(cleanF))
    } else {
    None
    }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
    jobResult = jobResult match {
    case Some(value) => Some(f(value, taskResult.get))
    case None => taskResult
    }
    }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
    }

    RDD.scala(源码)


    这里,新建包com.zhouls.spark.cores

    package com.zhouls.spark.cores

    /**
    * Created by Administrator on 2016/9/27.
    */
    object TextLines {

    }


    下面,开始编代码

    本地模式

    自动 ,会写好

    源码来看,

    所以, val lines = sc.textFile("C:\Users\Administrator\Desktop\textlines.txt") //通过HadoopRDD以及MapPartitionsRDD获取文件中每一行的内容本身

     

    val lineCount = lines.map(line => (line,1)) //每一行变成行的内容与1构成的Tuple


    val textLines = lineCount.reduceByKey(_+_)


    textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))

     成功!



     现在,将此行代码,

         textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
    改一改
         textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
    
    

    总结:

    本地模式里,
       textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
    改一改
         textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
    运行正常,因为在本地模式下,是jvm,但这样书写,是不正规的。
    
    
    
    集群模式里,
       textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
    改一改
         textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
    运行无法通过,因为结果是分布在各个节点上。
    
    
    collect源码:
    /**
    * Return an array that contains all of the elements in this RDD.
    */
    def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
    }

    得出,collect后array中就是一个元素,只不过这个元素是一个Tuple。
    Tuple是元组。通过concat合并!


    foreach源码:

    /**
    * Applies a function f to all elements of this RDD.
    */
    def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
      


    rdd实战(rdd基本操作实战)至此!

    
    

     rdd实战(transformation流程图)

     拿wordcount为例!

    启动hdfs集群

    spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

     

     启动spark集群

    spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

     

    启动spark-shell

    spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

    scala> val partitionsReadmeRdd =  sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")

     或者

     scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")

     scala>  val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)

    .saveAsTextFile("~/partition1README.txt")

     

    注意,~目录,不是这里。

     为什么,我的,不是这样的显示呢?

    RDD的transformation和action执行的流程图

    典型的transformation和action

  • 相关阅读:
    vs 2005 下 逐阶 海量测试堆算法 记录 【永久更新】
    预备 归并排序 –from wikipedia 演示
    有关堆栈溢出(in vs 2005)的读书笔记堆栈中 申请大数组
    Heapsort 代码 学习笔记 阳春三月版
    那些基础算法的 数学不等式 @快排分划 @kmp覆盖函数
    珠儿 快排 三月版本(主题:学代码,撘框架)(永久更新)
    c 语言格式输出 浮点数 不要用 整形输出 教训
    修改 堆栈大小 普适性方案总结 (跨平台 windows linux 栈设置大小)
    转tip 在VC下编译使用unistd.h,times.h等文件
    DB2用命令窗口连接数据库
  • 原文地址:https://www.cnblogs.com/zlslch/p/5913334.html
Copyright © 2011-2022 走看看