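1.1 Module creation and data preparation
Continuing in Flink-Project, create a new Maven module as a subproject and name it gmall-market.
No ready-made data exists for this module, so we either use a custom test source to produce the test data stream or generate a test data file directly.
1.2 App market promotion statistics
With the spread of smartphones, more and more users of today's e-commerce sites come from mobile devices; compared with logging in through a traditional browser, the mobile app has become the first choice for many users. E-commerce companies generally promote their app through a variety of channels, and the statistics from those channels (for example, clicks on ad links on different websites, or app downloads) become important business metrics for marketing.
We first look at promotion statistics broken down by channel. Because no ready-made data is available, we define a custom test source that generates the stream of user behavior events.
1.2.1 Custom test data source
Define a JavaBean class MarketUserBehavior for the source records, then define a SourceFunction that produces the user behavior data:
1) Define the JavaBean -- MarketUserBehavior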
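import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MarketUserBehavior {
    // Fields: user ID, user behavior, promotion channel, timestamp
    private Long userId;
    private String behavior;
    private String channel;
    private Long timestamp;
}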
2) Custom data source
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class MarketBehaviorSource implements ParallelSourceFunction<MarketUserBehavior> {
    // Flag that keeps the loop running; volatile because cancel() is called from another thread
    private volatile boolean running = true;
    // Candidate user behaviors and promotion channels
    private List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
    private List<String> channelList = Arrays.asList("app store", "wechat", "weibo", "tieba");
    // Random number generator
    private Random random = new Random();

    @Override
    public void run(SourceContext<MarketUserBehavior> ctx) throws Exception {
        while (running) {
            // Generate every field at random
            long id = random.nextLong();
            String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
            String channel = channelList.get(random.nextInt(channelList.size()));
            long timestamp = System.currentTimeMillis();
            // Emit the record, then pause so the source produces about 10 records per second
            ctx.collect(new MarketUserBehavior(id, behavior, channel, timestamp));
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
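1.2.2 Statistics by channel
Every 5 seconds, compute the promotion count for each channel (and behavior) over the most recent hour.
1) Define the JavaBean -- ChannelBehaviorCount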
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChannelBehaviorCount {
    private String channel;
    private String behavior;
    private String windowEnd;
    private Long count;
}
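2) Main class
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class MarketByChannelApp {
    public static void main(String[] args) throws Exception {
        // 1. Create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read from the custom data source
        DataStreamSource<MarketUserBehavior> marketUserDS = env.addSource(new MarketBehaviorSource());
        // 3. Filter out uninstall events, group by channel and behavior, and open a sliding window
        SingleOutputStreamOperator<ChannelBehaviorCount> result = marketUserDS
                .filter(data -> !"UNINSTALL".equals(data.getBehavior()))
                .keyBy("channel", "behavior")
                .timeWindow(Time.hours(1), Time.seconds(5))
                .aggregate(new MarketAggFunc(), new MarketWindowFunc());
        // 4. Print the result
        result.print();
        // 5. Run the job
        env.execute();
    }

    // Incremental aggregation: keeps a single running count per key and window
    public static class MarketAggFunc implements AggregateFunction<MarketUserBehavior, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(MarketUserBehavior value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    // Window function: attaches the key fields and the window end time to the final count
    public static class MarketWindowFunc implements WindowFunction<Long, ChannelBehaviorCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ChannelBehaviorCount> out) throws Exception {
            String channel = tuple.getField(0);
            String behavior = tuple.getField(1);
            String windowEnd = new Timestamp(window.getEnd()).toString();
            Long count = input.iterator().next();
            out.collect(new ChannelBehaviorCount(channel, behavior, windowEnd, count));
        }
    }
}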
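A one-hour window that slides every 5 seconds means that each event is counted in 3600 / 5 = 720 overlapping windows. The following plain-Java sketch, with no Flink dependency, illustrates how such an assignment can be computed. The class and method names are hypothetical, the arithmetic is modelled on how sliding window assigners usually work with a zero offset, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    /** Start of the latest window containing the timestamp (window offset assumed to be 0). */
    static long lastWindowStart(long timestampMs, long slideMs) {
        return timestampMs - (timestampMs + slideMs) % slideMs;
    }

    /** Returns {start, end} pairs (end exclusive) for every window that contains the timestamp. */
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastWindowStart(timestampMs, slideMs);
             start > timestampMs - sizeMs;
             start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 60 * 60 * 1000L; // one hour
        long slide = 5 * 1000L;      // five seconds
        List<long[]> windows = assignWindows(12_000L, size, slide);
        System.out.println("windows per event: " + windows.size());
        System.out.println("latest window: " + windows.get(0)[0] + " .. " + windows.get(0)[1]);
    }
}
```

Because every event lands in so many windows, incremental aggregation as in MarketAggFunc matters here: each window only needs to hold one running count rather than a copy of every event.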
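1.2.3 Total statistics (not split by channel)
In the same way, we can also compute promotion statistics without splitting by channel, which gives the total promotion count across all channels.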
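The difference between the two groupings can be modelled in plain Java without Flink. This is only a sketch under assumptions: the Event class is a reduced stand-in for MarketUserBehavior, the constant key "total" and all method names are hypothetical, and the sample events are made up for illustration. In a Flink job, one common way to get the same effect is to key every record by a single constant instead of by (channel, behavior).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TotalCountSketch {

    /** Reduced stand-in for MarketUserBehavior: only the two grouping fields. */
    static final class Event {
        final String channel;
        final String behavior;

        Event(String channel, String behavior) {
            this.channel = channel;
            this.behavior = behavior;
        }
    }

    /** Uninstall events are excluded from promotion counts. */
    static boolean isCounted(Event e) {
        return !"UNINSTALL".equals(e.behavior);
    }

    /** Per-channel view: one counter for each (channel, behavior) pair. */
    static Map<String, Long> countByChannelAndBehavior(List<Event> events) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            if (isCounted(e)) {
                counts.merge(e.channel + "|" + e.behavior, 1L, Long::sum);
            }
        }
        return counts;
    }

    /** Total view: every counted event falls under the same constant key. */
    static Map<String, Long> countTotal(List<Event> events) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            if (isCounted(e)) {
                counts.merge("total", 1L, Long::sum);
            }
        }
        return counts;
    }

    /** Made-up sample events, for illustration only. */
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("wechat", "CLICK"),
                new Event("wechat", "CLICK"),
                new Event("weibo", "DOWNLOAD"),
                new Event("tieba", "UNINSTALL"));
    }

    public static void main(String[] args) {
        System.out.println(countByChannelAndBehavior(sampleEvents()));
        System.out.println(countTotal(sampleEvents()));
    }
}
```

Keying by a constant routes all records to one parallel task, so the total view trades parallelism for a single global count.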
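1.3 Page advertisement analysis
Among the marketing metrics of an e-commerce site, advertising placed on its pages (both for its own products and for other sites) matters as well as promotion of its own app. Ad-related statistics are therefore another important marketing metric.
The simplest and most important ad statistic is the number of clicks on page ads. Sites often use click counts to set pricing and adjust their promotion approach, and they can also use them to collect information about user preferences. More specifically, we can partition users by geographic location and summarize which ads users in each province prefer, which helps target ads more precisely.
1.3.1 Page ad click statistics
Next we count page ad clicks broken down by province.
Again no ready-made data is available, so we define some test data, placed in AdClickLog.csv, to generate the stream of ad click events.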
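One way to produce such a file is a small generator like the sketch below. It assumes the five-column layout that the parsing code in this section expects (userId, adId, province, city, timestamp in seconds). The class name, the province and city values, and the ID ranges are made up for illustration. Timestamps are kept non-decreasing because the job uses an ascending timestamp extractor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class AdClickLogGenerator {

    // Hypothetical province/city pairs, for illustration only
    static final String[][] PLACES = {
            {"guangdong", "shenzhen"},
            {"beijing", "beijing"},
            {"zhejiang", "hangzhou"}
    };

    /** Generates n CSV lines: userId,adId,province,city,timestampSeconds. */
    static List<String> generate(int n, long startSeconds, long seed) {
        Random random = new Random(seed);
        List<String> lines = new ArrayList<>();
        long ts = startSeconds;
        for (int i = 0; i < n; i++) {
            long userId = 1 + random.nextInt(1000);
            long adId = 1 + random.nextInt(50);
            String[] place = PLACES[random.nextInt(PLACES.length)];
            // Advance by 0 to 2 seconds so timestamps never go backwards
            ts += random.nextInt(3);
            lines.add(userId + "," + adId + "," + place[0] + "," + place[1] + "," + ts);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Print the lines; redirect stdout to input/AdClickLog.csv to create the file
        for (String line : generate(20, 1_600_000_000L, 42L)) {
            System.out.println(line);
        }
    }
}
```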
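In the code we first define the JavaBean class AdClickEvent for the source data and the JavaBean class AdCountByProvince for the output statistics. The main function first applies keyBy on province, then opens a one-hour time window with a 5-second slide and counts the click events in each window. The implementation is as follows:
1) JavaBean -- AdClickEvent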
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AdClickEvent {
    private Long userId;
    private Long adId;
    private String province;
    private String city;
    private Long timestamp;
}
2) JavaBean -- AdCountByProvince
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AdCountByProvince {
    private String province;
    private String windowEnd;
    private Long count;
}
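3) Main program
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class AdStatisticsByProvince {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // Read the data and convert each CSV line into an AdClickEvent
        DataStream<AdClickEvent> adClickEventStream = env.readTextFile("input/AdClickLog.csv")
                .map(data -> {
                    String[] fields = data.split(",");
                    return new AdClickEvent(Long.parseLong(fields[0]), Long.parseLong(fields[1]), fields[2], fields[3], Long.parseLong(fields[4]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdClickEvent>() {
                    @Override
                    public long extractAscendingTimestamp(AdClickEvent element) {
                        // The file stores seconds; Flink expects milliseconds
                        return element.getTimestamp() * 1000L;
                    }
                });
        // Group by province, open a sliding window, and aggregate
        DataStream<AdCountByProvince> adCountStream = adClickEventStream
                .keyBy(AdClickEvent::getProvince)
                .timeWindow(Time.hours(1), Time.seconds(5))
                .aggregate(new AdCountAgg(), new AdCountResult());
        adCountStream.print();
        env.execute("ad statistics job");
    }

    // Custom incremental aggregate function
    public static class AdCountAgg implements AggregateFunction<AdClickEvent, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdClickEvent value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    // Custom window function: wraps the final count with the province and window end time
    public static class AdCountResult implements WindowFunction<Long, AdCountByProvince, String, TimeWindow> {
        @Override
        public void apply(String province, TimeWindow window, Iterable<Long> input, Collector<AdCountByProvince> out) throws Exception {
            out.collect(new AdCountByProvince(province, new Timestamp(window.getEnd()).toString(), input.iterator().next()));
        }
    }
}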
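The map step above assumes that every line has exactly five comma-separated fields and that the last one is a timestamp in seconds (the extractor multiplies it by 1000 to get milliseconds). A malformed line would make the job fail with an exception. The following plain-Java sketch shows one defensive way to parse a line. The class name, the reduced Click type, and the sample line in main are hypothetical and not part of the original program.

```java
import java.util.Optional;

class AdClickLineParser {

    /** Reduced stand-in for AdClickEvent, holding the parsed fields. */
    static final class Click {
        final long userId;
        final long adId;
        final String province;
        final String city;
        final long timestampSeconds;

        Click(long userId, long adId, String province, String city, long timestampSeconds) {
            this.userId = userId;
            this.adId = adId;
            this.province = province;
            this.city = city;
            this.timestampSeconds = timestampSeconds;
        }

        /** Event time in milliseconds, the unit used for event-time timestamps. */
        long eventTimeMillis() {
            return timestampSeconds * 1000L;
        }
    }

    /** Returns the parsed click, or empty if the line does not have the expected shape. */
    static Optional<Click> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.split(",");
        if (fields.length != 5) {
            return Optional.empty();
        }
        try {
            return Optional.of(new Click(
                    Long.parseLong(fields[0].trim()),
                    Long.parseLong(fields[1].trim()),
                    fields[2].trim(),
                    fields[3].trim(),
                    Long.parseLong(fields[4].trim())));
        } catch (NumberFormatException e) {
            // Non-numeric ID or timestamp: treat the line as malformed
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Made-up sample line, for illustration only
        Optional<Click> click = parse("1001,2001,guangdong,shenzhen,1600000000");
        System.out.println(click.isPresent() ? click.get().eventTimeMillis() : "malformed");
    }
}
```

In the Flink job, a parser like this could back a flatMap that emits only the lines that parse successfully, so that bad input is skipped instead of stopping the job.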
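1.3.2 Blacklist filtering
The click statistics in the previous section add up repeated clicks by the same user. In practice a user may well open the same ad several times, which indicates stronger interest in it. But a user who clicks ads very frequently within a short period is clearly not behaving normally and is likely inflating the click count. We can therefore constrain user clicks over a period of time (for example, one day): if a user clicks the same ad more than a certain limit (for example, 100 times), that user should be added to a blacklist and an alert raised, and the user's later clicks should no longer be counted.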
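The rule itself can be modelled in plain Java, independent of Flink state and timers. The sketch below is an illustration under assumptions, not the implementation used in this section: the class name, the warning text, and the choice to drop the click that first exceeds the limit are hypothetical, and nextMidnightMillis assumes a fixed UTC offset with no daylight saving time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class BlackListSketch {
    private final long maxClick;
    // Click count per (userId, adId) pair for the current day
    private final Map<String, Long> counts = new HashMap<>();
    // Pairs that have already produced a warning, so each pair warns only once
    private final Set<String> warned = new HashSet<>();
    final List<String> warnings = new ArrayList<>();

    BlackListSketch(long maxClick) {
        this.maxClick = maxClick;
    }

    /** Returns true if the click should still be counted downstream. */
    boolean accept(long userId, long adId) {
        String key = userId + "_" + adId;
        long count = counts.getOrDefault(key, 0L);
        if (count >= maxClick) {
            if (warned.add(key)) {
                warnings.add("user " + userId + " clicked ad " + adId + " more than " + maxClick + " times today");
            }
            return false;
        }
        counts.put(key, count + 1);
        return true;
    }

    /** Clears all counters and warning flags; meant to be called at midnight. */
    void resetDaily() {
        counts.clear();
        warned.clear();
    }

    /** Timestamp of the next local midnight, for a fixed UTC offset in hours. */
    static long nextMidnightMillis(long nowMs, int utcOffsetHours) {
        long dayMs = 24 * 60 * 60 * 1000L;
        long offsetMs = utcOffsetHours * 60 * 60 * 1000L;
        return ((nowMs + offsetMs) / dayMs + 1) * dayMs - offsetMs;
    }

    public static void main(String[] args) {
        BlackListSketch filter = new BlackListSketch(3);
        for (int i = 0; i < 5; i++) {
            System.out.println("click " + (i + 1) + " accepted: " + filter.accept(1L, 10L));
        }
        System.out.println(filter.warnings);
    }
}
```

In the Flink version, the two in-memory collections correspond to keyed state scoped to each (userId, adId) key, the warnings go to a side output, and the daily reset is driven by a timer.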
The implementation is as follows:
1) JavaBean -- BlackListWarning
@Data
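2) Main program
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Timestamp;

public class AdClickByProvinceApp {
    public static void main(String[] args) throws Exception {
        // 1. Create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 2. Read the text file and convert each line into a JavaBean
        SingleOutputStreamOperator<AdClickEvent> adClickDS = env.readTextFile("input/AdClickLog.csv")
                .map(line -> {
                    String[] fields = line.split(",");
                    return new AdClickEvent(Long.parseLong(fields[0]),
                            Long.parseLong(fields[1]),
                            fields[2],
                            fields[3],
                            Long.parseLong(fields[4]));
                }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdClickEvent>() {
                    @Override
                    public long extractAscendingTimestamp(AdClickEvent element) {
                        // The file stores seconds; Flink expects milliseconds
                        return element.getTimestamp() * 1000L;
                    }
                });
        // 3. Filter by click count (a user who clicks the same ad more than 100 times in one day is blacklisted)
        SingleOutputStreamOperator<AdClickEvent> filterByClickCount = adClickDS
                .keyBy("userId", "adId")
                .process(new AdClickKeyProcessFunc(100L));
        // 4. Group by province, open a sliding window, and count the ad clicks per province
        SingleOutputStreamOperator<AdCountByProvince> result = filterByClickCount.keyBy(data -> data.getProvince())
                .timeWindow(Time.hours(1), Time.seconds(5))
                .aggregate(new AdClickAgg(), new AdClickWindowFunc());
        // 5. Get the side output stream that carries the blacklist warnings
        DataStream<BlackListWarning> sideOutput = filterByClickCount.getSideOutput(new OutputTag<BlackListWarning>("OutputTag") {
        });
        // 6. Print
        result.print();
        sideOutput.print("sideOutput");
        // 7. Start the job
        env.execute();
    }

    public static class AdClickAgg implements AggregateFunction<AdClickEvent, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdClickEvent value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    public static class AdClickWindowFunc implements WindowFunction<Long, AdCountByProvince, String, TimeWindow> {
        @Override
        public void apply(String province, TimeWindow window, Iterable<Long> input, Collector<AdCountByProvince> out) throws Exception {
            String windowEnd = new Timestamp(window.getEnd()).toString();
            Long count = input.iterator().next();
            out.collect(new AdCountByProvince(province, windowEnd, count));
        }
    }

    public static class AdClickKeyProcessFunc extends KeyedProcessFunction<Tuple, AdClickEvent, AdClickEvent> {
        // Upper bound on clicks by one user on one ad in a single day
        private Long maxClick;

        public AdClickKeyProcessFunc() {
        }

        public AdClickKeyProcessFunc(Long maxClick) {
            this.maxClick = maxClick;
        }

        // State: number of clicks by the current user on the current ad
        private ValueState<Long> countState;
        // State: whether the current user and ad ID have already been sent to the blacklist
        private ValueState<Boolean> isBlackList;

        @Override
        public void open(Configuration parameters) throws Exception {
            countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count-state", Long.class));
            isBlackList = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-black-list", Boolean.class));
        }

        @Override
        public void processElement(AdClickEvent value, Context ctx, Collector<AdClickEvent> out) throws Exception {
            // Read the current count from state
            Long count = countState.value();
            // Check whether this is the first record for the key
            if (count == null) {
                // First record: initialize the count
                countState.update(1L);
                // Register a timer that fires at midnight each day to clear the state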