1 模块创建和数据准备
继续在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为MarketAnalysis。
这个模块中我们没有现成的数据,所以会用自定义的测试源来产生测试数据流,或者直接使用事先生成好的测试数据文件。
2 APP 市场推广统计
随着智能手机的普及,在如今的电商网站中已经有越来越多的用户来自移动端,相比起传统浏览器的登录方式,手机 APP 成为了更多用户访问电商网站的首选。对
于电商企业来说,一般会通过各种不同的渠道对自己的 APP 进行市场推广,而这些渠道的统计数据(比如,不同网站上广告链接的点击量、APP 下载量)就成了市场
营销的重要商业指标。
首先我们考察分渠道的市场推广统计。在 src/main/scala 下创建 AppMarketingByChannel.scala 文件。由于没有现成的数据,所以我们需要自定义一
个测试源来生成用户行为的事件流。
分渠道统计
package com.atguigu.market_analysis

import java.sql.Timestamp
import java.util.UUID

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

/**
 * One simulated user-behavior event on a marketing channel.
 *
 * NOTE(review): the field name `channnel` (three n's) is a typo, but it is part of the
 * public interface of this case class, so it is kept unchanged here.
 *
 * @param userId    random UUID string identifying the user
 * @param behavior  one of the values in [[SimulateMarketEventSource.behaviorSet]]
 * @param channnel  one of the values in [[SimulateMarketEventSource.channelSet]]
 * @param timestamp event time in epoch milliseconds
 */
case class MarketUserBehavior(userId: String, behavior: String, channnel: String, timestamp: Long)

/**
 * Output record: number of events for one (channel, behavior) pair in one window.
 *
 * @param windowStart window start, formatted by `java.sql.Timestamp.toString`
 * @param windowEnd   window end, formatted by `java.sql.Timestamp.toString`
 */
case class MarketCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)

/**
 * Custom test source that emits one random [[MarketUserBehavior]] every 50 ms
 * until it is cancelled.
 */
class SimulateMarketEventSource() extends RichParallelSourceFunction[MarketUserBehavior] {

  // Running flag. cancel() is invoked from a different thread than run(), so the flag
  // must be volatile for the loop in run() to observe the update.
  @volatile var running: Boolean = true

  // Candidate user behaviors and marketing channels.
  val behaviorSet: Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
  val channelSet: Seq[String] = Seq("appstore", "huaweiStore", "weibo", "wechat")

  // Random number generator.
  val rand: Random = Random

  override def cancel(): Unit = running = false

  override def run(sourceContext: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
    // Upper bound on the number of emitted events; lower it to limit the test data volume.
    val maxCounts = Long.MaxValue
    var count = 0L

    // Keep generating random events until cancelled or the bound is reached.
    while (running && count < maxCounts) {
      val id = UUID.randomUUID().toString
      val behavior = behaviorSet(rand.nextInt(behaviorSet.size))
      val channel = channelSet(rand.nextInt(channelSet.size))
      val ts = System.currentTimeMillis()

      sourceContext.collect(MarketUserBehavior(id, behavior, channel, ts))
      count += 1
      Thread.sleep(50L)
    }
  }
}

/** Job: count marketing events per (channel, behavior) in one-hour event-time windows. */
object AppMarketingByChannel {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[MarketUserBehavior] = env
      .addSource(new SimulateMarketEventSource())
      .assignAscendingTimestamps(_.timestamp)

    val resultStream: DataStream[MarketCount] = dataStream
      .filter(_.behavior != "UNINSTALL")
      .keyBy(data => (data.channnel, data.behavior))
      .timeWindow(Time.hours(1))
      .process(new MarketCountByChannel())

    resultStream.print()
    env.execute("makrket count by channel job")
  }
}

/**
 * Window function that emits one [[MarketCount]] per key and window, counting all
 * elements buffered for that window.
 */
class MarketCountByChannel()
  extends ProcessWindowFunction[MarketUserBehavior, MarketCount, (String, String), TimeWindow] {

  override def process(key: (String, String),
                       context: Context,
                       elements: Iterable[MarketUserBehavior],
                       out: Collector[MarketCount]): Unit = {
    // java.sql.Timestamp replaces the JDK-internal com.sun.jmx.snmp.Timestamp, which is
    // not available on current JDKs; the window bounds are epoch milliseconds.
    val windowStart: String = new Timestamp(context.window.getStart).toString
    val windowEnd: String = new Timestamp(context.window.getEnd).toString
    val (channel, behavior) = key
    val count: Long = elements.size.toLong

    out.collect(MarketCount(windowStart, windowEnd, channel, behavior, count))
  }
}
不分渠道(总量)统计
package com.atguigu.market_analysis

