zoukankan      html  css  js  c++  java
  • 各种版本 WordCount

    1 hadoop的MapReduce版

    ①mapper类

    //KEYIN 输入数据的key
    //VALUEIN 输入数据的value
    //KEYOUT 输出数据的key
    //VALUEOUT 输出数据的value
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        private Text k = new Text();
        private IntWritable v = new IntWritable(1);
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //1.一行单词
            String[] words = value.toString().split(" ");
            //2.循环写出
            for (String word : words) {
                k.set(word);
                context.write(k, v);
            }
        }
    }

    ②reducer类

    //KEYIN VALUEIN map阶段输出的key和value
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable v = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //1.累加求和 
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            
            //2.写出
            v.set(sum);
            context.write(key, v);
        }
    }

    ③driver类

    public class WordCountDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            //1.获取Job对象
            Job job = Job.getInstance(conf );
            
            //2.设置Jar存储位置
            job.setJarByClass(WordCountDriver.class);
            
            //3.关联Mapper和Reducer类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            
            //4.设置map阶段输出数据的key和value类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            //5.设置最终数据输出的key和value类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            //6.设置输入路径和输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            //7.提交Job
            boolean result = job.waitForCompletion(true);
            
            System.exit(result ? 0 : 1);
        }
    
    }

    运行时须指定输入输出路径

    2 Java8 Stream版

    public static void main(String[] args) throws ParseException {
        ArrayList<String> list = new ArrayList<>();
        list.add("scala hadoop linux scala");
        list.add("word hello world hadoop spark");
        list.add("hello world spark scala");
    
        Map<String, Long> result = list.stream()
                .flatMap(str -> Arrays.stream(str.split(" ")))
                .collect(Collectors.groupingBy(
                        Function.identity() //按照单词自身分组
                        , Collectors.counting()) //分组后直接count
                );
    
        System.out.println(result); //{world=2, spark=2, scala=3, linux=1, hello=2, hadoop=2, word=1}
    
    }

    3 scala原始版

    def main(args: Array[String]): Unit = {
      //word count
      val lines = List[String]("scala hadoop linux scala","word hello world hadoop spark","hello world spark scala")
    
      //空格切割扁平化
      val res1: immutable.Seq[String] = lines.flatMap(_.split(" "))
      println(res1) //List(scala, hadoop, linux, scala, word, hello, world, hadoop, spark, hello, world, spark, scala)
    
      //映射成元组
      val res2: immutable.Seq[(String, Int)] = res1.map((_, 1))
      println(res2) //List((scala,1), (hadoop,1), (linux,1), (scala,1), (word,1), (hello,1), (world,1), (hadoop,1), (spark,1), (hello,1), (world,1), (spark,1), (scala,1))
    
      //将上一步的元组分组
      val res3: Map[String, immutable.Seq[(String, Int)]] = res2.groupBy(_._1)
      println(res3) //Map(world -> List((world,1), (world,1)), hadoop -> List((hadoop,1), (hadoop,1)), spark -> List((spark,1), (spark,1)), scala -> List((scala,1), (scala,1), (scala,1)), linux -> List((linux,1)), hello -> List((hello,1), (hello,1)), word -> List((word,1)))
    
      //value的长度就是单词的个数
      val res4: Map[String, Int] = res3.map(x => (x._1, x._2.size))
      println(res4) //Map(world -> 2, hadoop -> 2, spark -> 2, scala -> 3, linux -> 1, hello -> 2, word -> 1)
    
      //倒序展示
      val res = res4.toList.sortBy(_._2).reverse
      println(res) //List((scala,3), (hello,2), (spark,2), (hadoop,2), (world,2), (word,1), (linux,1))
    
    }
    

      

    4 spark版

    单词文件

    Hello Scala
    Hello Spark
    Hello BigData
    Hadoop Yarn Scala Java

    代码

    def main(args: Array[String]): Unit = {
    
      val conf = new SparkConf().setMaster("local[*]").setAppName("word count")
      val sc = new SparkContext(conf)
    
      //从文件生成RDD
      val rdd = sc.textFile("input/word.txt")
    
      //切割扁平化
      val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
    
      //映射成元组
      val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
    
      //按照key进行聚合
      val reduceByKeyRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    
      //倒序展示
      reduceByKeyRDD.sortBy(_._2,false).collect().foreach(println)
      //(Hello,3)
      //(Scala,2)
      //(Yarn,1)
      //(Java,1)
      //(BigData,1)
      //(Spark,1)
      //(Hadoop,1)
    
    }
    

      

  • 相关阅读:
    C#图片存到资源文件,以及调用图片
    error C2039: “create”: 不是“cocos2d::GLView”的成员,出错解决办法
    error LNK2038: 检测到“_MSC_VER”的不匹配项: 值“1600”不匹配值“1800”
    vs2013激活码
    AppDelegate.h文件
    AppDelegate.cpp文件详解
    【最终版】Cocos2d-x&JS v3.7 发布!
    cocos2d-x 3.0 示例代码分析3:BaseTest
    vue li click
    me
  • 原文地址:https://www.cnblogs.com/noyouth/p/13031407.html
Copyright © 2011-2022 走看看