zoukankan      html  css  js  c++  java
  • UserView--第一种方式set去重,基于Spark算子的java代码实现


    UserView--第一种方式set去重,基于Spark算子的java代码实现

    测试数据
    Java代码
    package com.hzf.spark.study;
    
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;
    
    import scala.Tuple2;
    
    /**
     * Counts unique visitors (UV) per page from a tab-separated log file using
     * Spark RDD operators, de-duplicating user ids with an in-memory {@link Set}.
     *
     * <p>Each log line is split on tabs; column 2 is read as the user id, column 3
     * as the page id and column 5 as the action. Only lines whose action equals
     * the broadcast value are counted.
     */
    public class UVAnalysis {

        /** Separator between the columns of one log line. */
        private static final String FIELD_SEPARATOR = "\t";
        /** Zero-based column holding the user id. */
        private static final int USER_ID_INDEX = 2;
        /** Zero-based column holding the page id. */
        private static final int PAGE_ID_INDEX = 3;
        /** Zero-based column holding the action name. */
        private static final int ACTION_INDEX = 5;
        /** Input path used when no command-line argument is given. */
        private static final String DEFAULT_LOG_PATH = "userLog1";
        /** Action value that marks a page view. */
        private static final String VIEW_ACTION = "View";

        /**
         * Runs the UV analysis on a local Spark master.
         *
         * @param args optional; {@code args[0]} overrides the input path, which
         *             otherwise defaults to {@code "userLog1"}
         */
        public static void main(String[] args) {
            String logPath = (args != null && args.length > 0) ? args[0] : DEFAULT_LOG_PATH;
            SparkConf conf = new SparkConf().setAppName("UV_ANA").setMaster("local")
                    .set("spark.testing.memory", "2147480000");
            // try-with-resources stops the context even when the job throws,
            // instead of leaking it behind a suppressed "resource" warning.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                JavaRDD<String> logRDD = sc.textFile(logPath);
                final Broadcast<String> broadcast = sc.broadcast(VIEW_ACTION);
                uvAnalyze(logRDD, broadcast);
            }
        }

        /**
         * Prints, for every page id, the number of distinct user ids that performed
         * the broadcast action on it.
         *
         * @param logRDD    raw log lines, one tab-separated record per element
         * @param broadcast the action name to keep (compared with column 5)
         */
        private static void uvAnalyze(JavaRDD<String> logRDD, final Broadcast<String> broadcast) {
            JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(String line) throws Exception {
                    String[] fields = line.split(FIELD_SEPARATOR);
                    // Blank or truncated lines have no action column; drop them
                    // rather than failing the job with an index error.
                    if (fields.length <= ACTION_INDEX) {
                        return false;
                    }
                    String actionParam = broadcast.value();
                    return actionParam.equals(fields[ACTION_INDEX]);
                }
            });

            JavaPairRDD<String, String> pairLogRDD = filteredLogRDD
                    .mapToPair(new PairFunction<String, String, String>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, String> call(String line) throws Exception {
                            // Lines reaching this point passed the filter above, so
                            // they have at least ACTION_INDEX + 1 columns. Split once.
                            String[] fields = line.split(FIELD_SEPARATOR);
                            return new Tuple2<String, String>(fields[PAGE_ID_INDEX], fields[USER_ID_INDEX]);
                        }
                    });

            pairLogRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                    String pageId = tuple._1;
                    // The set collapses repeated visits by the same user.
                    Set<String> userSets = new HashSet<>();
                    for (String userId : tuple._2) {
                        userSets.add(userId);
                    }
                    System.out.println("PAGEID:" + pageId + "\t UV_COUNT:" + userSets.size());
                }
            });
        }
    }
    

      

    result
     

     
  • 相关阅读:
    C#中sizeof的用法
    C#托管堆对象实例包含什么
    C#引用类型转换的几种方式
    C#中结构(struct)的部分初始化和完全初始化
    C#值类型装箱后能改变其值吗
    C#程序集系列13,如何让CLR选择不同版本的程序集
    C#程序集系列12,C#编译器和CLR如何找寻程序集
    C#程序集系列11,全局程序集缓存
    C#程序集系列10,强名称程序集
    C#程序集系列09,程序集签名
  • 原文地址:https://www.cnblogs.com/haozhengfei/p/34616038af48357d06054611ba71aae3.html
Copyright © 2011-2022 走看看