import java.sql.Timestamp
import java.util.UUID

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

/**
 * One simulated user-behavior event on a marketing channel.
 *
 * NOTE(review): the field name `channnel` (three n's) is a typo, but it is part of the
 * public interface of this case class, so it is kept unchanged here.
 *
 * @param timestamp event time in epoch milliseconds
 */
case class MarketUserBehavior(userId: String, behavior: String, channnel: String, timestamp: Long)

/**
 * Output record: number of events in one window.
 *
 * @param windowStart window start, formatted by `java.sql.Timestamp.toString`
 * @param windowEnd   window end, formatted by `java.sql.Timestamp.toString`
 */
case class MarketCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)

/**
 * Custom test source that emits one random [[MarketUserBehavior]] every 50 ms
 * until it is cancelled.
 */
class SimulateMarketEventSource() extends RichParallelSourceFunction[MarketUserBehavior] {

  // Running flag. cancel() is invoked from a different thread than run(), so the flag
  // must be volatile for the loop in run() to observe the update.
  @volatile var running: Boolean = true

  // Candidate user behaviors and marketing channels.
  val behaviorSet: Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
  val channelSet: Seq[String] = Seq("appstore", "huaweiStore", "weibo", "wechat")

  // Random number generator.
  val rand: Random = Random

  override def cancel(): Unit = running = false

  override def run(sourceContext: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
    // Upper bound on the number of emitted events; lower it to limit the test data volume.
    val maxCounts = Long.MaxValue
    var count = 0L

    // Keep generating random events until cancelled or the bound is reached.
    while (running && count < maxCounts) {
      val id = UUID.randomUUID().toString
      val behavior = behaviorSet(rand.nextInt(behaviorSet.size))
      val channel = channelSet(rand.nextInt(channelSet.size))
      val ts = System.currentTimeMillis()

      sourceContext.collect(MarketUserBehavior(id, behavior, channel, ts))
      count += 1
      Thread.sleep(50L)
    }
  }
}

/**
 * Job: count all non-UNINSTALL marketing events, regardless of channel, in one-hour
 * event-time windows sliding every 5 seconds.
 */
object AppMarketingTotal {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[MarketUserBehavior] = env
      .addSource(new SimulateMarketEventSource())
      .assignAscendingTimestamps(_.timestamp)

    val resultStream: DataStream[MarketCount] = dataStream
      .filter(_.behavior != "UNINSTALL")
      // Every event is mapped to the same constant key so that all of them land in one group.
      .map(_ => ("total", 1L))
      .keyBy(_._1)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .aggregate(new MarketCountAgg(), new MarketCountResult())

    resultStream.print()
    env.execute("makrket total count job")
  }
}

