  • Counting word frequencies in Spark

    I wrote a simple statement. It is not optimized yet:

    scala> sc.
         | textFile("/etc/profile").
         | flatMap((s:String)=>s.split("\\s")).
         | map(_.toUpperCase).
         | map((s:String)=>(s, 1)).
         | filter((pair)=>pair._1.forall((ch)=>ch>'A'&&ch<'Z')).
         | reduceByKey(_+_).
         | sortByKey().
         | foreach(println)
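    To make the role of each RDD operation concrete, here is a sketch of the same pipeline on an ordinary Scala `Seq[String]` instead of an RDD, so it runs without a cluster. Assumptions: the names `LocalWordCount` and `wordCount` are hypothetical, `groupBy` plus a sum stands in for `reduceByKey(_+_)`, and `sortBy(_._1)` stands in for `sortByKey()`. The filter predicate is copied unchanged from the Spark code.

```scala
object LocalWordCount {
  // Local (non-Spark) stand-in for the RDD pipeline; a hypothetical helper for illustration.
  def wordCount(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .flatMap(_.split("\\s"))                          // flatMap: one element per token
      .map(_.toUpperCase)                               // map: normalise to upper case
      .map((_, 1))                                      // map: pair each word with 1
      .filter(_._1.forall(ch => ch > 'A' && ch < 'Z'))  // same predicate as the Spark code
      .groupBy(_._1)                                    // like reduceByKey: group by word...
      .map { case (w, ps) => (w, ps.map(_._2).sum) }    // ...and add up the 1s
      .toSeq
      .sortBy(_._1)                                     // like sortByKey(): order by word
}
```

    For example, `LocalWordCount.wordCount(Seq("if [ x ]; then", "  export PATH"))` returns `(,2)`, `(EXPORT,1)`, `(IF,1)`, `(THEN,1)` and `(X,1)`: the two empty tokens come from the leading spaces, and PATH is dropped because it contains an A, the same effects that show up in the Spark output below.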

    The same code can be written more concisely with Scala's placeholder syntax (it behaves the same way):

    scala> sc.
         | textFile("/etc/profile").
         | flatMap(_.split("\\s")).
         | map(_.toUpperCase).
         | map((_, 1)).
         | filter(_._1.forall((ch)=>ch>'A'&&ch<'Z')).
         | reduceByKey(_+_).
         | sortByKey().
         | foreach(println)

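    The output is shown below. Because `foreach(println)` runs inside the tasks, in this local run each of the two partitions produced by `sortByKey` prints its own sorted key range while its task runs, interleaved with the INFO log: partition 1 (LOGIN to YOUR) happens to print first, then partition 0 (the empty string to KSH):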

    15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(75904) called with curMem=259812, maxMem=277842493
    15/03/06 08:50:44 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 74.1 KB, free 264.7 MB)
    15/03/06 08:50:44 INFO FileInputFormat: Total input paths to process : 1
    15/03/06 08:50:44 INFO SparkContext: Starting job: sortByKey at <console>:20
    15/03/06 08:50:44 INFO DAGScheduler: Registering RDD 25 (filter at <console>:18)
    15/03/06 08:50:44 INFO DAGScheduler: Got job 4 (sortByKey at <console>:20) with 2 output partitions (allowLocal=false)
    15/03/06 08:50:44 INFO DAGScheduler: Final stage: Stage 10(sortByKey at <console>:20)
    15/03/06 08:50:44 INFO DAGScheduler: Parents of final stage: List(Stage 11)
    15/03/06 08:50:44 INFO DAGScheduler: Missing parents: List(Stage 11)
    15/03/06 08:50:44 INFO DAGScheduler: Submitting Stage 11 (FilteredRDD[25] at filter at <console>:18), which has no missing parents
    15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(3736) called with curMem=335716, maxMem=277842493
    15/03/06 08:50:44 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 3.6 KB, free 264.6 MB)
    15/03/06 08:50:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 11 (FilteredRDD[25] at filter at <console>:18)
    15/03/06 08:50:44 INFO TaskSchedulerImpl: Adding task set 11.0 with 2 tasks
    15/03/06 08:50:44 INFO TaskSetManager: Starting task 0.0 in stage 11.0 (TID 16, localhost, PROCESS_LOCAL, 1162 bytes)
    15/03/06 08:50:44 INFO TaskSetManager: Starting task 1.0 in stage 11.0 (TID 17, localhost, PROCESS_LOCAL, 1162 bytes)
    15/03/06 08:50:44 INFO Executor: Running task 1.0 in stage 11.0 (TID 17)
    15/03/06 08:50:44 INFO Executor: Running task 0.0 in stage 11.0 (TID 16)
    15/03/06 08:50:44 INFO HadoopRDD: Input split: file:/etc/profile:1189+1189
    15/03/06 08:50:44 INFO HadoopRDD: Input split: file:/etc/profile:0+1189
    15/03/06 08:50:44 INFO Executor: Finished task 1.0 in stage 11.0 (TID 17). 1863 bytes result sent to driver
    15/03/06 08:50:44 INFO TaskSetManager: Finished task 1.0 in stage 11.0 (TID 17) in 43 ms on localhost (1/2)
    15/03/06 08:50:44 INFO Executor: Finished task 0.0 in stage 11.0 (TID 16). 1863 bytes result sent to driver
    15/03/06 08:50:44 INFO TaskSetManager: Finished task 0.0 in stage 11.0 (TID 16) in 51 ms on localhost (2/2)
    15/03/06 08:50:44 INFO DAGScheduler: Stage 11 (filter at <console>:18) finished in 0.054 s
    15/03/06 08:50:44 INFO DAGScheduler: looking for newly runnable stages
    15/03/06 08:50:44 INFO DAGScheduler: running: Set()
    15/03/06 08:50:44 INFO DAGScheduler: waiting: Set(Stage 10)
    15/03/06 08:50:44 INFO DAGScheduler: failed: Set()
    15/03/06 08:50:44 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool 
    15/03/06 08:50:44 INFO DAGScheduler: Missing parents for Stage 10: List()
    15/03/06 08:50:44 INFO DAGScheduler: Submitting Stage 10 (MapPartitionsRDD[28] at sortByKey at <console>:20), which is now runnable
    15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(2856) called with curMem=339452, maxMem=277842493
    15/03/06 08:50:44 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 2.8 KB, free 264.6 MB)
    15/03/06 08:50:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 10 (MapPartitionsRDD[28] at sortByKey at <console>:20)
    15/03/06 08:50:44 INFO TaskSchedulerImpl: Adding task set 10.0 with 2 tasks
    15/03/06 08:50:44 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID 18, localhost, PROCESS_LOCAL, 948 bytes)
    15/03/06 08:50:44 INFO TaskSetManager: Starting task 1.0 in stage 10.0 (TID 19, localhost, PROCESS_LOCAL, 948 bytes)
    15/03/06 08:50:44 INFO Executor: Running task 0.0 in stage 10.0 (TID 18)
    15/03/06 08:50:44 INFO Executor: Running task 1.0 in stage 10.0 (TID 19)
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
    15/03/06 08:50:44 INFO Executor: Finished task 0.0 in stage 10.0 (TID 18). 1165 bytes result sent to driver
    15/03/06 08:50:44 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 18) in 18 ms on localhost (1/2)
    15/03/06 08:50:44 INFO Executor: Finished task 1.0 in stage 10.0 (TID 19). 1293 bytes result sent to driver
    15/03/06 08:50:44 INFO TaskSetManager: Finished task 1.0 in stage 10.0 (TID 19) in 28 ms on localhost (2/2)
    15/03/06 08:50:44 INFO DAGScheduler: Stage 10 (sortByKey at <console>:20) finished in 0.031 s
    15/03/06 08:50:44 INFO TaskSchedulerImpl: Removed TaskSet 10.0, whose tasks have all completed, from pool 
    15/03/06 08:50:44 INFO SparkContext: Job finished: sortByKey at <console>:20, took 0.107864348 s
    15/03/06 08:50:44 INFO SparkContext: Starting job: foreach at <console>:21
    15/03/06 08:50:44 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 4 is 144 bytes
    15/03/06 08:50:44 INFO DAGScheduler: Registering RDD 26 (reduceByKey at <console>:19)
    15/03/06 08:50:44 INFO DAGScheduler: Got job 5 (foreach at <console>:21) with 2 output partitions (allowLocal=false)
    15/03/06 08:50:44 INFO DAGScheduler: Final stage: Stage 12(foreach at <console>:21)
    15/03/06 08:50:44 INFO DAGScheduler: Parents of final stage: List(Stage 14)
    15/03/06 08:50:44 INFO DAGScheduler: Missing parents: List(Stage 14)
    15/03/06 08:50:44 INFO DAGScheduler: Submitting Stage 14 (ShuffledRDD[26] at reduceByKey at <console>:19), which has no missing parents
    15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(2472) called with curMem=342308, maxMem=277842493
    15/03/06 08:50:44 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 2.4 KB, free 264.6 MB)
    15/03/06 08:50:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 14 (ShuffledRDD[26] at reduceByKey at <console>:19)
    15/03/06 08:50:44 INFO TaskSchedulerImpl: Adding task set 14.0 with 2 tasks
    15/03/06 08:50:44 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 20, localhost, PROCESS_LOCAL, 937 bytes)
    15/03/06 08:50:44 INFO TaskSetManager: Starting task 1.0 in stage 14.0 (TID 21, localhost, PROCESS_LOCAL, 937 bytes)
    15/03/06 08:50:44 INFO Executor: Running task 1.0 in stage 14.0 (TID 21)
    15/03/06 08:50:44 INFO Executor: Running task 0.0 in stage 14.0 (TID 20)
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
    15/03/06 08:50:44 INFO Executor: Finished task 1.0 in stage 14.0 (TID 21). 996 bytes result sent to driver
    15/03/06 08:50:44 INFO TaskSetManager: Finished task 1.0 in stage 14.0 (TID 21) in 14 ms on localhost (1/2)
    15/03/06 08:50:44 INFO Executor: Finished task 0.0 in stage 14.0 (TID 20). 996 bytes result sent to driver
    15/03/06 08:50:44 INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 20) in 21 ms on localhost (2/2)
    15/03/06 08:50:44 INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool 
    15/03/06 08:50:44 INFO DAGScheduler: Stage 14 (reduceByKey at <console>:19) finished in 0.022 s
    15/03/06 08:50:44 INFO DAGScheduler: looking for newly runnable stages
    15/03/06 08:50:44 INFO DAGScheduler: running: Set()
    15/03/06 08:50:44 INFO DAGScheduler: waiting: Set(Stage 12)
    15/03/06 08:50:44 INFO DAGScheduler: failed: Set()
    15/03/06 08:50:44 INFO DAGScheduler: Missing parents for Stage 12: List()
    15/03/06 08:50:44 INFO DAGScheduler: Submitting Stage 12 (ShuffledRDD[29] at sortByKey at <console>:20), which is now runnable
    15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(2304) called with curMem=344780, maxMem=277842493
    15/03/06 08:50:44 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 2.3 KB, free 264.6 MB)
    15/03/06 08:50:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 12 (ShuffledRDD[29] at sortByKey at <console>:20)
    15/03/06 08:50:44 INFO TaskSchedulerImpl: Adding task set 12.0 with 2 tasks
    15/03/06 08:50:44 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 22, localhost, PROCESS_LOCAL, 948 bytes)
    15/03/06 08:50:44 INFO TaskSetManager: Starting task 1.0 in stage 12.0 (TID 23, localhost, PROCESS_LOCAL, 948 bytes)
    15/03/06 08:50:45 INFO Executor: Running task 1.0 in stage 12.0 (TID 23)
    15/03/06 08:50:45 INFO Executor: Running task 0.0 in stage 12.0 (TID 22)
    15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
    15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
    15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
    15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
    (LOGIN,2)
    (MERGING,1)
    (MUCH,1)
    (NEED,1)
    (NOT,1)
    (PREVENT,1)
    (RESERVED,1)
    (SCRIPT,1)
    (SETS,1)
    (SETUP,1)
    (SHELL,2)
    (SYSTEM,2)
    (THE,1)
    (THEN,8)
    (THIS,3)
    (THRESHOLD,1)
    (TO,5)
    (UIDGID,1)
    (UNLESS,1)
    (UNSET,2)
    (USER,1)
    (WE,1)
    (WIDE,1)
    (WILL,1)
    (YOU,3)
    (YOUR,1)
    15/03/06 08:50:45 INFO Executor: Finished task 1.0 in stage 12.0 (TID 23). 826 bytes result sent to driver
    15/03/06 08:50:45 INFO TaskSetManager: Finished task 1.0 in stage 12.0 (TID 23) in 13 ms on localhost (1/2)
    (,260)
    (BETTER,1)
    (BY,1)
    (CHECK,1)
    (COULD,1)
    (CURRENT,1)
    (CUSTOM,1)
    (DO,1)
    (DONE,1)
    (ELSE,5)
    (ENVIRONMENT,1)
    (EXPORT,15)
    (FI,8)
    (FILE,2)
    (FOR,5)
    (FUNCTIONS,1)
    (FUTURE,1)
    (GET,1)
    (GO,1)
    (GOOD,1)
    (HISTCONTROL,1)
    (I,2)
    (IF,8)
    (IN,6)
    (IS,1)
    (IT,1)
    (KNOW,1)
    (KSH,1)
    15/03/06 08:50:45 INFO Executor: Finished task 0.0 in stage 12.0 (TID 22). 826 bytes result sent to driver
    15/03/06 08:50:45 INFO TaskSetManager: Finished task 0.0 in stage 12.0 (TID 22) in 27 ms on localhost (2/2)
    15/03/06 08:50:45 INFO TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool 
    15/03/06 08:50:45 INFO DAGScheduler: Stage 12 (foreach at <console>:21) finished in 0.025 s
    15/03/06 08:50:45 INFO SparkContext: Job finished: foreach at <console>:21, took 0.07397057 s
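    The output above shows two side effects of the filter `ch>'A'&&ch<'Z'`. Because the bounds are strict, every word containing an A or a Z is rejected, so no such word appears in the list at all. And because `forall` is true for an empty string, the empty tokens that splitting on single whitespace characters produces (leading or repeated whitespace, blank lines) are counted, giving the `(,260)` entry. A possible correction, sketched below on plain Scala strings, uses inclusive bounds, rejects empty tokens and splits on runs of whitespace with `"\\s+"`. The names `WordFilter`, `isUpperWord` and `tokens` are hypothetical.

```scala
object WordFilter {
  // Inclusive bounds keep words containing 'A' or 'Z';
  // nonEmpty rejects the "" tokens that split produces for leading whitespace.
  def isUpperWord(w: String): Boolean =
    w.nonEmpty && w.forall(ch => ch >= 'A' && ch <= 'Z')

  // Split on runs of whitespace, upper-case, and keep only all-letter words.
  // A line starting with whitespace still yields one leading "", which isUpperWord drops.
  def tokens(line: String): Seq[String] =
    line.split("\\s+").toSeq.map(_.toUpperCase).filter(isUpperWord)
}
```

    In the Spark shell the corrected predicate would be used the same way, for example `filter(p => p._1.nonEmpty && p._1.forall(ch => ch >= 'A' && ch <= 'Z'))`. Ending the chain with `collect().foreach(println)` instead of `foreach(println)` would also print the whole sorted result on the driver in one piece, instead of each partition printing its own range from inside its task.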

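    The following code prints the host names of the nodes that take part in the computation. Note that the cluster must first be started with start-all, and spark-shell must be given the --master parameter: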

    spark-shell --master spark://bluejoe0:7077

    The code is as follows (`rdd` is any RDD already defined in the shell):

    import scala.sys.process._   // provides the !! method used to run the hostname command
    rdd.mapPartitions(_ => Array[String]("hostname".!!.trim).iterator, false).collect
    
    res28: Array[String] = Array(bluejoe4, bluejoe5)
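    Running the `hostname` command through `!!` starts an external process in every partition and depends on that command being on the executor's PATH. A possible alternative, sketched below, asks the JVM instead via `java.net.InetAddress.getLocalHost.getHostName`. The object name `HostInfo` and the `"unknown"` fallback are assumptions of this sketch, and the name the JVM reports may not match the output of `hostname` exactly (for example a fully qualified name instead of a short one).

```scala
import java.net.InetAddress
import scala.util.Try

object HostInfo {
  // Host name as seen by the JVM; falls back to "unknown" if resolution throws.
  def localHostName(): String =
    Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown")
}
```

    In the shell the call could be inlined into the closure, e.g. `rdd.mapPartitions(_ => Iterator(java.net.InetAddress.getLocalHost.getHostName)).collect.distinct`, where `distinct` (an addition, not in the original) removes repeated names when several partitions run on the same node.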
  • Original post: https://www.cnblogs.com/bluejoe/p/5115857.html