Spark应用_PageView_UserView_HotChannel
一、PV
PV(Page View)即某个页面的访问量:页面每被打开或刷新一次,就记一次 PV。
PV {p1, (u1,u2,u3,u1,u2,u4…)}:统计同一个页面被浏览的总次数,同一用户的多次访问重复计数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
public class PV_ANA {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("PV_ANA")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
pvAnalyze(logRDD, broadcast);
}
private static void pvAnalyze(JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String s) throws Exception {
String actionParam = broadcast.value();
String action = s.split(" ")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> pariLogRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String s)
throws Exception {
String pageId = s.split(" ")[3];
return new Tuple2<String, String>(pageId, null);
}
});
pariLogRDD.groupByKey().foreach(new VoidFunction
<Tuple2<String, Iterable<String>>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String pageId = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
long count = 0L;
while (iterator.hasNext()) {
iterator.next();
count++;
}
System.out.println("PAGEID:" + pageId + " PV_COUNT:" + count);
}
});
}
}
|
二、UV
UV {p1, (u1,u2,u3,u4,u5…)}:统计一个页面被多少个不同的用户访问过,同一用户只计一次。
【方式一】
【流程图】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
public class UV_ANA {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("UV_ANA")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
uvAnalyze(logRDD, broadcast);
}
private static void uvAnalyze(JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String s) throws Exception {
String actionParam = broadcast.value();
String action = s.split(" ")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> pairLogRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String s) throws Exception {
String pageId = s.split(" ")[3];
String userId = s.split(" ")[2];
return new Tuple2<String, String>(pageId, userId);
}
});
pairLogRDD.groupByKey().foreach(new VoidFunction
<Tuple2<String, Iterable<String>>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String pageId = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
Set<String> userSets = new HashSet<>();
while (iterator.hasNext()) {
String userId = iterator.next();
userSets.add(userId);
}
System.out.println("PAGEID:" + pageId + " " +
"UV_COUNT:" + userSets.size());
}
});
}
}
|
【方式二】
【流程图】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
public class UV_ANAoptz {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("UV_ANAoptz")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
uvAnalyzeOptz(logRDD, broadcast);
}
private static void uvAnalyzeOptz(JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String s) throws Exception {
String actionParam = broadcast.value();
String action = s.split(" ")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> pairRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String s)
throws Exception {
String pageId = s.split(" ")[3];
String userId = s.split(" ")[2];
return new Tuple2<String, String>(pageId + "_" +
userId, null);
}
});
JavaPairRDD<String, Iterable<String>> groupUp2LogRDD = pairRDD.groupByKey();
Map<String, Object> countByKey = groupUp2LogRDD.mapToPair
(new PairFunction<Tuple2<String, Iterable<String>>,
String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(Tuple2<String,
Iterable<String>> tuple)
throws Exception {
String pu = tuple._1;
String[] spilted = pu.split("_");
String pageId = spilted[0];
return new Tuple2<String, String>(pageId, null);
}
}).countByKey();
Set<String> keySet = countByKey.keySet();
for (String key : keySet) {
System.out.println("PAGEID:" + key + " UV_COUNT:" +
countByKey.get(key));
}
}
}
|
三、热门版块下用户访问的数量
先按浏览量选出最热门的 3 个版块,再统计每个热门版块中访问次数最多的 top3 用户。
【方式一】
【流程图】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
public class HotChannel {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("HotChannel")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
hotChannel(sc, logRDD, broadcast);
}
private static void hotChannel(JavaSparkContext sc, JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String v1) throws Exception {
String actionParam = broadcast.value();
String action = v1.split(" ")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String s) throws Exception {
String channel = s.split(" ")[4];
return new Tuple2<String, String>(channel, null);
}
});
Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
Set<String> keySet = channelPVMap.keySet();
List<SortObj> channels = new ArrayList<>();
for (String channel : keySet) {
channels.add(new SortObj(channel, Integer.valueOf
(channelPVMap.get(channel) + "")));
}
Collections.sort(channels, new Comparator<SortObj>() {
public int compare(SortObj o1, SortObj o2) {
return o2.getValue() - o1.getValue();
}
});
List<String> hotChannelList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
hotChannelList.add(channels.get(i).getKey());
}
for (String channel : hotChannelList) {
System.out.println("channel:" + channel);
}
final Broadcast<List<String>> hotChannelListBroadcast =
sc.broadcast(hotChannelList);
JavaRDD<String> filterRDD = logRDD.filter(new Function<String, Boolean>() {
public Boolean call(String s) throws Exception {
List<String> hostChannels = hotChannelListBroadcast.value();
String channel = s.split(" ")[4];
String userId = s.split(" ")[2];
return hostChannels.contains(channel) && !"null".equals(userId);
}
});
JavaPairRDD<String, String> channel2UserRDD = filterRDD.mapToPair
(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s)
throws Exception {
String[] splited = s.split(" ");
String channel = splited[4];
String userId = splited[2];
return new Tuple2<String, String>(channel, userId);
}
});
channel2UserRDD.groupByKey().foreach(new VoidFunction
<Tuple2<String, Iterable<String>>>() {
public void call(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String channel = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
Map<String, Integer> userNumMap = new HashMap<>();
while (iterator.hasNext()) {
String userId = iterator.next();
Integer count = userNumMap.get(userId);
if (count == null) {
count = 1;
} else {
count++;
}
userNumMap.put(userId, count);
}
List<SortObj> lists = new ArrayList<>();
Set<String> keys = userNumMap.keySet();
for (String key : keys) {
lists.add(new SortObj(key, userNumMap.get(key)));
}
Collections.sort(lists, new Comparator<SortObj>() {
public int compare(SortObj O1, SortObj O2) {
return O2.getValue() - O1.getValue();
}
});
System.out.println("HOT_CHANNEL:" + channel);
for (int i = 0; i < 3; i++) {
SortObj sortObj = lists.get(i);
System.out.println(sortObj.getKey() + "=="
+ sortObj.getValue());
}
}
});
}
}
|
【方式二】
【流程图】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
public class HotChannelOpz {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("hotChannelOpz")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
hotChannelOpz(sc, logRDD, broadcast);
}
private static void hotChannelOpz(JavaSparkContext sc, JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String v1) throws Exception {
String actionParam = broadcast.value();
String action = v1.split(" ")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String val)
throws Exception {
String channel = val.split(" ")[4];
return new Tuple2<String, String>(channel, null);
}
});
Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
Set<String> keySet = channelPVMap.keySet();
List<SortObj> channels = new ArrayList<>();
for (String channel : keySet) {
channels.add(new SortObj(channel, Integer.valueOf
(channelPVMap.get(channel) + "")));
}
Collections.sort(channels, new Comparator<SortObj>() {
public int compare(SortObj o1, SortObj o2) {
return o2.getValue() - o1.getValue();
}
});
List<String> hotChannelList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
hotChannelList.add(channels.get(i).getKey());
}
final Broadcast<List<String>> hotChannelListBroadcast =
sc.broadcast(hotChannelList);
JavaRDD<String> filtedRDD = logRDD.filter
(new Function<String, Boolean>() {
public Boolean call(String v1) throws Exception {
List<String> hostChannels = hotChannelListBroadcast.value();
String channel = v1.split(" ")[4];
String userId = v1.split(" ")[2];
return hostChannels.contains(channel) &&
!"null".equals(userId);
}
});
JavaPairRDD<String, String> user2ChannelRDD = filtedRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String val)
throws Exception {
String[] splited = val.split(" ");
String userId = splited[2];
String channel = splited[4];
return new Tuple2<String, String>(userId, channel);
}
});
JavaPairRDD<String, String> userVistChannelsRDD =
user2ChannelRDD.groupByKey().
flatMapToPair(new PairFlatMapFunction
<Tuple2<String, Iterable<String>>, String, String>() {
private static final long serialVersionUID = 1L;
public Iterable<Tuple2<String, String>> call
(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String userId = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
Map<String, Integer> channelMap = new HashMap<>();
while (iterator.hasNext()) {
String channel = iterator.next();
Integer count = channelMap.get(channel);
if (count == null)
count = 1;
else
count++;
channelMap.put(channel, count);
}
List<Tuple2<String, String>> list = new ArrayList<>();
Set<String> keys = channelMap.keySet();
for (String channel : keys) {
Integer channelNum = channelMap.get(channel);
list.add(new Tuple2<String, String>(channel,
userId + "_" + channelNum));
}
return list;
}
});
userVistChannelsRDD.groupByKey().foreach(new VoidFunction
<Tuple2<String, Iterable<String>>>() {
public void call(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String channel = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
List<SortObj> list = new ArrayList<>();
while (iterator.hasNext()) {
String ucs = iterator.next();
String[] splited = ucs.split("_");
String userId = splited[0];
Integer num = Integer.valueOf(splited[1]);
list.add(new SortObj(userId, num));
}
Collections.sort(list, new Comparator<SortObj>() {
public int compare(SortObj o1, SortObj o2) {
return o2.getValue() - o1.getValue();
}
});
System.out.println("HOT_CHANNLE:" + channel);
for (int i = 0; i < 3; i++) {
SortObj sortObj = list.get(i);
System.out.println(sortObj.getKey() + "==="
+ sortObj.getValue());
}
}
});
}
}
|