zoukankan      html  css  js  c++  java
  • 在Spark上运行TopK程序

    1. scala程序如下

    package com.cn.gao
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    
    /**
     * @author hadoop
     * 对文本进行词频统计,并返回出现频率最高的K个词
     *
     */
    
    object topK {
      def main(args: Array[String]) {
        if(args.length < 1) {
          System.err.println("Usage: <file>")
          System.exit(1)
        }
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        //SparkContext 是把代码提交到集群或者本地的通道,我们编写Spark代码,无论是要本地运行还是集群运行都必须有SparkContext的实例
        val line = sc.textFile(args(0))
        //把读取的内容保存给line变量,其实line是一个MappedRDD,Spark的所有操作都是基于RDD的
        //其中的\s表示 空格,回车,换行等空白符,+号表示一个或多个的意思
        val result = line.flatMap(_.split("\s+")).map((_, 1)).reduceByKey(_+_)
        val sorted = result.map{case(key,value) => (value,key)}.sortByKey(true,1)
        val topk = sorted.top(args(1).toInt)
        topk.foreach(println)
        sc.stop
      }
    }

    正则表达式,
    \d表示 0-9 的数字,
    \s表示 空格,回车,换行等空白符,
    \w表示单词字符(数字字母下划线)
    +号表示一个或多个的意思

    2. 将上述程序打包成TopK.jar

    打包可以在eclipse中实现。

    3. 执行脚本文件如下

    topK.sh

    #!/bin/bash
    
    cd $SPARK_HOME/bin
    spark-submit 
    --master spark://192.168.1.154:7077 
    --class com.cn.gao.topK 
    --name topK 
    --executor-memory 400M 
    --driver-memory 512M 
    /usr/local/myjar/TopK.jar 
    hdfs://192.168.1.154:9000/user/hadoop/README.md 5

    最后一行是参数,上述统计词频出现在前5的单词。

    4. 启动Spark集群

    将要统计的文件README.md上传到HDFS中指定的目录,运行脚本文件即可。

  • 相关阅读:
    CppUnit
    vconfig in linux
    POSIX semaphore: sem_open, sem_close, sem_post, sem_wait
    (Windows Command) diskpart
    亮块检测及取出亮块的中心坐标
    CacheMetaData Method of Activity
    (C#) Encoding.
    (C#) 线程操作 限制可同时访问某一资源或资源池的线程数。Semaphore 类。Mutex类
    (C# 基础) 跳转 (break, continue, goto, return, throw ).
    (C#基础) 字符串数据和操作
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/4371022.html
Copyright © 2011-2022 走看看