zoukankan      html  css  js  c++  java
  • Spark应用_PageView_UserView_HotChannel


    Spark应用_PageView_UserView_HotChannel

    一、PV

    对某一个页面的访问量：在页面中每刷新一次，就计为一次 PV。

    PV {p1, (u1,u2,u3,u1,u2,u4…)}：对同一个页面的浏览量进行统计，同一用户的重复访问会被重复计数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    public class PV_ANA {
    public static void main(String[] args) {
    SparkConf conf = new SparkConf()
    .setAppName("PV_ANA")
    .setMaster("local")
    .set("spark.testing.memory", "2147480000");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logRDD = sc.textFile("f:/userLog");
    String str = "View";
    final Broadcast<String> broadcast = sc.broadcast(str);
    pvAnalyze(logRDD, broadcast);
    }
     
    private static void pvAnalyze(JavaRDD<String> logRDD,
    final Broadcast<String> broadcast) {
    JavaRDD<String> filteredLogRDD = logRDD.filter
    (new Function<String, Boolean>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Boolean call(String s) throws Exception {
    String actionParam = broadcast.value();
    String action = s.split(" ")[5];
    return actionParam.equals(action);
    }
    });
    JavaPairRDD<String, String> pariLogRDD = filteredLogRDD.mapToPair
    (new PairFunction<String, String, String>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Tuple2<String, String> call(String s)
    throws Exception {
    String pageId = s.split(" ")[3];
    return new Tuple2<String, String>(pageId, null);
    }
    });
    pariLogRDD.groupByKey().foreach(new VoidFunction
    <Tuple2<String, Iterable<String>>>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public void call(Tuple2<String, Iterable<String>> tuple)
    throws Exception {
    String pageId = tuple._1;
    Iterator<String> iterator = tuple._2.iterator();
    long count = 0L;
    while (iterator.hasNext()) {
    iterator.next();
    count++;
    }
    System.out.println("PAGEID:" + pageId + " PV_COUNT:" + count);
    }
    });
    }
     
    }

    二、UV

    UV {p1, (u1,u2,u3,u4,u5…)}：统计一个页面被多少个不同的用户访问，同一用户只计一次（去重）。

    【方式一】

    【流程图】

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    public class UV_ANA {
    public static void main(String[] args) {
    SparkConf conf = new SparkConf()
    .setAppName("UV_ANA")
    .setMaster("local")
    .set("spark.testing.memory", "2147480000");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logRDD = sc.textFile("f:/userLog");
    String str = "View";
    final Broadcast<String> broadcast = sc.broadcast(str);
    uvAnalyze(logRDD, broadcast);
    }
     
    private static void uvAnalyze(JavaRDD<String> logRDD,
    final Broadcast<String> broadcast) {
    JavaRDD<String> filteredLogRDD = logRDD.filter
    (new Function<String, Boolean>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Boolean call(String s) throws Exception {
    String actionParam = broadcast.value();
    String action = s.split(" ")[5];
    return actionParam.equals(action);
    }
    });
    JavaPairRDD<String, String> pairLogRDD = filteredLogRDD.mapToPair
    (new PairFunction<String, String, String>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Tuple2<String, String> call(String s) throws Exception {
    String pageId = s.split(" ")[3];
    String userId = s.split(" ")[2];
    return new Tuple2<String, String>(pageId, userId);
    }
    });
    pairLogRDD.groupByKey().foreach(new VoidFunction
    <Tuple2<String, Iterable<String>>>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public void call(Tuple2<String, Iterable<String>> tuple)
    throws Exception {
    String pageId = tuple._1;
    Iterator<String> iterator = tuple._2.iterator();
    Set<String> userSets = new HashSet<>();
    while (iterator.hasNext()) {
    String userId = iterator.next();
    userSets.add(userId);
    }
    System.out.println("PAGEID:" + pageId + " " +
    "UV_COUNT:" + userSets.size());
    }
    });
    }
    }

    【方式二】

    【流程图】

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    public class UV_ANAoptz {
    public static void main(String[] args) {
    SparkConf conf = new SparkConf()
    .setAppName("UV_ANAoptz")
    .setMaster("local")
    .set("spark.testing.memory", "2147480000");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logRDD = sc.textFile("f:/userLog");
    String str = "View";
    final Broadcast<String> broadcast = sc.broadcast(str);
    uvAnalyzeOptz(logRDD, broadcast);
    }
     
    private static void uvAnalyzeOptz(JavaRDD<String> logRDD,
    final Broadcast<String> broadcast) {
    JavaRDD<String> filteredLogRDD = logRDD.filter
    (new Function<String, Boolean>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Boolean call(String s) throws Exception {
    String actionParam = broadcast.value();
    String action = s.split(" ")[5];
    return actionParam.equals(action);
    }
    });
    JavaPairRDD<String, String> pairRDD = filteredLogRDD.mapToPair
    (new PairFunction<String, String, String>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Tuple2<String, String> call(String s)
    throws Exception {
    String pageId = s.split(" ")[3];
    String userId = s.split(" ")[2];
    return new Tuple2<String, String>(pageId + "_" +
    userId, null);
    }
    });
    JavaPairRDD<String, Iterable<String>> groupUp2LogRDD = pairRDD.groupByKey();
    Map<String, Object> countByKey = groupUp2LogRDD.mapToPair
    (new PairFunction<Tuple2<String, Iterable<String>>,
    String, String>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Tuple2<String, String> call(Tuple2<String,
    Iterable<String>> tuple)
    throws Exception {
    String pu = tuple._1;
    String[] spilted = pu.split("_");
    String pageId = spilted[0];
    return new Tuple2<String, String>(pageId, null);
    }
    }).countByKey();
    Set<String> keySet = countByKey.keySet();
    for (String key : keySet) {
    System.out.println("PAGEID:" + key + " UV_COUNT:" +
    countByKey.get(key));
    }
    }
    }

    三、热门版块下用户访问的数量

    统计出热门版块中最活跃的 top3 用户：先按 View 行为的访问次数选出前 3 个热门版块，再在每个热门版块内统计各用户的日志记录数，取前 3 名。

    【方式一】

    【流程图】

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    public class HotChannel {
    public static void main(String[] args) {
    SparkConf conf = new SparkConf()
    .setAppName("HotChannel")
    .setMaster("local")
    .set("spark.testing.memory", "2147480000");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logRDD = sc.textFile("f:/userLog");
    String str = "View";
    final Broadcast<String> broadcast = sc.broadcast(str);
    hotChannel(sc, logRDD, broadcast);
    }
     
    private static void hotChannel(JavaSparkContext sc, JavaRDD<String> logRDD,
    final Broadcast<String> broadcast) {
    JavaRDD<String> filteredLogRDD = logRDD.filter
    (new Function<String, Boolean>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Boolean call(String v1) throws Exception {
    String actionParam = broadcast.value();
    String action = v1.split(" ")[5];
    return actionParam.equals(action);
    }
    });
    JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair
    (new PairFunction<String, String, String>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Tuple2<String, String> call(String s) throws Exception {
    String channel = s.split(" ")[4];
    return new Tuple2<String, String>(channel, null);
    }
    });
    Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
    Set<String> keySet = channelPVMap.keySet();
    List<SortObj> channels = new ArrayList<>();
    for (String channel : keySet) {
    channels.add(new SortObj(channel, Integer.valueOf
    (channelPVMap.get(channel) + "")));
    }
    Collections.sort(channels, new Comparator<SortObj>() {
     
    @Override
    public int compare(SortObj o1, SortObj o2) {
    return o2.getValue() - o1.getValue();
    }
    });
    List<String> hotChannelList = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
    hotChannelList.add(channels.get(i).getKey());
    }
    for (String channel : hotChannelList) {
    System.out.println("channel:" + channel);
    }
    final Broadcast<List<String>> hotChannelListBroadcast =
    sc.broadcast(hotChannelList);
    JavaRDD<String> filterRDD = logRDD.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) throws Exception {
    List<String> hostChannels = hotChannelListBroadcast.value();
    String channel = s.split(" ")[4];
    String userId = s.split(" ")[2];
    return hostChannels.contains(channel) && !"null".equals(userId);
    }
    });
    JavaPairRDD<String, String> channel2UserRDD = filterRDD.mapToPair
    (new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String s)
    throws Exception {
    String[] splited = s.split(" ");
    String channel = splited[4];
    String userId = splited[2];
    return new Tuple2<String, String>(channel, userId);
    }
    });
    channel2UserRDD.groupByKey().foreach(new VoidFunction
    <Tuple2<String, Iterable<String>>>() {
    @Override
    public void call(Tuple2<String, Iterable<String>> tuple)
    throws Exception {
    String channel = tuple._1;
    Iterator<String> iterator = tuple._2.iterator();
    Map<String, Integer> userNumMap = new HashMap<>();
    while (iterator.hasNext()) {
    String userId = iterator.next();
    Integer count = userNumMap.get(userId);
    if (count == null) {
    count = 1;
    } else {
    count++;
    }
    userNumMap.put(userId, count);
    }
    List<SortObj> lists = new ArrayList<>();
    Set<String> keys = userNumMap.keySet();
    for (String key : keys) {
    lists.add(new SortObj(key, userNumMap.get(key)));
    }
    Collections.sort(lists, new Comparator<SortObj>() {
     
    @Override
    public int compare(SortObj O1, SortObj O2) {
    return O2.getValue() - O1.getValue();
    }
    });
    System.out.println("HOT_CHANNEL:" + channel);
    for (int i = 0; i < 3; i++) {
    SortObj sortObj = lists.get(i);
    System.out.println(sortObj.getKey() + "=="
    + sortObj.getValue());
    }
    }
    });
    }
    }

    【方式二】

    【流程图】

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    public class HotChannelOpz {
    public static void main(String[] args) {
    SparkConf conf = new SparkConf()
    .setAppName("hotChannelOpz")
    .setMaster("local")
    .set("spark.testing.memory", "2147480000");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logRDD = sc.textFile("f:/userLog");
    String str = "View";
    final Broadcast<String> broadcast = sc.broadcast(str);
    hotChannelOpz(sc, logRDD, broadcast);
    }
     
    private static void hotChannelOpz(JavaSparkContext sc, JavaRDD<String> logRDD,
    final Broadcast<String> broadcast) {
    JavaRDD<String> filteredLogRDD = logRDD.filter
    (new Function<String, Boolean>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Boolean call(String v1) throws Exception {
    String actionParam = broadcast.value();
    String action = v1.split(" ")[5];
    return actionParam.equals(action);
    }
    });
     
    JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair
    (new PairFunction<String, String, String>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Tuple2<String, String> call(String val)
    throws Exception {
    String channel = val.split(" ")[4];
     
    return new Tuple2<String, String>(channel, null);
    }
    });
    Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
    Set<String> keySet = channelPVMap.keySet();
    List<SortObj> channels = new ArrayList<>();
    for (String channel : keySet) {
    channels.add(new SortObj(channel, Integer.valueOf
    (channelPVMap.get(channel) + "")));
    }
    Collections.sort(channels, new Comparator<SortObj>() {
     
    @Override
    public int compare(SortObj o1, SortObj o2) {
    return o2.getValue() - o1.getValue();
    }
    });
    List<String> hotChannelList = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
    hotChannelList.add(channels.get(i).getKey());
    }
    final Broadcast<List<String>> hotChannelListBroadcast =
    sc.broadcast(hotChannelList);
    JavaRDD<String> filtedRDD = logRDD.filter
    (new Function<String, Boolean>() {
     
    @Override
    public Boolean call(String v1) throws Exception {
    List<String> hostChannels = hotChannelListBroadcast.value();
    String channel = v1.split(" ")[4];
    String userId = v1.split(" ")[2];
    return hostChannels.contains(channel) &&
    !"null".equals(userId);
    }
    });
    JavaPairRDD<String, String> user2ChannelRDD = filtedRDD.mapToPair
    (new PairFunction<String, String, String>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Tuple2<String, String> call(String val)
    throws Exception {
    String[] splited = val.split(" ");
    String userId = splited[2];
    String channel = splited[4];
    return new Tuple2<String, String>(userId, channel);
    }
    });
    JavaPairRDD<String, String> userVistChannelsRDD =
    user2ChannelRDD.groupByKey().
    flatMapToPair(new PairFlatMapFunction
    <Tuple2<String, Iterable<String>>, String, String>() {
    private static final long serialVersionUID = 1L;
     
    @Override
    public Iterable<Tuple2<String, String>> call
    (Tuple2<String, Iterable<String>> tuple)
    throws Exception {
    String userId = tuple._1;
    Iterator<String> iterator = tuple._2.iterator();
    Map<String, Integer> channelMap = new HashMap<>();
    while (iterator.hasNext()) {
    String channel = iterator.next();
    Integer count = channelMap.get(channel);
    if (count == null)
    count = 1;
    else
    count++;
    channelMap.put(channel, count);
    }
    List<Tuple2<String, String>> list = new ArrayList<>();
    Set<String> keys = channelMap.keySet();
    for (String channel : keys) {
    Integer channelNum = channelMap.get(channel);
    list.add(new Tuple2<String, String>(channel,
    userId + "_" + channelNum));
    }
    return list;
    }
    });
    userVistChannelsRDD.groupByKey().foreach(new VoidFunction
    <Tuple2<String, Iterable<String>>>() {
     
    @Override
    public void call(Tuple2<String, Iterable<String>> tuple)
    throws Exception {
    String channel = tuple._1;
    Iterator<String> iterator = tuple._2.iterator();
    List<SortObj> list = new ArrayList<>();
    while (iterator.hasNext()) {
    String ucs = iterator.next();
    String[] splited = ucs.split("_");
    String userId = splited[0];
    Integer num = Integer.valueOf(splited[1]);
    list.add(new SortObj(userId, num));
    }
    Collections.sort(list, new Comparator<SortObj>() {
    @Override
    public int compare(SortObj o1, SortObj o2) {
    return o2.getValue() - o1.getValue();
    }
    });
    System.out.println("HOT_CHANNLE:" + channel);
    for (int i = 0; i < 3; i++) {
    SortObj sortObj = list.get(i);
    System.out.println(sortObj.getKey() + "==="
    + sortObj.getValue());
    }
    }
    });
    }
    }
  • 相关阅读:
    Excel VBA宏 链接服务器 上传和下载数据
    SQL IF while 游标
    关于SQL while 循环嵌套 外部循环数据无法进入内部循环
    SQL中读取Excel 以及 bpc语言
    安装zabbix及LNMP的平台的搭建
    MySQL的储存过程
    zabbix添加客户端
    ossec日志文件的安装
    find常用参数详解
    Liunx的备份
  • 原文地址:https://www.cnblogs.com/haozhengfei/p/b34b27ee2f9e79dc3c6d56820f790e42.html
Copyright © 2011-2022 走看看