  • Spark pair RDD

    1. Number of lines containing "is"
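    val lines = sc.textFile("file:///usr/spark/spark-1.0.0-bin-hadoop2/README.md")  // one element per line
    lines.count                                             // total number of lines
    val isrdd = lines.filter(line => line.contains("is"))   // substring match, so "This" also counts
    isrdd.count                                             // number of lines containing "is"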
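
    To check what the filter actually counts without a cluster, here is a minimal plain-Scala sketch that uses an ordinary Seq in place of the RDD. It assumes only the Scala standard library; the sample lines are made up (not the README), and the whole-word variant is a hypothetical addition, not part of the original.

    ```scala
    // Plain-Scala stand-in for the RDD pipeline above; no Spark required.
    // sampleLines is made-up example data, not the contents of README.md.
    val sampleLines = Seq(
      "Apache Spark",
      "Spark is a fast engine",
      "This file lists the docs",
      "Run the examples"
    )

    // Same predicate as lines.filter(line => line.contains("is")):
    // a plain substring test, so "This" and "lists" match too.
    val substringMatches = sampleLines.filter(line => line.contains("is"))

    // Hypothetical stricter variant: split on spaces and compare whole tokens,
    // so only the standalone word "is" counts.
    val wordMatches = sampleLines.filter(line => line.split(" ").contains("is"))

    println(s"total lines:        ${sampleLines.size}")
    println(s"substring matches:  ${substringMatches.size}")
    println(s"whole-word matches: ${wordMatches.size}")
    ```

    The substring version counts 2 of the 4 sample lines, the whole-word version only 1, which is worth keeping in mind when reading the count returned by isrdd.count.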

    2. Word count, sorted by frequency
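    val input = sc.textFile("file:///usr/spark/spark-1.0.0-bin-hadoop2/README.md")
    input.take(10).foreach(println)                     // print the first 10 lines

    val words = input.flatMap(line => line.split(" "))  // one element per word
    val pairs = words.map(word => (word, 1))            // (word, 1)
    val counts = pairs.reduceByKey(_ + _)               // (word, n)
    // swap to (n, word), sort ascending by n with sortByKey, then swap back to (word, n)
    val count = counts.map(x => (x._2, x._1)).sortByKey().map(x => (x._2, x._1))
    count.collect().foreach(println)

    // Alternative: countByValue() returns a local Map[String, Long] on the driver
    val input = sc.textFile("file:///usr/spark/spark-1.0.0-bin-hadoop2/README.md")
    val count = input.flatMap(x => x.split(" ")).countByValue()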
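
    The same word count can be traced by hand on a local collection. This is a minimal plain-Scala sketch, not Spark code: it assumes only the standard library, the sample text is made up, groupBy stands in for reduceByKey, and sortBy replaces the swap/sortByKey/swap trick.

    ```scala
    // Made-up sample text standing in for the README lines.
    val text = Seq("to be or not to be", "that is the question")

    // flatMap + map(word => (word, 1)) + reduceByKey(_ + _), done locally with groupBy
    val counts: Map[String, Int] =
      text.flatMap(_.split(" ").toSeq)
        .groupBy(identity)
        .map { case (word, occurrences) => (word, occurrences.size) }

    // Ascending by count, like the sortByKey version (ties in no specified order).
    val byCountAsc = counts.toSeq.sortBy { case (_, n) => n }

    // Descending by count, ties broken alphabetically.
    val byCountDesc = counts.toSeq.sortBy { case (word, n) => (-n, word) }

    byCountDesc.foreach(println)
    ```

    Sorting on the tuple (-n, word) gives a deterministic order; the RDD version above only orders by count.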

    3. Basic pair RDD operations
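    val input = sc.parallelize(List((1,2),(3,4),(3,6)))
    val r1 = input.groupByKey().collect()                   // key 1 -> [2], key 3 -> [4, 6]
    val r1 = input.reduceByKey(_+_).collect()               // (1,2), (3,10)
    val r1 = input.reduceByKey(_*_).collect()               // (1,2), (3,24)
    val r1 = input.reduceByKey((x,y)=>(x+y)).collect()      // same as _+_: (1,2), (3,10)
    val r1 = input.reduceByKey((x,y)=>(x-y)).collect()      // unsafe: subtraction is neither associative nor commutative, so key 3 may give -2 or 2
    val r1 = input.reduceByKey((x,y)=>(x max y)).collect()  // (1,2), (3,6)
    input.keys.collect                                      // 1, 3, 3
    input.values.collect                                    // 2, 4, 6
    input.mapValues(x => x+1).collect                       // (1,3), (3,5), (3,7)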
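
    The pair operations above can be mimicked on an ordinary Seq to check the results by hand. This is a plain-Scala sketch, not Spark: reduceByKeyLocal is a hypothetical helper that groups by key and then reduces each group, which matches what reduceByKey computes only when the function is associative and commutative.

    ```scala
    // Same data as the RDD above, as a local Seq (no Spark).
    val pairs = Seq((1, 2), (3, 4), (3, 6))

    // groupByKey: gather all values for each key (order within a group is kept)
    val grouped: Map[Int, Seq[Int]] =
      pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }

    // Hypothetical helper mirroring reduceByKey(f): combine each key's values with f
    def reduceByKeyLocal(f: (Int, Int) => Int): Map[Int, Int] =
      grouped.map { case (k, vs) => (k, vs.reduce(f)) }

    val sums = reduceByKeyLocal(_ + _)
    val products = reduceByKeyLocal(_ * _)
    val maxima = reduceByKeyLocal((x, y) => x max y)

    // keys / values / mapValues
    val keys = pairs.map(_._1)
    val values = pairs.map(_._2)
    val plusOne = pairs.map { case (k, v) => (k, v + 1) }

    // Spark may merge values in any order (within and across partitions),
    // so with subtraction the answer for key 3 depends on that order.
    val diffOneOrder = Seq(4, 6).reduce(_ - _)   // 4 - 6
    val diffOtherOrder = Seq(6, 4).reduce(_ - _) // 6 - 4

    println(s"sums=$sums products=$products maxima=$maxima")
    println(s"subtraction: $diffOneOrder vs $diffOtherOrder")
    ```

    This is why the (x,y)=>(x-y) line is not a reliable use of reduceByKey, while +, * and max are.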

    input.filter{case (key,value) => key > 2}.collect      // (3,4), (3,6)
    input.filter{case (key,value) => value > 3}.collect    // (3,4), (3,6)
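
    The { case (key, value) => ... } form is a pattern-matching anonymous function that destructures each tuple. A small plain-Scala sketch on a local Seq (not Spark, same sample data) showing it selects the same elements as indexing with ._1 and ._2:

    ```scala
    // Same data as the RDD above, as a local Seq (no Spark).
    val pairs = Seq((1, 2), (3, 4), (3, 6))

    // Pattern-matching form: binds the tuple's parts to names
    val keyAbove2 = pairs.filter { case (key, _) => key > 2 }
    val valueAbove3 = pairs.filter { case (_, value) => value > 3 }

    // Equivalent forms that index the tuple directly
    val keyAbove2Indexed = pairs.filter(_._1 > 2)
    val valueAbove3Indexed = pairs.filter(_._2 > 3)

    println(keyAbove2)
    println(valueAbove3)
    ```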

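    collect() brings the entire contents of an RDD back to the driver. It is fine when your program handles a small amount of data and runs locally, but if the data does not fit on a single machine, collect() cannot be used. Instead, write the result out with saveAsTextFile(), for example to HDFS.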

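    Every time a new action is called, the whole RDD is recomputed from scratch. To avoid this inefficiency, persist intermediate results with persist() (or cache()).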
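
    The recomputation cost can be illustrated without Spark. The sketch below is only an analogy in plain Scala, not Spark's caching: the hypothetical expensiveResult() plays the role of an RDD's lineage, calling it once per "action" mimics an un-persisted RDD, and a lazy val that computes once and is then reused mimics an RDD after persist()/cache().

    ```scala
    // Counts how many times the "lineage" is evaluated.
    var evaluations = 0

    // Hypothetical stand-in for an expensive chain of transformations.
    def expensiveResult(): Seq[Int] = {
      evaluations += 1
      (1 to 5).map(x => x * x)
    }

    // Without caching: each "action" re-runs the whole computation.
    val total = expensiveResult().sum
    val largest = expensiveResult().max
    val evaluationsWithoutCache = evaluations

    // With caching: compute on first use, then reuse the stored result.
    evaluations = 0
    lazy val cached = expensiveResult()
    val total2 = cached.sum
    val largest2 = cached.max
    val evaluationsWithCache = evaluations

    println(s"without cache: $evaluationsWithoutCache evaluations, with cache: $evaluationsWithCache")
    ```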

  • Original article: https://www.cnblogs.com/catcoding/p/5143403.html