/** Incremental aggregate that counts elements: the accumulator is the running count. */
class MarketCountAgg() extends AggregateFunction[(String, Long), Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(in: (String, Long), acc: Long): Long = acc + 1

  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

/** Wraps the pre-aggregated count of a window into a [[MarketCount]] record. */
class MarketCountResult() extends WindowFunction[Long, MarketCount, String, TimeWindow] {
  override def apply(key: String,
                     window: TimeWindow,
                     input: Iterable[Long],
                     out: Collector[MarketCount]): Unit = {
    // java.sql.Timestamp replaces the JDK-internal com.sun.jmx.snmp.Timestamp, which is
    // not available on current JDKs; the window bounds are epoch milliseconds.
    val windowStart: String = new Timestamp(window.getStart).toString
    val windowEnd: String = new Timestamp(window.getEnd).toString
    // The first element of `input` is the result produced by MarketCountAgg.
    val count: Long = input.head

    out.collect(MarketCount(windowStart, windowEnd, "total", "total", count))
  }
}
3 页面广告分析
电商网站的市场营销商业指标中,除了自身的 APP 推广,还会考虑到页面上的广告投放(包括自己经营的产品和其它网站的广告)。所以广告相关的统计分析,
也是市场营销的重要指标。
对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信
息。更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。
3.1 页面广告点击量统计
接下来我们就进行页面广告按照省份划分的点击量的统计。在 src/main/scala 下创建 AdStatisticsByGeo.scala 文件。同样由于没有现成的数据,我们定义一些测试数
据,放在 AdClickLog.csv 中,用来生成用户点击广告行为的事件流。
在代码中我们首先定义源数据的样例类 AdClickEvent,以及输出统计数据的样例类 AdCountByProvince。主函数中先以 province 进行 keyBy,然后开一小时的时间窗口,
滑动距离为 5 秒,统计窗口内的点击事件数量。具体代码实现如下:
package com.atguigu.market_analysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Input record: one ad click parsed from a CSV line `userId,adId,province,city,timestamp`.
 *
 * @param timestamp event time; the job multiplies it by 1000 before using it as epoch
 *                  milliseconds, so it is treated as epoch seconds
 */
case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

/**
 * Output record: click count of one province in one window.
 *
 * @param windowEnd window end, formatted by `java.sql.Timestamp.toString`
 */
case class AdCountByProvince(province: String, windowEnd: String, count: Long)

/**
 * Job: count ad clicks per province in one-hour event-time windows sliding every 5 seconds.
 */
object AdAnalysisByProvince {

  // Classpath location of the test data (src/main/resources/AdClickLog.csv). The leading
  // slash makes the lookup relative to the classpath root instead of this class's package.
  private val InputResource = "/AdClickLog.csv"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Resolve the input from the classpath rather than a machine-specific absolute path.
    // (The former hard-coded Windows path also contained invalid escape sequences such as
    // "\U" and therefore did not compile.)
    val resource = Option(getClass.getResource(InputResource)).getOrElse(
      throw new IllegalStateException(s"Resource $InputResource not found on the classpath"))

    val adLogStream: DataStream[AdClickEvent] = env
      .readTextFile(resource.getPath)
      .map { data =>
        val dataArray = data.split(",")
        AdClickEvent(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
      }
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Windowed aggregation per province.
    val adCountStream: DataStream[AdCountByProvince] = adLogStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .aggregate(new AdCountAgg(), new AdCountResult())

    adCountStream.print()
    env.execute("ad analysis job")
  }
}

/** Incremental aggregate that counts click events: the accumulator is the running count. */
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(in: AdClickEvent, acc: Long): Long = acc + 1

  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

/** Wraps the pre-aggregated count of a window into an [[AdCountByProvince]] record. */
class AdCountResult() extends WindowFunction[Long, AdCountByProvince, String, TimeWindow] {
  override def apply(key: String,
                     window: TimeWindow,
                     input: Iterable[Long],
                     out: Collector[AdCountByProvince]): Unit = {
    // java.sql.Timestamp replaces the JDK-internal com.sun.jmx.snmp.Timestamp, which is
    // not available on current JDKs; the window end is epoch milliseconds.
    val windowEnd = new Timestamp(window.getEnd).toString
    // The first element of `input` is the result produced by AdCountAgg.
    val count = input.head

    out.collect(AdCountByProvince(key, windowEnd, count))
  }
}
3.2 黑名单过滤
上节我们进行的点击量统计,同一用户的重复点击是会叠加计算的。在实际场景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣;
但是如果用户在一段时间非常频繁地点击广告,这显然不是一个正常行为,有刷点击量的嫌疑。所以我们可以对一段时间内(比如一天内)的用户点击行为进行约束,
如果对同一个广告点击超过一定限额(比如 100 次),应该把该用户加入黑名单并报警,此后其点击行为不应该再统计。
package com.atguigu.market_analysis

import java.sql.Timestamp

import com.atguigu.market_analysis.AdAnalysisByProvince.getClass
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Input record: one ad click parsed from a CSV line `userId,adId,province,city,timestamp`.
 *
 * @param timestamp event time; the job multiplies it by 1000 before using it as epoch
 *                  milliseconds, so it is treated as epoch seconds
 */
case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

/**
 * Output record: click count of one province in one window.
 *
 * @param windowEnd window end, formatted by `java.sql.Timestamp.toString`
 */
case class AdCountByProvince(province: String, windowEnd: String, count: Long)

/** Side-output record: warning that a user exceeded the click limit on an ad. */
case class BlackListWarning(userId: Long, adId: Long, msg: String)

/**
 * Job: same per-province click statistics as the unfiltered version, but clicks from a
 * (user, ad) pair beyond the limit are dropped and reported once on a side output.
 */
object AdAnalysisByProvinceFilter {

  // Classpath location of the test data (src/main/resources/AdClickLog.csv). The leading
  // slash makes the lookup relative to the classpath root instead of a class's package.
  private val InputResource = "/AdClickLog.csv"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Resolve the input from the classpath rather than a machine-specific absolute path.
    // (The former hard-coded Windows path also contained invalid escape sequences such as
    // "\U" and therefore did not compile.)
    val resource = Option(getClass.getResource(InputResource)).getOrElse(
      throw new IllegalStateException(s"Resource $InputResource not found on the classpath"))

    val adLogStream: DataStream[AdClickEvent] = env
      .readTextFile(resource.getPath)
      .map { data =>
        val dataArray = data.split(",")
        AdClickEvent(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
      }
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Filter out click-fraud behavior per (user, ad) pair.
    val filterBlackListStream: DataStream[AdClickEvent] = adLogStream
      .keyBy(data => (data.userId, data.adId))
      .process(new FilterBlackListt(100))

    // Windowed aggregation per province on the filtered stream.
    val adCountStream: DataStream[AdCountByProvince] = filterBlackListStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .aggregate(new AdCountAgg(), new AdCountResult())

    adCountStream.print()
    filterBlackListStream.getSideOutput(FilterBlackListt.BlackListTag).print("blacklist")

    env.execute("ad analysis job")
  }
}

/** Incremental aggregate that counts click events: the accumulator is the running count. */
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(in: AdClickEvent, acc: Long): Long = acc + 1

  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

/** Wraps the pre-aggregated count of a window into an [[AdCountByProvince]] record. */
class AdCountResult() extends WindowFunction[Long, AdCountByProvince, String, TimeWindow] {
  override def apply(key: String,
                     window: TimeWindow,
                     input: Iterable[Long],
                     out: Collector[AdCountByProvince]): Unit = {
    // java.sql.Timestamp replaces the JDK-internal com.sun.jmx.snmp.Timestamp, which is
    // not available on current JDKs; the window end is epoch milliseconds.
    val windowEnd = new Timestamp(window.getEnd).toString
    // The first element of `input` is the result produced by AdCountAgg.
    val count = input.head

    out.collect(AdCountByProvince(key, windowEnd, count))
  }
}

/** Constants shared between [[FilterBlackListt]] and the job that reads its side output. */
object FilterBlackListt {
  // Single definition of the side-output tag, so the producer and the consumer cannot
  // drift apart and no tag object is allocated per element.
  val BlackListTag: OutputTag[BlackListWarning] = new OutputTag[BlackListWarning]("blacklist")

  private val MillisPerDay: Long = 24L * 60 * 60 * 1000
}

/**
 * Forwards click events of a (userId, adId) key until `maxClickCount` events have been
 * forwarded; later events are dropped, and a single [[BlackListWarning]] is emitted on the
 * "blacklist" side output. State is cleared by a daily timer.
 *
 * @param maxClickCount number of clicks per key that are let through before blocking
 */
class FilterBlackListt(maxClickCount: Long)
  extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent] {

  import FilterBlackListt.{BlackListTag, MillisPerDay}

  // Number of clicks forwarded so far for the current key.
  lazy val countState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))

  // Whether the blacklist warning has already been emitted for the current key.
  lazy val isSentState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-sent", classOf[Boolean]))

  override def processElement(value: AdClickEvent,
                              context: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context,
                              collector: Collector[AdClickEvent]): Unit = {
    val curCount = countState.value()

    // First event for this key: register a timer at the next day boundary to reset state.
    // NOTE(review): the boundary is computed from processing time in epoch milliseconds,
    // i.e. midnight UTC of the wall clock, while the rest of the job runs on event time —
    // confirm this is the intended reset semantics.
    if (curCount == 0) {
      val nextDayStart = (context.timerService().currentProcessingTime() / MillisPerDay + 1) * MillisPerDay
      context.timerService().registerProcessingTimeTimer(nextDayStart)
    }

    if (curCount >= maxClickCount) {
      // Limit reached: drop the event and warn only once per key.
      if (!isSentState.value()) {
        context.output(
          BlackListTag,
          BlackListWarning(value.userId, value.adId, s"click over $maxClickCount times today"))
        isSentState.update(true)
      }
    } else {
      countState.update(curCount + 1)
      collector.collect(value)
    }
  }

  /** Daily reset: clears the click count and the warning flag of the current key. */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext,
                       out: Collector[AdClickEvent]): Unit = {
    countState.clear()
    isSentState.clear()
  }
}