zoukankan      html  css  js  c++  java
  • UserView——第二种方式(避免第一种方式中 Set 饱和),基于 Spark 算子的 Java 代码实现

     

    UserView——第二种方式(避免第一种方式中 Set 饱和),基于 Spark 算子的 Java 代码实现

     
    测试数据
    java代码
     1 package com.hzf.spark.study;
     2 
     3 import java.util.Map;
     4 import java.util.Set;
     5 
     6 import org.apache.spark.SparkConf;
     7 import org.apache.spark.api.java.JavaPairRDD;
     8 import org.apache.spark.api.java.JavaRDD;
     9 import org.apache.spark.api.java.JavaSparkContext;
    10 import org.apache.spark.api.java.function.Function;
    11 import org.apache.spark.api.java.function.PairFunction;
    12 import org.apache.spark.broadcast.Broadcast;
    13 
    14 import scala.Tuple2;
    15 
    16 public class UVAnalysis02 {
    17     public static void main(String[] args) {
    18         SparkConf conf = new SparkConf().setAppName("UV_ANA").setMaster("local")
    19                 .set("spark.testing.memory", "2147480000");
    20         @SuppressWarnings("resource")
    21         JavaSparkContext sc = new JavaSparkContext(conf);
    22         JavaRDD<String> logRDD = sc.textFile("userLog1");
    23         String str = "View";
    24         final Broadcast<String> broadcast = sc.broadcast(str);
    25         uvAnalyzeOptz(logRDD, broadcast);
    26     }
    27     
    28     private static void uvAnalyzeOptz(JavaRDD<String> logRDD, final Broadcast<String> broadcast) {
    29         JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {
    30           
    31             private static final long serialVersionUID = 1L;
    32 
    33             @Override
    34             public Boolean call(String v1) throws Exception {
    35                 String actionParam = broadcast.value();
    36                 String action = v1.split("	")[5];
    37                 return actionParam.equals(action);
    38             }
    39         });
    40         
    41         JavaPairRDD<String, String> up2LogRDD = filteredLogRDD.mapToPair(new PairFunction<String, String, String>() {
    42 
    43             private static final long serialVersionUID = 1L;
    44 
    45             @Override
    46             public Tuple2<String, String> call(String val) throws Exception {
    47                 String[] splited = val.split("	");
    48                 String userId = splited[2];
    49                 String pageId = splited[3];
    50                  
    51                 return new Tuple2<String, String>(userId + "_" + pageId,null);
    52             }
    53         });
    54         
    55         JavaPairRDD<String, Iterable<String>> groupUp2LogRDD = up2LogRDD.groupByKey();
    56         
    57         Map<String, Object> countByKey = groupUp2LogRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, String, String>() {
    58 
    59             private static final long serialVersionUID = 1L;
    60 
    61             @Override
    62             public Tuple2<String, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
    63                 String pu = tuple._1;
    64                 String[] splited = pu.split("_");
    65                 String pageId = splited[1];
    66                 return new Tuple2<String, String>(pageId,null);
    67             }
    68         }).countByKey();
    69         
    70         Set<String> keySet = countByKey.keySet();
    71         for (String key : keySet) {
    72             System.out.println("PAGEID:"+key+"	UV_COUNT:"+countByKey.get(key));
    73         }
    74     }
    75 }
    View Code
    result
     


     

     
  • 相关阅读:
    如何安装Tomcat服务器
    浅谈数据库中的锁机制
    彻底理解js中this的指向
    Javascript模块化编程的写法
    滚屏加载--无刷新动态加载数据技术的应用
    JavaScript正则表达式
    CSS:水平居中与垂直居中
    Linux常用命令大全
    HTML的元素嵌套规则
    clearfix清除浮动进化史
  • 原文地址:https://www.cnblogs.com/haozhengfei/p/1878742a6fb0471d68c5323c2a1567cc.html
Copyright © 2011-2022 走看看