zoukankan      html  css  js  c++  java
  • spark浅谈(2):SPARK核心编程

    一、SPARK-CORE

      1.spark核心模块是整个项目的基础。提供了分布式的任务分发,调度以及基本的IO功能,Spark使用基础的数据结构,叫做RDD(弹性分布式数据集),是一个逻辑的数据分区的集合,可以跨机器。RDD可以通过两种方式进行创建,一种是从外部的数据集引用数据,第二种方式是通过在现有的RDD上做数据转换。RDD抽象是通过语言集成的API来进行暴露,它简化了编程的复杂度,因为这种操纵RDD的方式类似于操纵本地数据集合

    二、RDD变换(API阅读)

    **
     * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
     * partitioned collection of elements that can be operated on in parallel. This class contains the
     * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
     * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
     * pairs, such as `groupByKey` and `join`;
     * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
     * Doubles; and
     * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
     * can be saved as SequenceFiles.
     * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
     * through implicit.
     *
     * Internally, each RDD is characterized by five main properties:
     *
     *  - A list of partitions
     *  - A function for computing each split
     *  - A list of dependencies on other RDDs
     *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
     *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
     *    an HDFS file)
     *
     * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
     * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
     * reading data from a new storage system) by overriding these functions. Please refer to the
     * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
     * for more details on RDD internals.
     */

      1.RDD变换返回一个指向新RDD的指针并且允许你在RDD之间创建依赖,在依赖链条中的每个RDD都有一个计算数据的函数以及一个指向父RDD的指针。Spark是懒惰的,所以除非你调用一些除法任务创建以及执行的转换或者Action,否则什么都不干。

    因此RDD变换不是一个数据集,而是在一个程序中的一个步骤,用来告诉如何获取数据以及怎么进行数据的相关的处理。

      2.下面给出的是一个RDD变换列表

        (0)接下来的试验都是以test.txt这个文件为试验对象的,其中test.txt中的内容为如下情况:

    hello world1,
    hello world2,
    hello world3,
    hello world4

        (1)map(func):返回一个新的RDD(弹性分布式数据集),通过对这个RDD的每个元素素应用func函数形成一个新的RDD。

        (2)flatMap(func):与map函数相似,但是每个输入项可以被映射为0个或者多个输出项(所以func函数应该返回一个Seq而不是一个单独的数据项)。通过对这个RDD的所有元素应用一个函数来返回一个新的RDD,然后将这个结果进行扁平化处理。

    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCountMapDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("WordCountMapDemo")
        val sc = new SparkContext(conf)
        val rdd1 = sc.textFile("E:/scala/test.txt")
        val rdd2 = rdd1.flatMap(_.split(" "))
        val rdd3 = rdd2.map((_,1))
        val rdd4 = rdd3.reduceByKey(_ + _);
        val rdd5 =rdd4.collect()
        rdd5.foreach(println)
      }
    }

      (3)使用filter过滤器。返回通过选择函数返回true的源元素形成的新数据集。

    package com.jd.www.wordCount
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCountFilterDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("WordCountFilterDemo").setMaster("local");
        val sc = new SparkContext(conf)
        val rdd1 = sc.textFile("E:/scala/test.txt")
        val rdd2 = rdd1.flatMap(_.split(" "))
       //过滤器
        val rdd3 = rdd2.filter(_.startsWith("wor"))
        val rdd4 = rdd3.map((_, 1))
        val rdd5 = rdd4.reduceByKey(_ + _)
        val rdd6 = rdd5.collect()
        rdd6.foreach(println)
      }
    }

      (4)mapPartitions:通过将函数应用于此RDD的每个分区来返回新的RDD。与map类似,但在RDD的每个分区(块)上单独运行,因此当在类型T的RDD上运行时,func必须是Iterator <T> => Iterator <U>类型。 

    package com.jd.www.wordCount
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCountMapPartitionsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("WordCountFlatMapDemo")
        val sc = new SparkContext(conf);
        val rdd1 = sc.textFile("E:/scala/test.txt")
        val rdd2 = rdd1.flatMap(_.split(" "));
        val rdd3 = rdd2.mapPartitions(it=> {
          import scala.collection.mutable.ArrayBuffer
          val buf = new ArrayBuffer[String]()
          for (e <- it) {
            buf.+=("_" + e)
          }
          buf.iterator
        }
        )
        val rdd4 = rdd3.map((_, 1))
        val rdd5 = rdd4.reduceByKey(_ + _)
        val rdd6 = rdd5.collect()
        rdd6.foreach(println)
      }
    
    }

      (5)mapPartitionsWithIndex:通过对这个RDD的每个分区应用一个函数,然后返回一个新的RDD,同时对索引进行跟踪

    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCountMapPartitionsWithIndex {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("WordCountMapPartitionsWithIndex").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val rdd1 = sc.textFile("e:/scala/test.txt",4)//定义最小分区数
        val rdd2 = rdd1.flatMap(_.split(" "))
        val rdd3 = rdd2.mapPartitionsWithIndex((index,it)=>{
          import  scala.collection.mutable.ArrayBuffer
          val tName = Thread.currentThread().getName
          println(tName+":"+index+""+":mappartitions start")
          val buf = new ArrayBuffer[String]()
          for(e<-it){
            buf.+=("_"+e)
          }
          buf.iterator
        })
        val rdd4 = rdd3.map((_,1))
        val rdd5 = rdd4.reduceByKey(_ + _)
        rdd5.foreach(println)
      }
    
    }

      (6)sample(withReplacementfractionseed):使用给定的随机数生成器种子,在有或没有替换的情况下对数据的一小部分进行采样。

        

        

       

        

        

  • 相关阅读:
    ant design拖拽手柄列拖动时样式错乱的解决方案
    ant design pro columns属性valueEnum下拉框按顺序显示
    blob转换为file上传(七牛云等)
    最新前端面试题收集(一)
    node 服务端分层模型小结
    将博客搬至CSDN
    Koa 连接mysql数据,mysql数据库表初始化脚本
    Koa 数据库连接和查询分离, CommonJS 模块遇到的一个坑
    koa session 存储方案
    koa-router 入门与使用
  • 原文地址:https://www.cnblogs.com/bigdata-stone/p/9936622.html
Copyright © 2011-2022 走看看