zoukankan      html  css  js  c++  java
  • Sprak RDD简单应用

    来自:http://my.oschina.net/scipio/blog/284957#OSC_h5_11

    目录[-]

    1、准备文件

    wget http://statweb.stanford.edu/~tibs/ElemStatLearn/datasets/spam.data

    2、加载文件

    scala> val inFile = sc.textFile("/home/scipio/spam.data")

      输出

    14/06/28 12:15:34 INFO MemoryStore: ensureFreeSpace(32880) called with curMem=65736, maxMem=311387750
    14/06/28 12:15:34 INFO MemoryStore: Block broadcast_2 stored as values to memory (estimated size 32.1 KB, free 296.9 MB)
    inFile: org.apache.spark.rdd.RDD[String] = MappedRDD[7] at textFile at <console>:12

    3、显示一行

    scala> inFile.first()

      输出

    14/06/28 12:15:39 INFO FileInputFormat: Total input paths to process : 1
    14/06/28 12:15:39 INFO SparkContext: Starting job: first at <console>:15
    14/06/28 12:15:39 INFO DAGScheduler: Got job 0 (first at <console>:15) with 1 output partitions (allowLocal=true)
    14/06/28 12:15:39 INFO DAGScheduler: Final stage: Stage 0(first at <console>:15)
    14/06/28 12:15:39 INFO DAGScheduler: Parents of final stage: List()
    14/06/28 12:15:39 INFO DAGScheduler: Missing parents: List()
    14/06/28 12:15:39 INFO DAGScheduler: Computing the requested partition locally
    14/06/28 12:15:39 INFO HadoopRDD: Input split: file:/home/scipio/spam.data:0+349170
    14/06/28 12:15:39 INFO SparkContext: Job finished: first at <console>:15, took 0.532360118 s
    res2: String = 0.64 0.64 0.32 0.64 0.32 1.29 1.93 0.96 0.778 3.756 61 278 1

    4、函数运用

     (1)map
    scala> val nums = inFile.map(x=>x.split(' ').map(_.toDouble))
    nums: org.apache.spark.rdd.RDD[Array[Double]] = MappedRDD[8] at map at <console>:14
    
    scala> nums.first()
    14/06/28 12:19:07 INFO SparkContext: Starting job: first at <console>:17
    14/06/28 12:19:07 INFO DAGScheduler: Got job 1 (first at <console>:17) with 1 output partitions (allowLocal=true)
    14/06/28 12:19:07 INFO DAGScheduler: Final stage: Stage 1(first at <console>:17)
    14/06/28 12:19:07 INFO DAGScheduler: Parents of final stage: List()
    14/06/28 12:19:07 INFO DAGScheduler: Missing parents: List()
    14/06/28 12:19:07 INFO DAGScheduler: Computing the requested partition locally
    14/06/28 12:19:07 INFO HadoopRDD: Input split: file:/home/scipio/spam.data:0+349170
    14/06/28 12:19:07 INFO SparkContext: Job finished: first at <console>:17, took 0.011412903 s
    res3: Array[Double] = Array(0.0, 0.64, 0.64, 0.0, 0.32, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.64, 0.0, 0.0, 0.0, 0.32, 0.0, 1.29, 1.93, 0.0, 0.96, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.778, 0.0, 0.0, 3.756, 61.0, 278.0, 1.0)
     (2)collecct
    scala> val rdd = sc.parallelize(List(1,2,3,4,5))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:12
    
    scala> val mapRdd = rdd.map(2*_)
    mapRdd: org.apache.spark.rdd.RDD[Int] = MappedRDD[10] at map at <console>:14
    
    scala> mapRdd.collect
    14/06/28 12:24:45 INFO SparkContext: Job finished: collect at <console>:17, took 1.789249751 s
    res4: Array[Int] = Array(2, 4, 6, 8, 10)
     (3)filter
    scala> val filterRdd = sc.parallelize(List(1,2,3,4,5)).map(_*2).filter(_>5)
    filterRdd: org.apache.spark.rdd.RDD[Int] = FilteredRDD[13] at filter at <console>:12
    
    scala> filterRdd.collect
    14/06/28 12:27:45 INFO SparkContext: Job finished: collect at <console>:15, took 0.056086178 s
    res5: Array[Int] = Array(6, 8, 10)
     (4)flatMap
    scala> val rdd = sc.textFile("/home/scipio/README.md")
    14/06/28 12:31:55 INFO MemoryStore: ensureFreeSpace(32880) called with curMem=98616, maxMem=311387750
    14/06/28 12:31:55 INFO MemoryStore: Block broadcast_3 stored as values to memory (estimated size 32.1 KB, free 296.8 MB)
    rdd: org.apache.spark.rdd.RDD[String] = MappedRDD[15] at textFile at <console>:12
    
    scala> rdd.count
    14/06/28 12:32:50 INFO SparkContext: Job finished: count at <console>:15, took 0.341167662 s
    res6: Long = 127
    
    scala> rdd.cache
    res7: rdd.type = MappedRDD[15] at textFile at <console>:12
    
    scala> rdd.count
    14/06/28 12:33:00 INFO SparkContext: Job finished: count at <console>:15, took 0.32015745 s
    res8: Long = 127
    
    scala> val wordCount = rdd.flatMap(_.split(' ')).map(x=>(x,1)).reduceByKey(_+_)
    wordCount: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[20] at reduceByKey at <console>:14
    
    scala> wordCount.collect
    
    res9: Array[(String, Int)] = Array((means,1), (under,2), (this,4), (Because,1), (Python,2), (agree,1), (cluster.,1), (its,1), (YARN,,3), (have,2), (pre-built,1), (MRv1,,1), (locally.,1), (locally,2), (changed,1), (several,1), (only,1), (sc.parallelize(1,1), (This,2), (basic,1), (first,1), (requests,1), (documentation,1), (Configuration,1), (MapReduce,2), (without,1), (setting,1), ("yarn-client",1), ([params]`.,1), (any,2), (application,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (version,3), (file,1), (documentation,,1), (test,1), (MASTER,1), (entry,1), (example,3), (are,2), (systems.,1), (params,1), (scala>,1), (<artifactId>hadoop-client</artifactId>,1), (refer,1), (configure,1), (Interactive,2), (artifact,1), (can,7), (file's,1), (build,3), (when,2), (2.0.X,,1), (Apac...
    
    scala> wordCount.saveAsTextFile("/home/scipio/wordCountResult.txt")
     (5)union
    scala> val rdd = sc.parallelize(List(('a',1),('a',2)))
    rdd: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:12
    
    scala> val rdd2 = sc.parallelize(List(('b',1),('b',2)))
    rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:12
    
    scala> rdd union rdd2
    res3: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[12] at union at <console>:17
    
    scala> res3.collect
    
    res4: Array[(Char, Int)] = Array((a,1), (a,2), (b,1), (b,2))
     (6) join
    scala> val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
    rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:12
    
    scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
    rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:12
    
    scala> rdd1 join rdd2
    res1: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = FlatMappedValuesRDD[14] at join at <console>:17
    
    res1.collect
    
    res2: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))
     (7)lookup
    val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
    rdd1.lookup('a')
    res3: Seq[Int] = WrappedArray(1, 2)
     (8)groupByKey
    val wc = sc.textFile("/home/scipio/README.md").flatMap(_.split(' ')).map((_,1)).groupByKey
    wc.collect
    
    14/06/28 12:56:14 INFO SparkContext: Job finished: collect at <console>:15, took 2.933392093 s
    res0: Array[(String, Iterable[Int])] = Array((means,ArrayBuffer(1)), (under,ArrayBuffer(1, 1)), (this,ArrayBuffer(1, 1, 1, 1)), (Because,ArrayBuffer(1)), (Python,ArrayBuffer(1, 1)), (agree,ArrayBuffer(1)), (cluster.,ArrayBuffer(1)), (its,ArrayBuffer(1)), (YARN,,ArrayBuffer(1, 1, 1)), (have,ArrayBuffer(1, 1)), (pre-built,ArrayBuffer(1)), (MRv1,,ArrayBuffer(1)), (locally.,ArrayBuffer(1)), (locally,ArrayBuffer(1, 1)), (changed,ArrayBuffer(1)), (sc.parallelize(1,ArrayBuffer(1)), (only,ArrayBuffer(1)), (several,ArrayBuffer(1)), (This,ArrayBuffer(1, 1)), (basic,ArrayBuffer(1)), (first,ArrayBuffer(1)), (documentation,ArrayBuffer(1)), (Configuration,ArrayBuffer(1)), (MapReduce,ArrayBuffer(1, 1)), (requests,ArrayBuffer(1)), (without,ArrayBuffer(1)), ("yarn-client",ArrayBuffer(1)), ([params]`.,Ar...
     (9)sortByKey
    val rdd = sc.textFile("/home/scipio/README.md")
    val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
    val wcsort = wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
    wcsort.saveAsTextFile("/home/scipio/sort.txt")

     升序的话,sortByKey(true)

  • 相关阅读:
    IDEA热部署插件Jrebel
    Navicat Premium15安装及破解教程
    IDEA中查看类的关系图
    PV、UV、IP名词解释
    Promise由浅入深
    URLSearchParams
    二进制流学习-Blob、ArrayBuffer、File、FileReader和FormData的区别
    前端vue以数据流方式导出word----借助 jquery
    js中 == 、=== 和 Object.is() 的区别
    后端传的是二进制流,前端应该如何通过blob处理二进制文件流格式流,并实现前端下载文件流格式
  • 原文地址:https://www.cnblogs.com/fkissx/p/5484720.html
Copyright © 2011-2022 走看看