zoukankan      html  css  js  c++  java
  • UserView--第一种方式set去重,基于Spark算子的Java代码实现


    UserView--第一种方式set去重,基于Spark算子的Java代码实现

    测试数据
    Java代码
    package com.hzf.spark.study;
    
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;
    
    import scala.Tuple2;
    
    /**
     * Spark job that prints, for every page id, the number of distinct user ids
     * found in tab-separated log lines whose action column equals a broadcast
     * action name.
     *
     * <p>De-duplication is done by grouping user ids per page and collecting them
     * into a {@link Set}; the size of that set is reported as the page's UV count.
     */
    public class UVAnalysis {

        /** Delimiter between the columns of one log line (a single tab character). */
        private static final String FIELD_SEPARATOR = "\t";

        /** Zero-based column index of the user id. */
        private static final int USER_ID_INDEX = 2;

        /** Zero-based column index of the page id. */
        private static final int PAGE_ID_INDEX = 3;

        /** Zero-based column index of the action name. */
        private static final int ACTION_INDEX = 5;

        /**
         * Reads the {@code userLog1} text file with a local Spark context and runs the
         * UV analysis for the {@code "View"} action.
         *
         * @param args command-line arguments; not used
         */
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("UV_ANA").setMaster("local")
                    .set("spark.testing.memory", "2147480000");
            // try-with-resources stops the context even when the job throws.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                JavaRDD<String> logRDD = sc.textFile("userLog1");
                final Broadcast<String> broadcast = sc.broadcast("View");
                uvAnalyze(logRDD, broadcast);
            }
        }

        /**
         * Filters the log to the broadcast action, pairs each remaining line as
         * (pageId, userId), groups by page and prints one
         * {@code PAGEID:<id>\t UV_COUNT:<n>} line per page to standard output.
         *
         * <p>Lines with too few columns to contain an action field are dropped
         * instead of failing the job.
         *
         * @param logRDD    raw log lines, columns separated by tabs
         * @param broadcast broadcast variable holding the action name to keep
         */
        private static void uvAnalyze(JavaRDD<String> logRDD, final Broadcast<String> broadcast) {
            JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(String v1) throws Exception {
                    String[] fields = v1.split(FIELD_SEPARATOR);
                    // Blank or truncated lines have no action column; skip them.
                    // This guard also guarantees the user/page columns exist below.
                    if (fields.length <= ACTION_INDEX) {
                        return false;
                    }
                    return broadcast.value().equals(fields[ACTION_INDEX]);
                }
            });

            JavaPairRDD<String, String> pairLogRDD = filteredLogRDD
                    .mapToPair(new PairFunction<String, String, String>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, String> call(String val) throws Exception {
                            // Split once and reuse the columns.
                            String[] fields = val.split(FIELD_SEPARATOR);
                            return new Tuple2<String, String>(
                                    fields[PAGE_ID_INDEX], fields[USER_ID_INDEX]);
                        }
                    });

            pairLogRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                    String pageId = tuple._1;
                    // The set keeps each user id once, so its size is the UV count.
                    Set<String> userSets = new HashSet<>();
                    for (String userId : tuple._2) {
                        userSets.add(userId);
                    }
                    System.out.println("PAGEID:" + pageId + "\t UV_COUNT:" + userSets.size());
                }
            });
        }
    }
    

      

    result
     

     
  • 相关阅读:
    这个夏天,感动我的歌,感动我的你
    设计中最困难的部分在于决定要设计什么 设计原本择录
    Sql效能优化总结(续) sql语句优化篇
    sql效能优化总结
    使用AStyle进行代码格式化
    迭代模型 转
    软件项目开发系列开篇杂谈
    Sql效能优化总结(续) 架构调整篇
    throw和throw ex的区别
    面向过程&面向对象 UML&RUP
  • 原文地址:https://www.cnblogs.com/haozhengfei/p/34616038af48357d06054611ba71aae3.html
Copyright © 2011-2022 走看看