zoukankan      html  css  js  c++  java
  • 【spark 深入学习 06】RDD编程之旅基础篇02-Spaek shell

    ---------------------

    本节内容:

    · Spark转换 RDD操作实例

    · Spark行动 RDD操作实例

    · 参考资料

    ---------------------

      关于学习编程方式的,每个人都有自己的方式。对我个人来说,最好的方法还是多动手写demo,要多写代码,才能理解的更加深刻,本节以例子的形式讲解各个Spark RDD的使用方法和注意事项,本文一共讲解了20个RDD的使用demo。

    一、Spark转换 RDD操作实例

       RDD转换操作返回的是RDD,而行动操作返回的是其他数据类型。

    1.例子:textFile/collect/foreach

    ---------------------

    val line =sc.textFile("/tmp/test/core-site.xml");

    line.collect().foreach(println);

    ------

    说明:

    textFile:读取hdfs数据

    collect:收集RDD数据集

    foreach:循环遍历打印出来

    ---------------------

    val line =sc.parallelize(List(1,2,3,4));

    line.map(x=>x*x);

    line.collect().mkString(",").foreach(print);

    ------

    说明:

    parallelize:从外部数据集创建RDD

    map:接收一个函数,把这个函数作用于RDD中的每一个元素,输入类型和返回类型不需要相同。

    mkString:增加分隔符

    ---------------------

    2.例子:flatMap/first

    ---------------------

    val lines =sc.parallelize(List("hello world","hi hi hhe"));

    val words=lines.flatMap(line => line.split(" "));

    words.collect().foreach(println);

    words.first();

     ------

    说明:

    flatMap:flatMap将返回的迭代器拍扁,将迭代器中的多个RDD中的元素取出来组成一个RDD.

    first:收集RDD数据集中的第一个数据

    ---------------------

    3.例子:filter/union

    ---------------------

    val lines =sc.textFile("/tmp/test/core-site.xml");

    val name=lines.filter(line =>line.contains("name"));

    val value=lines.filter(line =>line.contains("value"));

    val result=name.union(value);

    result.collect().foreach(println);

    ------

    说明:

    filter:不会改变RDD中的内容,将满足filter条件的元素返回,形成新的RDD.

    union:将两个RDD的内容合并成一个RDD,操作的是两个RDD.需要两个RDD中的元素类型是相同的.

    ---------------------

    4.例子:distinct/sample/intersection/subtract/cartesian

    ---------------------

    val lines =sc.parallelize(List(1,2,3,4,1,2,3,3));

    val result = lines.distinct();

    result.collect().foreach(println);

    val a = sc.parallelize(1 .to(1000),3);

    val result = a.sample(false,0.02,0);

    result.collect().foreach(println);

    val a=sc.parallelize(List(1,2,3,4));

    val b=sc.parallelize(List(1,2));

    val result=a.intersection(b);

    val result2=a.subtract(b);

    result.collect().foreach(println);

    result2.collect().foreach(println);

    val a=sc.parallelize(List("a","b","c"));

    val b=sc.parallelize(List("1","2"));

    val result=a.cartesian(b);

    result.collect().foreach(println);

    ------

    说明:

    distinct:对RDD中元素去重,会触达shuffle操作,低效

    sample:对RDD中的集合内元素进行采样,第一个参数withReplacement是true表示有放回取样,false表示无放回。第二个参数表示比例(取出元素个数占比),第三个参数是随机种子。

    intersection:求两个RDD共同的元素的RDD,回去重,引发shuffle操作.

    subtract:移除RDD中的内容,会引发shuffle操作.

    cartesian:求笛卡尔积,大规模数据时开销巨大.

    ---------------------

    二、Spark行动 RDD操作实例

    1.例子:reduce/fold/aggregate[action操作]

    --------reduce-------------

    val line =sc.parallelize(List(1,2,3,4));

    val sum = line.reduce((x,y) =>x+y);

    println(sum);

    --------fold-------------

    val line =sc.parallelize(List(1,2,3,4),2);

    val sum = line.fold(1)((x,y) =>x+y);

    println(sum);

    --------aggregate demo01-------------

    val line =sc.parallelize(List(1,2,3,4));

    val result= line.aggregate((0,0))(

    (acc,value)=>(acc._1+value,acc._2+1),

    (acc1,acc2) =>(acc1._1+acc2._1,acc1._2+acc2._2)

    );

    val avg=result._1/result._2.toDouble;

    println(avg);

    --------aggregate demo02-------------

    def seqOP(a:Int,b:Int):Int={

    println("seqOp:"+a+" "+b)

    math.min(a,b)

    }

    def comOp(a:Int,b:Int):Int={

    println("comOp:"+a+" "+b)

    a+b

    }

    val line=sc.parallelize(List(1,2,3,4,5),1);

    val result=line.aggregate(2)(seqOP,comOp);

    println(result);

    ------

    说明:

    reduce:接收一个函数作为参数,函数将两个相同元素类型的RDD数据并返回同一个类型的新元素.

    fold:将两个RDD的内容合并成一个RDD,操作的是两个RDD.需要两个RDD中的元素类型是相同的.fold计算过程是这样的,

    假如line 只有1个partition

    第一个partition计算

    第一次:1+1=2;

    第二次:2+2=4;

    第三次:3+4=7;

    第四次:4+7=11;

    combie计算:

    第一次:11+1=12,最终结果:12

    假如line 有2个partion【val line =sc.parallelize(List(1,2,3,4),2);】

    第一个partition计算

    第一次:1+1=2;

    第二次:2+2=4;

    第二个partition计算

    第三次:3+1=4;

    第四次:4+4=8;

    combie计算:

    第一次:4+1=5;

    第二次:5+8=13

    最终结果:13

    aggregate:执行过程

    //demo1执行过程

    step1:(0+1,0+1)=(1,1)

    step2:(1+2,1+1)=(3,2)

    step3:(3+3,2+1)=(6,3)

    step4:(4+6,3+1)=(10,4)

    step5:(0+10,0+4)=(10,4)

    avg=10/4=2.5

    //demo2执行过程

    step1:math.min(2,1)=1

    step2:math.min(1,2)=1

    step3:math.min(1,3)=1

    step4:math.min(1,4)=1

    step4:math.min(1,5)=1

    step5:2+1=3

    2.count/countByValue/take/top/takeOrdered

    ---------------------

    val line=sc.parallelize(List(1,2,3,3),1);

    val result=line.count();

    println(result);

    val line=sc.parallelize(List(1,2,3,3),1);

    val result=line.countByValue();

    println(result);

    val line=sc.parallelize(List(1,2,3,3),1);

    val result=line.take(3);

    result.foreach(println);

    val line=sc.parallelize(List(1,2,3,3),1);

    val result=line.top(2);

    result.foreach(println);

    val line=sc.parallelize(List(1,2,3,3),1);

    val result=line.takeOrdered(2);

    result.foreach(println);

    val line=sc.parallelize(List(1,2,3,3),1);

    val result=line.takeSample(false,2);

    result.foreach(println);

    ------

    说明:

    count:返回RDD中元素的个数.

    countByValue: 各元素在RDD中出现的次数.

    take:从RDD中取出前n个元素个数,与collect比,都是从远程集群上获取元素,只是collect操作获取的所有数据,而take操作是获取前n个元素.

    top:返回最前面的n个元素.

    takeOrdered:从RDD中按照提供的顺序返回最前面n个元素.

    takeSample:从RDD中返回任意一些元素.

    ---------------------

    三、参考资料

    1.fold计算过程-http://www.aboutyun.com/home.php?mod=space&uid=1&do=blog&id=368

    2.fold计算过程-http://www.cnblogs.com/MOBIN/p/5414490.html#12

    3.aggregate计算过程-https://www.iteblog.com/archives/1268.html

  • 相关阅读:
    Recommended Books for Algo Trading in 2020
    Market Making is simpler than you think!
    Top Crypto Market Makers of 2020
    Top Crypto Market Makers, Rated and Reviewed
    爬取伯乐在线文章(五)itemloader
    爬取伯乐在线文章(四)将爬取结果保存到MySQL
    爬取伯乐在线文章(三)爬取所有页面的文章
    爬取伯乐在线文章(二)通过xpath提取源文件中需要的内容
    爬取伯乐在线文章(一)
    爬虫去重策略
  • 原文地址:https://www.cnblogs.com/licheng/p/6815305.html
Copyright © 2011-2022 走看看