  • Spark: Run Modes & RDD Operations

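    These notes cover basic Spark RDD operations, which fall into three groups: Transformation, Action, and Controller.

    (1) Transformation: lazy operators. Calling one only records the step in the RDD's lineage; no RDD computation actually runs.

    (2) Action: calling one of these is what actually triggers the RDD computation, including all pending transformations.

    (3) Controller: persist, cache, checkpoint, and so on; these will be covered later.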
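    As a rough analogy in plain Scala (not Spark), a lazy `Iterator` behaves like a transformation: `map` only records the function, and nothing runs until a terminal call such as `toList` forces it, much like an action. The object name `LazinessAnalogy` and its counter are hypothetical and exist only to make the evaluation visible.

```scala
object LazinessAnalogy {
  // Returns (calls before forcing, calls after forcing, forced result)
  def run(): (Int, Int, List[Int]) = {
    var calls = 0 // how many times the mapping function actually ran
    // Like a transformation: builds a lazy pipeline, evaluates nothing yet
    val pipeline = List(1, 2, 3, 4).iterator.map { n => calls += 1; n * 2 }
    val callsBeforeForcing = calls
    // Like an action: forcing the iterator runs the function on every element
    val result = pipeline.toList
    (callsBeforeForcing, calls, result)
  }
}
```

    Running it should report 0 calls before forcing and 4 afterwards. An RDD additionally recomputes the whole lineage on every action unless it is persisted.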

    Run modes

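    Spark is commonly run in one of three modes: local (a single machine), standalone (Spark's own cluster), and on YARN (the Spark cluster's resources are managed by YARN). For convenient testing, local mode is used here. It only requires changing one setting in conf/spark-env.sh under the Spark root directory; nothing else needs to be configured.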

    # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
    # The node's hostname is hadoop01, so set it to hadoop01
    SPARK_LOCAL_IP=hadoop01
    

    To start, run the spark-shell script in the bin directory with "sh spark-shell --master=local".

    [root@hadoop01 /home/software/spark-2.0.1-bin-hadoop2.7/bin]# sh spark-shell --master=local
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel).
    20/02/29 09:41:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    20/02/29 09:41:43 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
    # Web UI address
    Spark context Web UI available at http://192.168.200.140:4040
    # SparkContext object provided by the shell
    Spark context available as 'sc' (master = local, app id = local-1582940503603).
    # SparkSession object provided by the shell
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
          /_/
    
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
    Type in expressions to have them evaluated.
    Type :help for more information.
    # Now you can create and work with Spark RDDs
    scala>
    

    RDD

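    RDD (Resilient Distributed Dataset). It resembles Scala's List and Array, but it is not an ordinary collection: its data is split into partitions, and it is fault-tolerant (a lost partition can be recomputed from the RDD's lineage).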

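    There are several ways to create an RDD: sc.parallelize(collection, numPartitions), sc.makeRDD(collection, numPartitions), or reading directly from HDFS, HBase, and other sources. Here sc is the SparkContext, the entry object through which you work with Spark and create RDDs.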

    # Prepare an ordinary collection
    scala> val arr=Array(1,2,3,4)
    arr: Array[Int] = Array(1, 2, 3, 4)
    # Create an RDD, method 1
    scala> val r1=sc.parallelize(arr,2)
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
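    # View the RDD's contents; not recommended in production, since it pulls all data to the driver and can easily cause an out-of-memory error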
    scala> r1.collect
    res1: Array[Int] = Array(1, 2, 3, 4)
    # Check the number of partitions
    scala> r1.partitions.size
    res3: Int = 2
    # View the elements of each partition, returned as an Array
    scala> r1.glom.collect
    res4: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4))
    # Create an RDD, method 2
    scala> val r2=sc.makeRDD(arr,3)
    r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:26
    # View the contents of each partition
    scala> r2.glom.collect
    res5: Array[Array[Int]] = Array(Array(1), Array(2), Array(3, 4))
    # Create an RDD, method 3: read a file
    scala> val r3=sc.textFile("/home/test.txt")
    r3: org.apache.spark.rdd.RDD[String] = /home/test.txt MapPartitionsRDD[7] at textFile at <console>:24
    # Read from HDFS
    scala> val r7=sc.textFile("hdfs://hadoop01:9000/readme.txt")
    r7: org.apache.spark.rdd.RDD[String] = hdfs://hadoop01:9000/readme.txt MapPartitionsRDD[18] at textFile at <console>:24
    scala> r7.collect
    res12: Array[String] = Array(hello hdfs, hello buddy I come from changsha, what about you)
    # Read from the local Linux file system
    scala> val r6=sc.textFile("file:///home/text.txt")
    r6: org.apache.spark.rdd.RDD[String] = file:///home/text.txt MapPartitionsRDD[20] at textFile at <console>:24
    scala> r6.collect
    res13: Array[String] = Array(hello buddy, I come from changsha, what about you)
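    The glom output above (2 partitions give [1,2],[3,4]; 3 partitions give [1],[2],[3,4]) is consistent with splitting the collection's indices evenly by integer division. The sketch below models that split in plain Scala. `SliceModel.slice` is a hypothetical stand-in, and Spark's own ParallelCollectionRDD has extra special cases (for example for Range), so treat this as an approximation.

```scala
object SliceModel {
  // Split `data` into `numSlices` contiguous chunks.
  // Chunk i covers indices [i * n / numSlices, (i + 1) * n / numSlices).
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    val n = data.length
    (0 until numSlices).map { i =>
      val start = ((i.toLong * n) / numSlices).toInt
      val end = (((i + 1).toLong * n) / numSlices).toInt
      data.slice(start, end)
    }
  }
}
```

    For example, `SliceModel.slice(Seq(1, 2, 3, 4), 3)` gives `[1]`, `[2]`, `[3, 4]`, matching `r2.glom.collect` above.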
    

    Transformation

    Notes on the lazy (transformation) methods.

    (1) map(func).

    scala> val r1=sc.makeRDD(List(1,2,3,4),2)
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at makeRDD at <console>:24
    
    scala> r1.collect
    res14: Array[Int] = Array(1, 2, 3, 4)
    
    scala> val r2=r1.map{num=>num*2}
    r2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at map at <console>:26
    # Each element is doubled
    scala> r2.collect
    res15: Array[Int] = Array(2, 4, 6, 8)
    

    (2) flatMap(func).

    Contents of text.txt:

    [root@hadoop01 /home]# cat text.txt
    hello buddy
    I come from changsha
    what about you
    

    Use flatMap to split each line into words and flatten the result:

    scala> val r3=sc.textFile("file:///home/text.txt").flatMap{line=>line.split(" ")}
    r3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at flatMap at <console>:24
    # Each line is first mapped to an array of words, then the arrays are flattened
    scala> r3.collect
    res16: Array[String] = Array(hello, buddy, I, come, from, changsha, what, about, you)
    

    (3) filter(func).

    scala> r1.collect
    res14: Array[Int] = Array(1, 2, 3, 4)
    # Keep only the even numbers
    scala> r1.filter{x=>x%2==0}
    res17: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[26] at filter at <console>:27
    scala> res17.collect
    res18: Array[Int] = Array(2, 4)
    

    (4) mapPartitions(func).

    # Prepare the data
    scala> val r1=sc.makeRDD(List(1,2,3,4),2)
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at <console>:24
    scala> r1.glom.collect
    res19: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4))
    # The function receives an iterator over one partition and must return an iterator
    scala> val r2=r1.mapPartitions{it=>
         | var result=List[Int]()
         | var sum=0
         | while(it.hasNext){
         | sum+=it.next
         | }
         | result.::(sum).iterator // .::() prepends; after the loop this is a one-element list (the sum), turned into an iterator
         | }
    r2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at mapPartitions at <console>:26
    # Partition sums 1+2 and 3+4, giving 3 and 7
    scala> r2.collect
    res20: Array[Int] = Array(3, 7)
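    The per-partition logic can be modelled in plain Scala by treating each inner Seq as one partition. `MapPartitionsModel` is a hypothetical helper, not Spark's implementation. `sumPartition` is a shorter equivalent of the while loop above: `Iterator.single(it.sum)`.

```scala
object MapPartitionsModel {
  // Apply `f` once per partition: it receives an iterator and returns an iterator
  def mapPartitions[T, U](partitions: Seq[Seq[T]])(f: Iterator[T] => Iterator[U]): Seq[Seq[U]] =
    partitions.map(p => f(p.iterator).toList)

  // The partition-sum function from the transcript, written without the while loop
  val sumPartition: Iterator[Int] => Iterator[Int] = it => Iterator.single(it.sum)
}
```

    In spark-shell the same idea should read `r1.mapPartitions(it => Iterator.single(it.sum))`.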
    

    (5) mapPartitionsWithIndex(func).

    # Like mapPartitions, but the function also receives the partition index
    scala> val r3=r1.mapPartitionsWithIndex{(index,it)=>
         | var l=List[Int]()
         | var sum=0
         | while(it.hasNext){
         | sum+=it.next
         | }
         | l.::(index+":"+sum).iterator // prepend "index:sum"
         | }
    r3: org.apache.spark.rdd.RDD[Any] = MapPartitionsRDD[30] at mapPartitionsWithIndex at <console>:26
    # The index was prepended successfully
    scala> r3.glom.collect
    res21: Array[Array[Any]] = Array(Array(0:3), Array(1:7))
    # Another variant: tag each element with a suffix chosen by partition index
    scala> val r3=r1.mapPartitionsWithIndex{(index,it)=>
         | var l=List[String]()
         | var sum=0
         | while(it.hasNext){
         | if(index==0) l=l:+(it.next+"a") // :+ appends an element, like append()
         | else l=l:+(it.next+"b")
         | }
         | l.iterator
         | }
    r3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at mapPartitionsWithIndex at <console>:26
    # Elements in partition 0 got "a", elements in partition 1 got "b"
    scala> r3.glom.collect
    res22: Array[Array[String]] = Array(Array(1a, 2a), Array(3b, 4b))
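    A plain-Scala model of mapPartitionsWithIndex simply pairs each partition with its position. `WithIndexModel` is a hypothetical name, and each inner Seq stands in for one partition. The checks reproduce both transcript outputs above.

```scala
object WithIndexModel {
  // Like mapPartitions, but `f` also receives the partition index
  def mapPartitionsWithIndex[T, U](partitions: Seq[Seq[T]])(
      f: (Int, Iterator[T]) => Iterator[U]): Seq[Seq[U]] =
    partitions.zipWithIndex.map { case (p, idx) => f(idx, p.iterator).toList }
}
```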
    

    (6) union(otherDataset).

    scala> val r1=sc.makeRDD(List(1,2,3),2)
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at makeRDD at <console>:24
    
    scala> val r2=sc.makeRDD(List(4,5,6),2)
    r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[35] at makeRDD at <console>:24
    # Union (++ is an alias for union; duplicates are kept)
    scala> r1++r2
    res23: org.apache.spark.rdd.RDD[Int] = UnionRDD[36] at $plus$plus at <console>:29
    scala> res23.collect
    res24: Array[Int] = Array(1, 2, 3, 4, 5, 6)
    # The same union via union()
    scala> r1.union(r2)
    res25: org.apache.spark.rdd.RDD[Int] = UnionRDD[37] at union at <console>:29
    scala> res25.collect
    res26: Array[Int] = Array(1, 2, 3, 4, 5, 6)
    

    (7) intersection(otherDataset).

    Prepare the data:

    [root@hadoop01 /home]# cat ip1.txt
    192.168.200.100
    192.168.200.110
    192.168.200.120
    192.168.200.130
    [root@hadoop01 /home]# cat ip2.txt
    192.168.200.100
    192.168.200.80
    192.168.200.90
    

    Find the IPs that appear in both files:

    # Read the data
    scala> val r1=sc.textFile("file:///home/ip1.txt")
    r1: org.apache.spark.rdd.RDD[String] = file:///home/ip1.txt MapPartitionsRDD[39] at textFile at <console>:24
    scala> val r2=sc.textFile("file:///home/ip2.txt")
    r2: org.apache.spark.rdd.RDD[String] = file:///home/ip2.txt MapPartitionsRDD[41] at textFile at <console>:24
    # Intersection (the result contains no duplicates)
    scala> r1.intersection(r2)
    res28: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[47] at intersection at <console>:29
    scala> res28.collect
    res29: Array[String] = Array(192.168.200.100)
    

    (8) subtract(otherDataset).

    scala> val r1=sc.makeRDD(List(1,2,3,4))
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at makeRDD at <console>:24
    
    scala> val r2=sc.makeRDD(List(3,4,5))
    r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at makeRDD at <console>:24
    # Difference: the elements of r1 that are not in r2
    scala> val r=r1.subtract(r2)
    r: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at subtract at <console>:28
    
    scala> r.collect
    res35: Array[Int] = Array(1, 2)
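    The two set-like operations treat duplicates differently. intersection reports each common value once, even if the inputs contain duplicates. subtract keeps every left-side element whose value is absent from the right side, duplicates included. Below is a plain-Scala model on local lists. `SetOpsModel` is a hypothetical name. Spark returns the same elements but in no guaranteed order, which is why the checks sort first.

```scala
object SetOpsModel {
  // intersection: values present in both inputs, each reported once
  def intersection[T](a: Seq[T], b: Seq[T]): Seq[T] = {
    val inB = b.toSet
    a.distinct.filter(inB.contains)
  }

  // subtract: every element of `a` whose value does not occur in `b`
  // (duplicates in `a` are preserved)
  def subtract[T](a: Seq[T], b: Seq[T]): Seq[T] = {
    val inB = b.toSet
    a.filterNot(inB.contains)
  }
}
```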
    

    (9) distinct.

    # Read the data
    scala> val r1=sc.textFile("file:///home/ip1.txt")
    r1: org.apache.spark.rdd.RDD[String] = file:///home/ip1.txt MapPartitionsRDD[39] at textFile at <console>:24
    scala> val r2=sc.textFile("file:///home/ip2.txt")
    r2: org.apache.spark.rdd.RDD[String] = file:///home/ip2.txt MapPartitionsRDD[41] at textFile at <console>:24
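    # Caution: r1++r2.distinct parses as r1++(r2.distinct), so only r2 is deduplicated and 192.168.200.100 appears twice below; to deduplicate the union, write (r1++r2).distinct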
    scala> r1++r2.distinct
    res33: org.apache.spark.rdd.RDD[String] = UnionRDD[54] at $plus$plus at <console>:29
    scala> res33.collect
    res34: Array[String] = Array(192.168.200.100, 192.168.200.110, 192.168.200.120, 192.168.200.130, 192.168.200.80, 192.168.200.90, 192.168.200.100)
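    The precedence pitfall comes from plain Scala: a method call such as `.distinct` binds tighter than an infix operator such as `++`. You can reproduce it with ordinary lists (the IP strings are copied from ip1.txt and ip2.txt above; `PrecedenceDemo` is a hypothetical name):

```scala
object PrecedenceDemo {
  val ip1 = List("192.168.200.100", "192.168.200.110", "192.168.200.120", "192.168.200.130")
  val ip2 = List("192.168.200.100", "192.168.200.80", "192.168.200.90")

  // Parsed as ip1 ++ (ip2.distinct): only ip2 is deduplicated
  val wrong: List[String] = ip1 ++ ip2.distinct

  // Parentheses make distinct apply to the whole union
  val right: List[String] = (ip1 ++ ip2).distinct
}
```

    With RDDs the same fix is `(r1++r2).distinct`. Because distinct involves a shuffle, the order of the collected result is not guaranteed.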
    

    (10) groupByKey.

    # The elements must be 2-tuples (key, value)
    scala> val r1=sc.makeRDD(List((1,"bj"),(2,"sh"),(3,"bj"),(4,"sh")),2)
    r1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[63] at makeRDD at <console>:24
    scala> r1.collect
    res38: Array[(Int, String)] = Array((1,bj), (2,sh), (3,bj), (4,sh))
    # Swap to (city, id), then group by city name
    scala> val r=r1.map{case(k,v)=>(v,k)}.groupByKey
    r: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[65] at groupByKey at <console>:26
    scala> r.collect
    res39: Array[(String, Iterable[Int])] = Array((bj,CompactBuffer(1, 3)), (sh,CompactBuffer(2, 4)))
    # Alternative: swap using the tuple accessors
    scala> val r=r1.map{t=>(t._2,t._1)}.groupByKey
    r: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[67] at groupByKey at <console>:26
    scala> r.collect
    res40: Array[(String, Iterable[Int])] = Array((bj,CompactBuffer(1, 3)), (sh,CompactBuffer(2, 4)))
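    Conceptually, groupByKey collects every value that shares a key. Here is a plain-Scala model on a local list. `GroupByKeyModel` is a hypothetical helper, and it ignores partitioning and the shuffle entirely.

```scala
object GroupByKeyModel {
  // Collect every value that shares a key, keeping encounter order within a key
  def groupByKey[K, V](pairs: Seq[(K, V)]): Map[K, Seq[V]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
}
```

    In a real cluster every value crosses the network during the shuffle. When only an aggregate is needed, reduceByKey (next) is usually preferred because it combines values inside each partition first.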
    

    (11) reduceByKey(func).

    # The data must be 2-tuples (key, value)
    scala> val r1=sc.makeRDD(List(("bj",2),("sh",2),("bj",1),("sh",1)),2)
    r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[68] at makeRDD at <console>:24
    # The function must have the form (V, V) => V
    scala> val r=r1.reduceByKey{(a,b)=>{a+b}}
    r: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[69] at reduceByKey at <console>:26
    # Result after reducing by key
    scala> r.collect
    res41: Array[(String, Int)] = Array((bj,3), (sh,3))
    # Word frequency count
    scala> val r=sc.textFile("file:///home/text.txt").flatMap{line=>line.split(" ")}.map{word=>(word,1)}.reduceByKey{_+_} // _+_ is shorthand for (a,b)=>a+b
    r: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[74] at reduceByKey at <console>:24
    scala> r.collect
    res42: Array[(String, Int)] = Array((buddy,1), (you,1), (about,1), (I,1), (changsha,1), (come,1), (hello,1), (what,1), (from,1))
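    You can mirror the same word-count pipeline on a local list to see what each step produces. This is a plain-Scala model, not Spark code: reduceByKey is emulated with a foldLeft into a Map, and `WordCountModel` is a hypothetical name.

```scala
object WordCountModel {
  // flatMap lines into words, map each word to (word, 1), then sum the 1s per key
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .foldLeft(Map.empty[String, Int]) { case (acc, (word, n)) =>
        acc.updated(word, acc.getOrElse(word, 0) + n)
      }
}
```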
    

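    (12) aggregateByKey(zeroValue)(func1, func2, [numTasks]).

    zeroValue: the initial value. It takes part in the func1 computation: for each key, the fold inside each partition starts from zeroValue.

    func1: within each partition, values are grouped by key and combined with func1.

    func2: the per-partition results for each key are then merged across all partitions with func2.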

    scala> val r1=sc.makeRDD(List(("bj",2),("sh",1),("bj",3),("sh",2),("bj",1),("sh",1)),2)
    r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[78] at makeRDD at <console>:24
    scala> r1.glom.collect
    res48: Array[Array[(String, Int)]] = Array(Array((bj,2), (sh,1), (bj,3)), Array((sh,2), (bj,1), (sh,1)))
    # After func1:
    # partition 1: (bj,5) (sh,1)
    # partition 2: (bj,1) (sh,3)
    # After func2:
    # (bj,5+1) (sh,1+3)
    scala> r1.aggregateByKey(0)(_+_,_+_)
    res49: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[80] at aggregateByKey at <console>:27
    scala> res49.collect
    res50: Array[(String, Int)] = Array((bj,6), (sh,4))
    # After func1:
    # partition 1: (bj,5) (sh,1)
    # partition 2: (bj,1) (sh,3)
    # After func2:
    # (bj,5*1) (sh,1*3)
    scala> r1.aggregateByKey(0)(_+_,_*_)
    res51: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[81] at aggregateByKey at <console>:27
    scala> res51.collect
    res52: Array[(String, Int)] = Array((bj,5), (sh,3))
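    To check the walkthrough above, here is a plain-Scala model in which each inner Seq stands for one partition. `AggregateByKeyModel` is a hypothetical name. It merges partitions left to right, whereas Spark promises no merge order, so func2 should normally be commutative and associative. The last check shows that zeroValue is applied once per key per partition, not once overall.

```scala
object AggregateByKeyModel {
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Step 1 (func1): inside each partition, fold values per key starting from zero
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // Step 2 (func2): merge the per-partition results for the same key
    perPartition.foldLeft(Map.empty[K, U]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, u)) =>
        a.updated(k, a.get(k).map(prev => combOp(prev, u)).getOrElse(u))
      }
    }
  }
}
```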
    

    (13) sortByKey.

    scala> val r1=sc.makeRDD(List((3,"tom"),(1,"rose"),(2,"jim"),(4,"jary")),2)
    r1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[82] at makeRDD at <console>:24
    # Sort by key, ascending
    scala> val r=r1.sortByKey(true)
    r: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[85] at sortByKey at <console>:26
    scala> r.collect
    res53: Array[(Int, String)] = Array((1,rose), (2,jim), (3,tom), (4,jary))
    # Sort by key, descending
    scala> val r=r1.sortByKey(false)
    r: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[88] at sortByKey at <console>:26
    scala> r.collect
    res54: Array[(Int, String)] = Array((4,jary), (3,tom), (2,jim), (1,rose))
    # Sort by name, ascending (swap first so the name becomes the key)
    scala> val r=r1.map{x=>(x._2,x._1)}.sortByKey(true)
    r: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[92] at sortByKey at <console>:26
    scala> r.collect
    res55: Array[(String, Int)] = Array((jary,4), (jim,2), (rose,1), (tom,3))
    # sortBy also works: it sorts by a function of each element (here the name), with no swap needed
    scala> val r=r1.sortBy{x=>x._2}
    r: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[97] at sortBy at <console>:26
    scala> r.collect
    res56: Array[(Int, String)] = Array((4,jary), (2,jim), (1,rose), (3,tom))
    

    (14) join, leftOuterJoin, rightOuterJoin, fullOuterJoin.

    scala> val r1=sc.makeRDD(List(("bj",1),("sh",1)))
    r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[98] at makeRDD at <console>:24
    
    scala> val r2=sc.makeRDD(List(("bj",2),("sh",3),("cs",4)))
    r2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[99] at makeRDD at <console>:24
    # join (inner join): only keys present in both RDDs
    scala> r1.join(r2)
    res57: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[102] at join at <console>:29
    scala> res57.collect
    res58: Array[(String, (Int, Int))] = Array((sh,(1,3)), (bj,(1,2)))
    # Left outer join: every key of r1 is kept
    scala> r1.leftOuterJoin(r2)
    res59: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[105] at leftOuterJoin at <console>:29
    scala> res59.collect
    res60: Array[(String, (Int, Option[Int]))] = Array((sh,(1,Some(3))), (bj,(1,Some(2))))
    # Right outer join: every key of r2 is kept, hence the cs record
    scala> r1.rightOuterJoin(r2)
    res61: org.apache.spark.rdd.RDD[(String, (Option[Int], Int))] = MapPartitionsRDD[108] at rightOuterJoin at <console>:29
    scala> res61.collect
    res62: Array[(String, (Option[Int], Int))] = Array((sh,(Some(1),3)), (bj,(Some(1),2)), (cs,(None,4)))
    # Full outer join: keys from either side are kept
    scala> r1.fullOuterJoin(r2)
    res63: org.apache.spark.rdd.RDD[(String, (Option[Int], Option[Int]))] = MapPartitionsRDD[111] at fullOuterJoin at <console>:29
    scala> res63.collect
    res64: Array[(String, (Option[Int], Option[Int]))] = Array((sh,(Some(1),Some(3))), (bj,(Some(1),Some(2))), (cs,(None,Some(4))))
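    The four join flavours differ only in what happens to keys that are missing on one side. Below is a plain-Scala model over local pair lists that makes this explicit. `JoinModel` is a hypothetical name. Spark's joins are built on cogroup and return results in no guaranteed order, so the checks sort by key.

```scala
object JoinModel {
  // Inner join: one output pair per matching (left, right) combination
  def join[K, V, W](l: Seq[(K, V)], r: Seq[(K, W)]): Seq[(K, (V, W))] =
    for {
      (k, v) <- l
      (k2, w) <- r
      if k == k2
    } yield (k, (v, w))

  // Left outer join: every left record survives; a missing right side becomes None
  def leftOuterJoin[K, V, W](l: Seq[(K, V)], r: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    l.flatMap { case (k, v) =>
      val ws: Seq[W] = r.filter(_._1 == k).map(_._2)
      if (ws.isEmpty) Seq((k, (v, Option.empty[W])))
      else ws.map(w => (k, (v, Option(w))))
    }

  // Right outer join: the mirror image of leftOuterJoin
  def rightOuterJoin[K, V, W](l: Seq[(K, V)], r: Seq[(K, W)]): Seq[(K, (Option[V], W))] =
    leftOuterJoin(r, l).map { case (k, (w, ov)) => (k, (ov, w)) }

  // Full outer join: keys from either side; the absent side becomes None
  def fullOuterJoin[K, V, W](l: Seq[(K, V)], r: Seq[(K, W)]): Seq[(K, (Option[V], Option[W]))] = {
    val keys = (l.map(_._1) ++ r.map(_._1)).distinct
    keys.flatMap { k =>
      val vs: Seq[Option[V]] = l.filter(_._1 == k).map(p => Option(p._2))
      val ws: Seq[Option[W]] = r.filter(_._1 == k).map(p => Option(p._2))
      val lefts = if (vs.isEmpty) Seq(Option.empty[V]) else vs
      val rights = if (ws.isEmpty) Seq(Option.empty[W]) else ws
      for (ov <- lefts; ow <- rights) yield (k, (ov, ow))
    }
  }
}
```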
    

    (15) cartesian(otherDataset).

    scala> val r1=sc.makeRDD(List(1,2,3))
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[112] at makeRDD at <console>:24
    scala> val r2=sc.makeRDD(List("messi","ronald"))
    r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[113] at makeRDD at <console>:24
    # Cartesian product
    scala> r1.cartesian(r2)
    res65: org.apache.spark.rdd.RDD[(Int, String)] = CartesianRDD[114] at cartesian at <console>:29
    scala> res65.collect
    res66: Array[(Int, String)] = Array((1,messi), (1,ronald), (2,messi), (2,ronald), (3,messi), (3,ronald))
    

    (16) coalesce(numPartitions).

    scala> val r1=sc.makeRDD(List(("bj",1),("sh",1),("bj",2),("sh",2)),2)
    r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[126] at makeRDD at <console>:24
    # 2 partitions
    scala> r1.partitions.size
    res75: Int = 2
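    # To grow to 3 partitions, pass true as the second argument (shuffle = true); without a shuffle, coalesce cannot increase the partition count
    # Shrinking the partition count does not require true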
    scala> val r2=r1.coalesce(3,true)
    r2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[130] at coalesce at <console>:26
    scala> r2.partitions.size
    res76: Int = 3
    

    Action

    Notes on the methods that immediately trigger the RDD computation.

    (1) reduce(func).

    scala> val r1=sc.makeRDD(List(1,2,3,4,5),2)
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[141] at makeRDD at <console>:24
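    # reduce: the function takes two arguments and returns one, and that result is combined with the next element, and so on. Spark reduces each partition first and then merges the partition results, so func should be commutative and associative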
    scala> r1.reduce((a,b)=>a+b)
    res97: Int = 15
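    The model below shows why func should be associative and commutative. It is a plain-Scala sketch, not Spark's code, and it assumes the five elements are split into the partitions [1, 2] and [3, 4, 5]. It reduces each partition first and then merges the partial results. Addition gives 15 either way, but subtraction gives a different answer from a sequential left-to-right reduce.

```scala
object ReduceModel {
  // Reduce each non-empty partition locally, then merge the partial results.
  // In Spark the merge order depends on task completion; here it is left to right.
  def reduce[T](partitions: Seq[Seq[T]])(f: (T, T) => T): T =
    partitions.filter(_.nonEmpty).map(_.reduce(f)).reduce(f)
}
```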
    

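    (2) collect().

    Very commonly used: it gathers the data stored across the RDD's partitions back to the driver and returns it as one new array. Because all of that data must fit in the driver's memory, it can easily cause an out-of-memory error; use it with caution in production.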

    (3) count().

    scala> val r1=sc.makeRDD(List(("bj",1),("sh",1),("bj",2),("sh",2)),2)
    r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[126] at makeRDD at <console>:24
    # Count the elements in the RDD
    scala> r1.count
    res78: Long = 4
    

    (4) first().

    scala> val r1=sc.makeRDD(List(("bj",1),("sh",1),("bj",2),("sh",2)),2)
    r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[126] at makeRDD at <console>:24
    # Take the first element of the RDD, similar to take(1)
    scala> r1.first()
    res79: (String, Int) = (bj,1)
    

    (5) take(n).

    # take(1) is similar to first(), but returns an Array
    scala> r1.take(1)
    res80: Array[(String, Int)] = Array((bj,1))
    

    (6) takeOrdered(n).

    scala> val r1=sc.makeRDD(List(1,2,4,5,4424,45))
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[131] at makeRDD at <console>:24
    # Sort the RDD's data in ascending order, then take the first 3
    scala> r1.takeOrdered(3)
    res81: Array[Int] = Array(1, 2, 4)
    

    (7) top(n).

    scala> r1.collect
    res85: Array[Int] = Array(1, 2, 4, 5, 4424, 45)
    # Sort the RDD's data in descending order, then take the first 3
    scala> r1.top(3)
    res84: Array[Int] = Array(4424, 45, 5)
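    Neither method needs to sort the whole dataset. Spark's takeOrdered keeps a bounded queue of the n best elements per partition and merges those queues, and top(n) is takeOrdered with the ordering reversed. This sketch models that strategy with sorted lists instead of queues. `TopModel` is a hypothetical name, fixed to Int for brevity.

```scala
object TopModel {
  // takeOrdered(n): the n smallest elements in ascending order.
  // Each partition contributes only its own n smallest before the final merge.
  def takeOrdered(partitions: Seq[Seq[Int]], n: Int): Seq[Int] =
    partitions.flatMap(_.sorted.take(n)).sorted.take(n)

  // top(n): the n largest elements in descending order (reverse ordering)
  def top(partitions: Seq[Seq[Int]], n: Int): Seq[Int] =
    partitions
      .flatMap(_.sorted(Ordering[Int].reverse).take(n))
      .sorted(Ordering[Int].reverse)
      .take(n)
}
```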
    

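    (8) saveAsTextFile(path).

    Calls toString on each element and writes it as one line of output. One part-XXXXX file is written per partition.

    # Save to the local file system, or to HDFS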
    scala> r1.saveAsTextFile("file:///home/result")
    scala> r1.saveAsTextFile("hdfs://hadoop01:9000/result")
    
    # Checking the HDFS output as an example
    [root@hadoop01 /home]# hadoop fs -ls /result
    20/03/01 11:39:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 2 items
    -rw-r--r--   3 root supergroup          0 2020-02-29 20:23 /result/_SUCCESS
    -rw-r--r--   3 root supergroup         16 2020-02-29 20:23 /result/part-00000
    [root@hadoop01 /home]# hadoop fs -cat /result/part-00000
    20/03/01 11:39:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    1
    2
    4
    5
    4424
    45
    

    (9) countByKey().

    # The data must be (k, v) tuples
    scala> val r1=sc.makeRDD(List(("bj",1),("sh",2),("bj",3),("sh",4)),2)
    r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[137] at makeRDD at <console>:24
    # Count the elements for each key
    scala> r1.countByKey
    res89: scala.collection.Map[String,Long] = Map(bj -> 2, sh -> 2)
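    countByKey counts the elements per key and returns an ordinary Map to the driver. The Spark API docs advise using it only when that map is expected to be small. Here is a plain-Scala model (`CountByKeyModel` is a hypothetical name):

```scala
object CountByKeyModel {
  // Number of elements per key; the values themselves are ignored
  def countByKey[K, V](pairs: Seq[(K, V)]): Map[K, Long] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.size.toLong }
}
```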
    

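    (10) foreach(func).

    foreach is also an action. func runs on the executors; in local mode, as here, they share the shell's JVM, so the println output appears directly in the console.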

    scala> val r1=sc.makeRDD(List(("bj",1),("sh",2),("bj",3),("sh",4)),2)
    r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[137] at makeRDD at <console>:24
    scala> r1.foreach(println)
    (bj,1)
    (sh,2)
    (bj,3)
    (sh,4)
    

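    That covers the RDD operations noted here, in two groups: Transformations are lazy and do not actually run the RDD computation, while Actions trigger it. Overall they resemble operations on Scala collections; the difference is that an RDD is a partitioned, fault-tolerant dataset.

    References: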

    (1)https://www.cnblogs.com/youngchaolin/p/12375032.html

    (2)https://blog.csdn.net/u013063153/article/details/73733660

  • Original post: https://www.cnblogs.com/youngchaolin/p/12398819.html