zoukankan      html  css  js  c++  java
  • Spark-Spark setMaster & WordCount Demo

     Spark setMaster源码

    /**
     * Sets the master URL that this application connects to.
     *
     * Examples: "local" runs locally with one thread, "local[4]" runs locally with 4 cores,
     * and "spark://master:7077" runs on a Spark standalone cluster.
     *
     * @param master the master URL, stored under the "spark.master" configuration key
     * @return the SparkConf returned by `set`; callers chain on it,
     *         e.g. `new SparkConf().setAppName(...).setMaster(...)`
     */
    def setMaster(master: String): SparkConf =
      set("spark.master", master)

    要连接的 master URL，例如：“local” 表示用一个线程在本地运行，“local[4]” 表示用 4 个内核在本地运行，“spark://master:7077” 表示在 Spark 独立（standalone）集群上运行。

    package cn.rzlee.spark.scala
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    // A Scala `object` is a singleton; its members play the role that static members play in Java.
    object ScalaWordCount {

      /**
       * Counts how often each space-separated word occurs in the input and saves the
       * (word, count) pairs, sorted by count in descending order, to the output location.
       *
       * No master URL is set here; NOTE(review): presumably it is supplied externally
       * (e.g. `spark-submit --master ...`) — confirm against the deployment setup.
       *
       * @param args args(0) = input path to read, args(1) = output directory to write
       */
      def main(args: Array[String]): Unit = {
        require(args.length >= 2, "Usage: ScalaWordCount <inputPath> <outputPath>")
        val inputPath = args(0)
        val outputPath = args(1)

        // Spark configuration carrying the application name.
        val conf = new SparkConf().setAppName("wordCountApp")

        // Spark entry point; conf must be passed in or the settings above are ignored.
        val sc = new SparkContext(conf)
        try {
          // Read the input as an RDD (resilient distributed dataset) of lines.
          val lines: RDD[String] = sc.textFile(inputPath)
          // Split each line on spaces and flatten into one RDD of words.
          val words: RDD[String] = lines.flatMap(_.split(" "))
          // Pair every word with an initial count of 1.
          val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
          // Aggregate by key: the word stays the same, its counts are summed.
          val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
          // Sort by count, highest first.
          val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, ascending = false)
          // Persist the result (e.g. to HDFS, depending on outputPath's scheme).
          sorted.saveAsTextFile(outputPath)
        } finally {
          // Release resources even if the job fails.
          sc.stop()
        }
      }
    }

    基于排序机制的wordCount

    java 版本:

    package cn.rzlee.spark.core;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;
    import scala.actors.threadpool.Arrays;
    
    /**
     * @Author ^_^
     * @Create 2018/11/3
     */
    public class SortWordCount {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            // 创建line RDD
            JavaRDD<String> lines = sc.textFile("C:\Users\txdyl\Desktop\log\in\data.txt", 1);
    
            // 执行单词计数
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String s) throws Exception {
                    return Arrays.asList(s.split("	"));
                }
            });
    
    
            JavaPairRDD<String, Integer> pair = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s, 1);
                }
            });
    
            JavaPairRDD<String, Integer> wordCounts = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
            // 进行key-value的反转映射
            JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                    return new Tuple2<>(t._2, t._1);
                }
            });
    
            // 按照key进行排序
            JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false);
    
            // 再次进行key-value的反转映射
            JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                    return new Tuple2<>(t._2, t._1);
                }
            });
    
    
            // 打印结果
            sortedWordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t._1 + " appears " + t._2+ " times.");
                }
            });
            // 关闭JavaSparkContext
            sc.close();
        }
    }

    scala版本:

    package cn.rzlee.spark.scala
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SortWordCount {

      /** Input file used when no path is given on the command line. */
      private val DefaultInputPath = "C:\\Users\\txdyl\\Desktop\\log\\in\\data.txt"

      /**
       * Word count whose output is ordered by frequency (highest first). The pairs are swapped
       * to (count, word), sorted by key in descending order, then swapped back to
       * (word, count) and printed.
       *
       * @param args optional; args(0) overrides the input file path
       */
      def main(args: Array[String]): Unit = {
        val inputPath = args.headOption.getOrElse(DefaultInputPath)

        // getSimpleName of a Scala object's class ends with '$'; strip it for a clean app name.
        val appName = this.getClass.getSimpleName.stripSuffix("$")
        val conf = new SparkConf().setAppName(appName).setMaster("local")
        val sc = new SparkContext(conf)
        try {
          // Second argument: minimum number of partitions.
          val lines: RDD[String] = sc.textFile(inputPath, 1)
          // Split each line on tab characters into words.
          val words: RDD[String] = lines.flatMap(_.split("\t"))
          val pairs: RDD[(String, Int)] = words.map((_, 1))
          val wordCounts: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
          // Swap to (count, word) so the count becomes the sort key, sort descending, swap back.
          val countWords: RDD[(Int, String)] = wordCounts.map(_.swap)
          val sortedCountWords: RDD[(Int, String)] = countWords.sortByKey(ascending = false)
          val sortedWordCounts: RDD[(String, Int)] = sortedCountWords.map(_.swap)
          // foreach runs inside Spark tasks; with the "local" master the output reaches this console.
          sortedWordCounts.foreach { case (word, count) =>
            println(s"$word appears $count times.")
          }
        } finally {
          // Release resources even if the job fails.
          sc.stop()
        }
      }

    }
  • 相关阅读:
    Cesium绘制一个旋转发光的四棱锥
    使用Nginx实现反向代理转载
    Nginx 代理本地文件夹
    WebGIS工程师的技能包
    nginx启动报错(1113: No mapping for the Unicode character exists in the target multibyte code page)
    Cesium动态扩散圆
    Billboard高度模式
    Nginx 配置问题 server directive is not allowed here in /etc/nginx/nginx.conf:69
    Nginx 安装(打开、关闭、重启)
    Cesium Primitive也可以设置贴地,通过外观设置
  • 原文地址:https://www.cnblogs.com/RzCong/p/9563509.html
Copyright © 2011-2022 走看看