  • 4. RDD Operations: Transformations

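    RDDs support two types of operations: transformations and actions.

    A transformation produces a new RDD from an existing one. No real computation happens at that point; Spark only records how the new RDD is derived, and the work actually runs when an action is invoked on it. This is called lazy evaluation.

    For example: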

    scala> val a = sc.parallelize(1 to 9, 3)

    scala> val b = a.map(x => x*2)            // map() is a transformation

    scala> b.first                          // first() is an action; the computation happens only now
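
    To make the laziness visible, here is a minimal sketch that counts how often the function passed to map() actually runs. The accumulator name mapCalls and the app name are made up for illustration, and getOrCreate is used so the snippet reuses the shell's context inside spark-shell. Note that accumulators updated inside transformations can over-count if tasks are retried, so treat this as a demonstration rather than a reliable metric.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the existing context in spark-shell; otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("lazy-demo"))

// Hypothetical counter: incremented every time the map function is called.
val calls = sc.longAccumulator("mapCalls")

val a = sc.parallelize(1 to 9, 3)
val b = a.map { x => calls.add(1); x * 2 }  // transformation: nothing runs yet

val before = calls.sum        // read before any action
val total  = b.reduce(_ + _)  // action: this triggers the job
val after  = calls.sum        // read after the action

println(s"before=$before total=$total after=$after")
```

    The counter stays at zero until reduce() is called, which is the point where Spark schedules the job.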

    An action computes a result from an RDD. The result can be returned to the driver program or written to an external storage system such as HDFS.
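
    As a rough sketch of both kinds of action: count(), reduce() and take() return values to the driver, while saveAsTextFile() writes the RDD out. The output location below is a temporary local directory chosen only for this example; on a cluster you would normally pass an HDFS path instead.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the existing context in spark-shell; otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("action-demo"))

val nums = sc.parallelize(1 to 9, 3)

// Actions that return a result to the driver.
val n          = nums.count()        // number of elements
val sum        = nums.reduce(_ + _)  // combine all elements
val firstThree = nums.take(3)        // first three elements

// Action that stores the result externally.
// Assumption: a fresh temp directory on the local file system stands in for HDFS.
val out = Files.createTempDirectory("rdd-demo").resolve("out").toUri.toString
nums.saveAsTextFile(out)

// Read the files back to check what was written.
val savedLines = sc.textFile(out).count()
```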

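    An RDD is an Iterator-like data structure. It offers the same higher-order functions as Scala's Iterator, such as map(), filter(), and flatMap(), all of which are commonly used in Scala.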
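
    The similarity can be seen by running the same chain on a plain Scala Iterator and on an RDD. This is only a sketch with made-up sample words; the difference is that the RDD version is distributed and needs an action (collect() here) to produce a result.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the existing context in spark-shell; otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("iterator-demo"))

val words = List("spark", "rdd", "scala", "map")

// Plain Scala Iterator: filter, then map.
val local = words.iterator
  .filter(_.length > 3)
  .map(_.toUpperCase)
  .toList

// The same chain on an RDD; collect() brings the result back to the driver.
val distributed = sc.parallelize(words)
  .filter(_.length > 3)
  .map(_.toUpperCase)
  .collect()
  .toList

println(local)
println(distributed)
```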

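    1.1 Transformations

    Transformations fall into two groups: those on RDDs of single elements and those on RDDs of key-value (k-v) pairs.

    The simplest ones are map(), filter(), and flatMap() (illustrated by a figure in the original post):

    flatMap() applies a function to every element of the RDD. Unlike map(), which produces exactly one output element per input element, the function passed to flatMap() returns a collection of zero or more elements, and those collections are flattened into the resulting RDD. Both map() and flatMap() may produce elements of a different type than the input.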
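
    A small sketch of the difference, using two made-up input lines: map() with split keeps one array per line, while flatMap() flattens the arrays into individual words. The last line shows that map() can change the element type as well.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the existing context in spark-shell; otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("flatmap-demo"))

val lines = sc.parallelize(Seq("a b", "c d e"))

// map: one output element per input element, so this is an RDD[Array[String]].
val mapped = lines.map(_.split(" "))

// flatMap: each returned array is flattened, so this is an RDD[String].
val flat = lines.flatMap(_.split(" "))

// map can also change the element type: RDD[String] => RDD[Int].
val lengths = lines.map(_.length)

val mappedCount = mapped.count()
val flatWords   = flat.collect().toList
val lineLengths = lengths.collect().toList
```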

    Transformations on a single RDD (listed in a table in the original post):
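
    A few commonly used single-RDD transformations, as a sketch. The selection here (map, filter, distinct, sortBy) is my own pick of standard RDD methods and is not necessarily the set the original table listed. distinct() involves a shuffle and does not guarantee order, so its result is sorted before printing.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the existing context in spark-shell; otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("single-rdd-demo"))

val nums = sc.parallelize(Seq(1, 2, 3, 3))

val plusOne    = nums.map(_ + 1).collect().toList        // add 1 to every element
val notOne     = nums.filter(_ != 1).collect().toList    // keep elements other than 1
val unique     = nums.distinct().collect().sorted.toList // remove duplicates
val descending = nums.sortBy(x => x, ascending = false).collect().toList

println(s"$plusOne $notOne $unique $descending")
```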

    Transformations between two RDDs (listed in a table in the original post):
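
    The standard set-like transformations that take a second RDD can be sketched as follows. Again, this is my own selection of well-known RDD methods (union, intersection, subtract, cartesian) with made-up sample data, not a reproduction of the original table. Results are sorted where the output order is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the existing context in spark-shell; otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("two-rdd-demo"))

val left  = sc.parallelize(Seq(1, 2, 3))
val right = sc.parallelize(Seq(3, 4, 5))

// union keeps duplicates; it does not deduplicate like a mathematical set union.
val unionAll = left.union(right).collect().sorted.toList

// Elements present in both RDDs.
val common = left.intersection(right).collect().toList

// Elements of left that do not appear in right.
val onlyLeft = left.subtract(right).collect().sorted.toList

// Every (left, right) pair; the size is |left| * |right|.
val pairCount = left.cartesian(right).count()
```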

    Let's open spark-shell:

    [root@master ~]# spark-shell
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel).
    19/12/11 13:13:18 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
    Spark context Web UI available at http://192.168.199.130:4040
    Spark context available as 'sc' (master = local[*], app id = local-1576041198214).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
          /_/
             
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> val arr=Array("abc abc d d a c f g","sdkj kji jji jkl ooj hhu jkl","jki ihb jl hjihi jiow jkw bvjg lkjsdf","iqweio kljlf kljdfj slkj tkgj")
    arr: Array[String] = Array(abc abc d d a c f g, sdkj kji jji jkl ooj hhu jkl, jki ihb jl hjihi jiow jkw bvjg lkjsdf, iqweio kljlf kljdfj slkj tkgj)
    
    scala> val rdd = sc.parallelize(arr)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
    
    scala> val rdd1 = rdd.flatMap(x=>x.split(" "))
    rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:28
    
    scala> rdd.collect
    res0: Array[String] = Array(abc abc d d a c f g, sdkj kji jji jkl ooj hhu jkl, jki ihb jl hjihi jiow jkw bvjg lkjsdf, iqweio kljlf kljdfj slkj tkgj)
    
    scala> rdd1.collect
    res1: Array[String] = Array(abc, abc, d, d, a, c, f, g, sdkj, kji, jji, jkl, ooj, hhu, jkl, jki, ihb, jl, hjihi, jiow, jkw, bvjg, lkjsdf, iqweio, kljlf, kljdfj, slkj, tkgj)
    
    scala> val rdd2 =rdd1.map(i=>(i,1))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:30
    
    scala> rdd2.collect
    res2: Array[(String, Int)] = Array((abc,1), (abc,1), (d,1), (d,1), (a,1), (c,1), (f,1), (g,1), (sdkj,1), (kji,1), (jji,1), (jkl,1), (ooj,1), (hhu,1), (jkl,1), (jki,1), (ihb,1), (jl,1), (hjihi,1), (jiow,1), (jkw,1), (bvjg,1), (lkjsdf,1), (iqweio,1), (kljlf,1), (kljdfj,1), (slkj,1), (tkgj,1))
    
    scala> val rdd3=rdd2.reduceByKey((x,y)=>(x+y))
    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:32
    
    scala> rdd3.collect
    res3: Array[(String, Int)] = Array((d,2), (kljlf,1), (kji,1), (kljdfj,1), (a,1), (ooj,1), (jkl,2), (sdkj,1), (slkj,1), (jji,1), (bvjg,1), (jkw,1), (hjihi,1), (jl,1), (hhu,1), (f,1), (jki,1), (lkjsdf,1), (abc,2), (iqweio,1), (g,1), (tkgj,1), (jiow,1), (c,1), (ihb,1))
    
    scala> 
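
    The session above is a word count built one step at a time. As a sketch, the same steps can be chained into a single expression; the final sortBy/take step is an addition of mine to pull out the most frequent words, with ties broken alphabetically.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the existing context in spark-shell; otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("wordcount-demo"))

// Same input lines as in the session above.
val arr = Array(
  "abc abc d d a c f g",
  "sdkj kji jji jkl ooj hhu jkl",
  "jki ihb jl hjihi jiow jkw bvjg lkjsdf",
  "iqweio kljlf kljdfj slkj tkgj")

val counts = sc.parallelize(arr)
  .flatMap(_.split(" "))  // lines -> words
  .map(w => (w, 1))       // word -> (word, 1)
  .reduceByKey(_ + _)     // sum the 1s per word (this step shuffles)

// Extra step, not in the original session: highest count first, then by word.
val top3 = counts.sortBy { case (word, n) => (-n, word) }.take(3)

val distinctWords = counts.count()
top3.foreach(println)
```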
  • Original article: https://www.cnblogs.com/braveym/p/12160269.html