  • java spark: computing the top 3 ads by click count for each province

    Data format (fields separated by a single space):

    timestamp province city user ad

    1589677806 河南 洛阳 user1 ad1
    1589677807 河南 郑州 user1 ad1
    1589677808 河南 洛阳 user2 ad1
    1589677809 河南 洛阳 user3 ad2
    1589677811 河南 郑州 user1 ad2
    1589677813 河南 偃师 user1 ad2
    1589677815 浙江 杭州 user1 ad1
    1589677818 浙江 杭州 user2 ad1
    1589677806 河南 郑州 user2 ad1
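
    As a minimal sketch of how one such line splits into the five fields, assuming whitespace-separated columns in the order listed above. The class name ClickRecord and its field names are hypothetical and do not appear in the original code:

```java
// Hypothetical value class for one click record. The field names follow the
// column order of the data format (timestamp, province, city, user, ad).
final class ClickRecord {
    final long timestamp;
    final String province;
    final String city;
    final String user;
    final String ad;

    ClickRecord(long timestamp, String province, String city, String user, String ad) {
        this.timestamp = timestamp;
        this.province = province;
        this.city = city;
        this.user = user;
        this.ad = ad;
    }

    // Splits one log line on whitespace and rejects lines without exactly 5 fields.
    static ClickRecord parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length != 5) {
            throw new IllegalArgumentException(
                    "expected 5 fields but got " + fields.length + ": " + line);
        }
        return new ClickRecord(Long.parseLong(fields[0]),
                fields[1], fields[2], fields[3], fields[4]);
    }

    public static void main(String[] args) {
        ClickRecord r = parse("1589677806 河南 洛阳 user1 ad1");
        System.out.println(r.province + " / " + r.ad);
    }
}
```

    Only the province (index 1) and the ad (index 4) matter for the requirement; the other fields are parsed here just to validate the line.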

    Requirement:

    For each province, rank the ads by click count and keep the top 3.
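
    To make the expected result concrete, here is a small plain-Java sketch (no Spark) that applies the requirement to the nine sample lines. The class name ProvinceTopAdsSketch, the method topAds and the tie-break by ad name are assumptions made for illustration only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reference implementation, useful for checking a Spark job's output.
final class ProvinceTopAdsSketch {
    // The sample data from the data format section.
    static final List<String> SAMPLE = Arrays.asList(
            "1589677806 河南 洛阳 user1 ad1",
            "1589677807 河南 郑州 user1 ad1",
            "1589677808 河南 洛阳 user2 ad1",
            "1589677809 河南 洛阳 user3 ad2",
            "1589677811 河南 郑州 user1 ad2",
            "1589677813 河南 偃师 user1 ad2",
            "1589677815 浙江 杭州 user1 ad1",
            "1589677818 浙江 杭州 user2 ad1",
            "1589677806 河南 郑州 user2 ad1");

    static Map<String, List<Map.Entry<String, Integer>>> topAds(List<String> lines, int n) {
        // Step 1: count clicks per (province, ad).
        Map<String, Map<String, Integer>> counts = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.trim().split("\\s+");
            counts.computeIfAbsent(f[1], k -> new HashMap<>()).merge(f[4], 1, Integer::sum);
        }
        // Step 2: sort each province's ads by clicks (descending), ties by ad name.
        Comparator<Map.Entry<String, Integer>> byClicksDesc = (a, b) -> {
            int c = Integer.compare(b.getValue(), a.getValue());
            return c != 0 ? c : a.getKey().compareTo(b.getKey());
        };
        // Step 3: keep at most n ads per province.
        Map<String, List<Map.Entry<String, Integer>>> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, Integer>> e : counts.entrySet()) {
            List<Map.Entry<String, Integer>> ads = new ArrayList<>(e.getValue().entrySet());
            ads.sort(byClicksDesc);
            result.put(e.getKey(), new ArrayList<>(ads.subList(0, Math.min(n, ads.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {河南=[ad1=4, ad2=3], 浙江=[ad1=2]}
        System.out.println(topAds(SAMPLE, 3));
    }
}
```

    The sample contains only two distinct ads, so no province reaches three entries; the limit only takes effect on larger inputs.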

    Implementation:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.*;
    
    /**
     * # _*_ coding:utf-8 _*_
     * # Author:xiaoshubiao
     * # Time : 2020/5/17 9:13
     **/
    
    public class anli_test {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("test");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // Backslashes must be escaped; an unescaped "\t" or "\r" would become a tab or carriage return
            JavaRDD<String> stringJavaRDD = sc.textFile("D:\\tmp\\rizhi.txt");
            JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.
                    // Convert each line to a key-value pair: (province-ad, 1)
                    mapToPair(
                            new PairFunction<String, String, Integer>() {
                                @Override
                                public Tuple2<String, Integer> call(String s) throws Exception {
                                    String[] s1 = s.split(" ");
                                    return new Tuple2<String, Integer>(s1[1] + "-" + s1[4], 1);
                                }
                            }
                    )
                    // Sum the clicks for identical keys: (province-ad, total clicks)
                    .reduceByKey(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer integer, Integer integer2) throws Exception {
                            return integer + integer2;
                        }
                    });
    
            stringIntegerJavaPairRDD.
                    // Restructure the key: (province-ad, total clicks) => (province, (ad, total clicks))
                    mapToPair(
                    new PairFunction<Tuple2<String, Integer>, String, Tuple2>() {
                        @Override
                        public Tuple2<String, Tuple2> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                            String[] s = stringIntegerTuple2._1().split("-");
                            Tuple2<String, Integer> stringIntegerTuple21 = new Tuple2<>(s[1], stringIntegerTuple2._2());
    
                            return new Tuple2<>(s[0],stringIntegerTuple21);
                        }
                    }
            )
                    // Group by key (province)
                    .groupByKey()
                    // Sort each province's ads by click count, descending, and keep the top 3
                    .mapValues(
                    new Function<Iterable<Tuple2>, Iterable>() {
                        @Override
                        public Iterable call(Iterable<Tuple2> tuple2s) throws Exception {
                            ArrayList<Tuple2<String, Integer>> tuple2s1 = new ArrayList<>();
                            Iterator<Tuple2> iterator = tuple2s.iterator();
                            while (iterator.hasNext()){
                                Tuple2 next = iterator.next();
                                tuple2s1.add(next);
                            }
                            tuple2s1.sort(
                                    new Comparator<Tuple2<String, Integer>>() {
                                        @Override
                                        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
    
                                            // Descending by click count; Integer.compare avoids subtraction overflow
                                            return Integer.compare(o2._2(), o1._2());
                                        }
                                    }
                            );
                            ArrayList<Tuple2<String,Integer>> t = new ArrayList<>();
                            Iterator<Tuple2<String, Integer>> iterator1 = tuple2s1.iterator();
                            int i = 0;
                            int n = 3; // top 3 (the original limit of 2 kept only two ads)
                            while (iterator1.hasNext() && i < n) {
                                t.add(iterator1.next());
                                i++;
                            }
                            return t;
                        }
                    }
            )
                    .collect().forEach(x->System.out.println(x));
        }
    
    
    }
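
    The sorting step above copies every ad of a province into a list and sorts all of it before taking the first three. For provinces with many ads, a bounded heap is a common alternative. The following is a minimal plain-Java sketch of that idea, not the method used in the code above; the class name TopNSketch and the helper topN are hypothetical:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helper: keeps only the n best items while iterating once.
final class TopNSketch {
    // `order` sorts best-first (for example, clicks descending).
    static <T> List<T> topN(Iterable<T> items, int n, Comparator<T> order) {
        if (n <= 0) {
            return new ArrayList<>();
        }
        // Reversing `order` puts the worst retained item at the head of the heap,
        // so it is the one evicted whenever the heap grows past n.
        PriorityQueue<T> heap = new PriorityQueue<>(n, order.reversed());
        for (T item : items) {
            heap.add(item);
            if (heap.size() > n) {
                heap.poll();
            }
        }
        List<T> result = new ArrayList<>(heap);
        result.sort(order); // heap iteration order is unspecified, so sort the survivors
        return result;
    }

    public static void main(String[] args) {
        // Made-up (ad, clicks) pairs for illustration only.
        List<Map.Entry<String, Integer>> ads = Arrays.asList(
                new AbstractMap.SimpleImmutableEntry<>("ad1", 4),
                new AbstractMap.SimpleImmutableEntry<>("ad2", 3),
                new AbstractMap.SimpleImmutableEntry<>("ad3", 7),
                new AbstractMap.SimpleImmutableEntry<>("ad4", 1));
        Comparator<Map.Entry<String, Integer>> byClicksDesc =
                (a, b) -> Integer.compare(b.getValue(), a.getValue());
        // prints [ad3=7, ad1=4, ad2=3]
        System.out.println(topN(ads, 3, byClicksDesc));
    }
}
```

    A helper like this could be called on the grouped values inside mapValues. For only a handful of ads per province, the full sort in the original code is just as reasonable.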
  • Original article: https://www.cnblogs.com/7749ha/p/12909115.html