zoukankan      html  css  js  c++  java
  • spark java wordCount实例

    1. 算子

    package com.test;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    
    public class Test {
        private static final int Function2 = 0;
    
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf()
                    .setAppName("Test")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            JavaRDD linesRdd = sc.textFile("/home/test/a.txt");
            /**
             * FlatMapFunction 中的2个String   分别代表输入参数类型和输出参数类型
             */
            JavaRDD<String> wordsRDD = linesRdd.flatMap(new FlatMapFunction<String, String>(){
                private static final long serialVersionUID = 1L;
                
                @Override
                public Iterable<String> call(String line) throws Exception {
                    /**
                     * 参数 line 就代表 linesRDD中的每一条记录
                     */
                    List<String> list = Arrays.asList(line.split(" "));
                    return list;
                }
            });
            
            /**
             * 要将每一个单词计数为1
             * wordsRDD 是一个非 K V 格式的Rdd,
             * 在java api 中要返回一个K V 格式的rdd, 必须使用 mapToPair 方法
             * return 结果就是一个 K V 格式
             */
            JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });
            
            /**
             * 使用reduceByKey 进行聚合操作
             * 1. 进行 groupByKey 将相同的 key 分割到一个组里去, 然后通过传入的函数对主内的数据进行聚合
             * call 方法将会自动将个数循环相加
             */
            JavaPairRDD<String, Integer> resultRdd = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
                
                private static final long serialVersionUID = 1L;
                
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
            });
            
            /**
             * 按照单词出现的次数进行排序,   应为排序需要对 v 值(出现个数)进行排序, 所以需要将 K V, 进行调换,  因为sortByKey只对key能进行排序
             * 先使用 mapToPair 来调换位置
             * sortByKey 进行排序
             * 再使用 mapToPair 来调换位置
             */
            resultRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                    
                    return new Tuple2<Integer, String>(tuple._2,tuple._1);
                }
            }).sortByKey().mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                    return new Tuple2<String, Integer>(tuple._2, tuple._1);
                }
            }).foreach(new VoidFunction<Tuple2<String,Integer>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Tuple2<String, Integer> tuple) throws Exception {
                    System.out.println(tuple);
                }
            });
        }
    }

    jar包   链接:https://pan.baidu.com/s/1UDp81G8tY7IgwJatlT_1Vg 密码:yj06

  • 相关阅读:
    jquery插件课程2 放大镜、多文件上传和在线编辑器插件如何使用
    php课程 5-19 php数据结构函数和常用函数有哪些
    如何解决计算机显示文字乱码
    NSURLConnection使用
    UOJ #5. 【NOI2014】动物园 扩大KMP
    [ACM] n划分数m部分,它要求每一个部分,并采取了最大的产品(间隔DP)
    基于低压电力采集平台DW710C的基础开发
    eclipse 对齐行号在括号中显示和字体调整
    蜗牛—苍茫IT文章大学的路(十)
    国产与第三方库FFmpeg SDK
  • 原文地址:https://www.cnblogs.com/redhat0019/p/9210436.html
Copyright © 2011-2022 走看看