zoukankan      html  css  js  c++  java
  • spark学习(五)总结及其demo

    RDD及其特点

    1、RDD是Spark的核心数据模型,但是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集。

    2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)

    3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。

    4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDDpartition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。

    5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)

    创建RDD

    进行Spark核心编程的第一步就是创建一个初始的RDD。该RDD,通常就代表和包含了Spark应用程序的输入源数据。然后通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。

    Spark Core提供了三种创建RDD的方式:

    1.使用程序中的集合创建RDD(主要用于测试)

    List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
    

    2.使用本地文件创建RDD(主要用于临时性处理有大量数据的文件)

    SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
    JavaRDD<String> lines = spark.read().textFile("D:\Users\Administrator\Desktop\spark.txt").javaRDD();
    

    3.使用HDFS文件创建RDD(生产环境的常用方式)

    SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
    JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();
    

    使用HDFS文件创建RDD对比使用本地文件创建RDD,需要修改的,只有两个地方:
    第一,将SparkSession对象的master("local")方法去掉
    第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件

    操作RDD

    Spark支持两种RDD操作:transformation和action。

    transformation操作

    transformation操作会针对已有的RDD创建一个新的RDD。transformation具有lazy特性,即transformation不会触发spark程序的执行,它们只是记录了对RDD所做的操作,不会自发的执行。只有执行了一个action,之前的所有transformation才会执行。

    常用的transformation介绍:

    map :将RDD中的每个元素传人自定义函数,获取一个新的元素,然后用新的元素组成新的RDD。

    filter:对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。

    flatMap:与map类似,但是对每个元素都可以返回一个或多个元素。

    groupByKey:根据key进行分组,每个key对应一个Iterable<value>。

    reduceByKey:对每个key对应的value进行reduce操作。

    sortByKey:对每个key对应的value进行排序操作。

    join:对两个包含<key,value>对的RDD进行join操作,每个keyjoin上的pair,都会传入自定义函数进行处理。

    cogroup:同join,但是每个key对应的Iterable<value>都会传入自定义函数进行处理。

    action操作

    action操作主要对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序。action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行,这是action的特性。

    常用的action介绍:

    reduce:将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。

    collect:将RDD中所有元素获取到本地客户端(一般不建议使用)。

    count:获取RDD元素总数。

    take(n):获取RDD中前n个元素。

    saveAsTextFile:将RDD元素保存到文件中,对每个元素调用toString方法。

    countByKey:对每个key对应的值进行count计数。

    foreach:遍历RDD中的每个元素。

    RDD持久化

    要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。但是cache()或者persist()的使用是有规则的,必须在transformation或者textFile等创建了一个RDD之后,直接连续调用cache()或persist()才可以。

    如果你先创建一个RDD,然后单独另起一行执行cache()或persist()方法,是没有用的,而且会报错,大量的文件会丢失。

    val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()
    

    Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。

    通用的持久化级别的选择建议:

    1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。

    2、如果MEMORY_ONLY策略,无法存储所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。

    3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。

    4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。

    共享变量

    Spark提供了两种共享变量:Broadcast Variable(广播变量)和Accumulator(累加变量)。

    BroadcastVariable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。广播变量是只读的。

    val factor = 3
    val broadcastVars = sc.broadcast(factor);
    val numberList = Array(1,2,3,4,5)
    val number = sc.parallelize(numberList).map( num => num * broadcastVars.value)  //广播变量读值broadcastVars.value
    

    Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

    val numberList = Array(1,2,3,4,5)
    val numberRDD = sc.parallelize(numberList,1)
    val sum = sc.accumulator(0)
    numberRDD.foreach{m => sum += m}
    

    小案例实战1

    案例需求:

    1、对文本文件内的每个单词都统计出其出现的次数。
    2、按照每个单词出现次数的数量,降序排序。

    步骤:

    • 1.创建RDD
    • 2.将文本进行拆分 (flatMap)
    • 3.将拆分后的单词进行统计 (mapToPair,reduceByKey)
    • 4.反转键值对 (mapToPair)
    • 5.按键升序排序 (sortedByKey)
    • 6.再次反转键值对 (mapToPair)
    • 7.打印输出(foreach)

    Java版本jdk1.8以下

    public class SortWordCount {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // 创建lines RDD
            JavaRDD<String> lines = sc.textFile("D:\Users\Administrator\Desktop\spark.txt");
            // 将文本分割成单词RDD
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
            //将单词RDD转换为(单词,1)键值对RDD
            JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
                @Override
                public Tuple2<String,Integer> call(String s) throws Exception {
                    return new Tuple2<String,Integer>(s,1);
                }
            });
           //对wordPair 进行按键计数
            JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer +integer2;
                }
            });
            // 到这里为止,就得到了每个单词出现的次数
            // 我们的新需求,是要按照每个单词出现次数的顺序,降序排序
            // wordCounts RDD内的元素是这种格式:(spark, 3) (hadoop, 2)
            // 因此我们需要将RDD转换成(3, spark) (2, hadoop)的这种格式,才能根据单词出现次数进行排序
    
            // 进行key-value的反转映射
            JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
                    return new Tuple2<Integer, String>(s._2,s._1);
                }
            });
            // 按照key进行排序
            JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
            // 再次将value-key进行反转映射
            JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
                    return new Tuple2<String, Integer>(s._2,s._1);
                }
            });
            // 到此为止,我们获得了按照单词出现次数排序后的单词计数
            // 打印出来
            sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> s) throws Exception {
                    System.out.println("word ""+s._1+"" appears "+ s._2+" times.");
                }
            });
            sc.close();
        }
    }
    

    Java版本jdk1.8

    可以使用lambda表达式,简化代码:

    public class SortWordCount {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // 创建lines RDD
            JavaRDD<String> lines = sc.textFile("D:\Users\Administrator\Desktop\spark.txt");
            JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
            JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
            JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
            JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
            JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
            JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
            sortedWordCount.foreach(s->System.out.println("word ""+s._1+"" appears "+ s._2+" times."));
            sc.close();
        }
    }
    

    scala版本

    由于spark2 有了统一切入口SparkSession,在这里就使用了SparkSession。

    package cn.spark.study.core
    import org.apache.spark.sql.SparkSession
    object SortWordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
        val lines = spark.sparkContext.textFile("D:\Users\Administrator\Desktop\spark.txt")
        val words = lines.flatMap{line => line.split(" ")}
        val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
        val countWord = wordCounts.map{word =>(word._2,word._1)}
        val sortedCountWord = countWord.sortByKey(false)
        val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
        sortedWordCount.foreach(s=>
        {
          println("word ""+s._1+ "" appears "+s._2+" times.")
        })
        spark.stop()
      }
    }
    

    小案例实战2

    需求:

    1、按照文件中的第一列排序。
    2、如果第一列相同,则按照第二列排序。

    实现步骤:

    • 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法
    • 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD(map)
    • 3、使用sortByKey算子按照自定义的key进行排序(sortByKey)
    • 4、再次映射,剔除自定义的key,只保留文本行(map)
    • 5、打印输出(foreach)

    这里主要用scala编写

    class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
      override def compare(that: SecondSortKey): Int = {
        if(this.first - that.first !=0){
          this.first-that.first
        }else{
          this.second-that.second
        }
      }
    }
    object SecondSort {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
        val lines = spark.sparkContext.textFile("D:\sort.txt")
        val pairs = lines.map{line => (
          new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
        )}
        val sortedParis = pairs.sortByKey()
        val sortedLines = sortedParis.map(pairs => pairs._2)
        sortedLines.foreach(s => println(s))
        spark.stop()
      }
    }
    

    小案例实战3

    需求:

    对每个班级内的学生成绩,取出前3名。(分组取topn)

    实现步骤:

    1.创建初始RDD

    2.对初始RDD的文本行按空格分割,映射为key-value键值对

    3.对键值对按键分组

    4.获取分组后每组前3的成绩:

    • 4.1 遍历每组,获取每组的成绩
    • 4.2 将一组成绩转换成一个数组缓冲
    • 4.3 将数组缓冲按从大到小排序
    • 4.4 对排序后的数组缓冲取其前三

    5.打印输出

    以下是使用scala实现:

    object GroupTop3 {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
        //创建初始RDD
        val lines = spark.sparkContext.textFile("D:\score.txt")
        //对初始RDD的文本行按空格分割,映射为key-value键值对
        val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
        //对pairs键值对按键分组
        val groupedPairs = pairs.groupByKey()
        //获取分组后每组前3的成绩
        val top3Score = groupedPairs.map(classScores => {
          var className = classScores._1
          //获取每组的成绩,将其转换成一个数组缓冲,并按从大到小排序,取其前三
          var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
          Tuple2(className,top3)
        })
        top3Score.foreach(m => {
          println(m._1)
          for(s <- m._2) println(s)
          println("------------------")
        })
      }
    }
    

    以上三个小案例都用Scala实现了,用到了Scala中的集合的操作、高阶函数、链式调用、隐式转换等知识,自己动手实现,对Scala有个比较好的理解和掌握。

  • 相关阅读:
    NetSuite Batch Process Status
    NetSuite generated bank files and the Bank Reconciliation process
    Global Search file content in Gitlab repositories, search across repositories, search across all files
    FedEx Package Rate Integration with NetSuite direct integrate by WebServices
    git Merge branches
    git tag and NetSuite deployment tracking
    API 读写cookie的方法
    C# 生成缩略图
    解决jquery操作checkbox全选全不选无法勾选问题
    JS读取写入删除Cookie方法
  • 原文地址:https://www.cnblogs.com/aibabel/p/10834623.html
Copyright © 2011-2022 走看看