zoukankan      html  css  js  c++  java
  • flink(五) 电商用户行为分析(五)市场营销商业指标统计分析之市场推广统计、广告点击量统计、 黑名单过滤

    1 模块创建和数据准备

    继续在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为MarketAnalysis。
    这个模块中我们没有现成的数据,所以会用自定义的测试源来产生测试数据流,或者直接使用事先生成好的测试数据文件。

    2 APP 市场推广统计

      随着智能手机的普及,在如今的电商网站中已经有越来越多的用户来自移动端,相比起传统浏览器的登录方式,手机 APP 成为了更多用户访问电商网站的首选。对
    于电商企业来说,一般会通过各种不同的渠道对自己的 APP 进行市场推广,而这些渠道的统计数据(比如,不同网站上广告链接的点击量、APP 下载量)就成了市场
    营销的重要商业指标。
      首先我们考察分渠道的市场推广统计。在 src/main/scala 下创建 AppMarketingByChannel.scala 文件。由于没有现成的数据,所以我们需要自定义一
    个测试源来生成用户行为的事件流。
    分渠道统计
    package com.atguigu.market_analysis
    
    import java.util.UUID
    
    import com.sun.jmx.snmp.Timestamp
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import scala.util.Random
    
    // Input record: one simulated marketing event.
    //   userId    - identifier of the user that produced the event
    //   behavior  - behavior label (the job below filters on the value "UNINSTALL")
    //   channnel  - promotion channel; the spelling (three n's) is part of the public field name
    //   timestamp - event time, used as the ascending event-time timestamp by the job
    case class MarketUserBehavior(userId:String, behavior: String, channnel:String, timestamp:Long)
    
    // Output record of the windowed statistics: the window bounds rendered as strings,
    // the channel and behavior the count belongs to, and the number of events counted.
    case class MarketCount( windowStart:String, windowEnd:String, channel:String, behavior:String, count:Long)
    
    /**
      * Simulated test source that emits one random [[MarketUserBehavior]] about every
      * 50 ms until it is cancelled.
      */
    class SimulateMarketEventSource() extends RichParallelSourceFunction[MarketUserBehavior]{
      // Run flag. cancel() is invoked from a different thread than the one executing run(),
      // so the flag is volatile to guarantee the emitting loop observes the update.
      @volatile var running: Boolean = true

      // Candidate user behaviors and promotion channels to sample from.
      val behaviorSet:Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
      val channelSet: Seq[String] = Seq("appstore", "huaweiStore", "weibo", "wechat")

      // Random number generator used to pick a behavior and a channel.
      val rand: Random = Random

      /** Asks the emitting loop in run() to stop. */
      override def cancel(): Unit = running = false

      /**
        * Emits random events, stamped with the current wall-clock time, until the source
        * is cancelled or the maximum record count is reached.
        *
        * @param sourceContext context used to emit the generated records
        */
      override def run(sourceContext: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
        // Upper bound on the number of emitted records; effectively unlimited here.
        val maxCounts = Long.MaxValue
        var count = 0L

        while (running && count < maxCounts) {
          val event = MarketUserBehavior(
            UUID.randomUUID().toString,
            behaviorSet(rand.nextInt(behaviorSet.size)),
            channelSet(rand.nextInt(channelSet.size)),
            System.currentTimeMillis()
          )
          sourceContext.collect(event)

          count += 1
          // Throttle the stream: pause 50 ms between two records.
          Thread.sleep(50L)
        }
      }
    }
    
    /**
      * Job entry point: counts marketing events per (channel, behavior) pair in
      * one-hour event-time windows and prints the results.
      */
    object AppMarketingByChannel {
      def main(args: Array[String]): Unit = {
        // Single-parallelism environment running on event time.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Simulated events; their own timestamp field serves as the ascending event time.
        val events: DataStream[MarketUserBehavior] = env
          .addSource(new SimulateMarketEventSource())
          .assignAscendingTimestamps(event => event.timestamp)

        // Uninstall events are excluded from the statistics.
        val relevantEvents = events.filter(event => event.behavior != "UNINSTALL")

        val countsPerChannel: DataStream[MarketCount] = relevantEvents
          .keyBy(event => (event.channnel, event.behavior))
          .timeWindow(Time.hours(1))
          .process(new MarketCountByChannel())

        countsPerChannel.print()
        env.execute("makrket count by channel job")
      }
    }
    
    /**
      * Window function that emits one [[MarketCount]] per (channel, behavior) key and window,
      * counting all elements buffered for that window.
      */
    class MarketCountByChannel() extends ProcessWindowFunction[MarketUserBehavior, MarketCount, (String, String), TimeWindow]{
      /**
        * @param key      the (channel, behavior) pair the window belongs to
        * @param context  gives access to the window whose bounds are reported
        * @param elements all events collected for this key and window
        * @param out      collector receiving the single result record
        */
      override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketCount]): Unit = {
        // java.sql.Timestamp is referenced by its full name on purpose: the simple name
        // `Timestamp` in this file is bound to the JDK-internal com.sun.jmx.snmp.Timestamp,
        // which is not a date-time formatter and is not available on newer JDKs.
        val windowStart: String = new java.sql.Timestamp(context.window.getStart).toString
        val windowEnd: String = new java.sql.Timestamp(context.window.getEnd).toString
        val (channel, behavior) = key
        val count: Long = elements.size
        out.collect(MarketCount(windowStart, windowEnd, channel, behavior, count))
      }
    }
    不分渠道(总量)统计
    package com.atguigu.market_analysis
    
    import java.util.UUID
    
    import com.sun.jmx.snmp.Timestamp
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source._
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import scala.util.Random
    
    // Input record: one simulated marketing event (user id, behavior label, promotion
    // channel and event timestamp). The spelling `channnel` is part of the public field name.
    case class MarketUserBehavior(userId:String, behavior: String, channnel:String, timestamp:Long)
    // Output record of the windowed statistics: window bounds rendered as strings,
    // the channel and behavior labels, and the number of events counted.
    case class MarketCount( windowStart:String, windowEnd:String, channel:String, behavior:String, count:Long)
    
    /**
      * Simulated test source that emits one random [[MarketUserBehavior]] about every
      * 50 ms until it is cancelled.
      */
    class SimulateMarketEventSource() extends RichParallelSourceFunction[MarketUserBehavior]{
      // Run flag. cancel() is invoked from a different thread than the one executing run(),
      // so the flag is volatile to guarantee the emitting loop observes the update.
      @volatile var running: Boolean = true

      // Candidate user behaviors and promotion channels to sample from.
      val behaviorSet:Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
      val channelSet: Seq[String] = Seq("appstore", "huaweiStore", "weibo", "wechat")

      // Random number generator used to pick a behavior and a channel.
      val rand: Random = Random

      /** Asks the emitting loop in run() to stop. */
      override def cancel(): Unit = running = false

      /**
        * Emits random events, stamped with the current wall-clock time, until the source
        * is cancelled or the maximum record count is reached.
        *
        * @param sourceContext context used to emit the generated records
        */
      override def run(sourceContext: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
        // Upper bound on the number of emitted records; effectively unlimited here.
        val maxCounts = Long.MaxValue
        var count = 0L

        while (running && count < maxCounts) {
          val event = MarketUserBehavior(
            UUID.randomUUID().toString,
            behaviorSet(rand.nextInt(behaviorSet.size)),
            channelSet(rand.nextInt(channelSet.size)),
            System.currentTimeMillis()
          )
          sourceContext.collect(event)

          count += 1
          // Throttle the stream: pause 50 ms between two records.
          Thread.sleep(50L)
        }
      }
    }
    
    /**
      * Job entry point: counts all non-uninstall marketing events, regardless of channel,
      * in one-hour windows sliding every five seconds, and prints the results.
      */
    object AppMarketingTotal {
      def main(args: Array[String]): Unit = {
        // Single-parallelism environment running on event time.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Simulated events; their own timestamp field serves as the ascending event time.
        val events: DataStream[MarketUserBehavior] = env
          .addSource(new SimulateMarketEventSource())
          .assignAscendingTimestamps(event => event.timestamp)

        // Every remaining event is mapped onto the same constant key so that a single
        // keyed window aggregates the overall total.
        val keyedOnes = events
          .filter(event => event.behavior != "UNINSTALL")
          .map( data => ("total", 1L))
          .keyBy(pair => pair._1)

        val totals: DataStream[MarketCount] = keyedOnes
          .timeWindow(Time.hours(1), Time.seconds(5))
          .aggregate(new MarketCountAgg(), new MarketCountResult())

        totals.print()
        env.execute("makrket total count job")
      }
    }
    
    /** Incremental aggregate that counts the records falling into a window. */
    class MarketCountAgg() extends AggregateFunction[(String, Long), Long, Long]{
      // Counting starts from zero.
      override def createAccumulator(): Long = {
        0L
      }

      // Every incoming record contributes exactly one, whatever its content.
      override def add(in: (String, Long), acc: Long): Long = {
        acc + 1
      }

      // Two partial counts combine by addition.
      override def merge(acc: Long, acc1: Long): Long = {
        acc + acc1
      }

      // The accumulator already is the final count.
      override def getResult(acc: Long): Long = {
        acc
      }
    }
    
    /**
      * Wraps the pre-aggregated count of a window into a [[MarketCount]] labelled "total"
      * for both channel and behavior.
      */
    class MarketCountResult() extends WindowFunction[Long, MarketCount, String, TimeWindow]{
      /**
        * @param key    key of the window (not used for the output labels)
        * @param window window whose bounds are reported
        * @param input  aggregated values for the window; the first one is taken as the count
        * @param out    collector receiving the single result record
        */
      override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketCount]): Unit = {
        // java.sql.Timestamp is referenced by its full name on purpose: the simple name
        // `Timestamp` in this file is bound to the JDK-internal com.sun.jmx.snmp.Timestamp,
        // which is not a date-time formatter and is not available on newer JDKs.
        val windowStart: String = new java.sql.Timestamp(window.getStart).toString
        val windowEnd: String = new java.sql.Timestamp(window.getEnd).toString
        val count: Long = input.head
        out.collect(MarketCount(windowStart, windowEnd, "total", "total", count))
      }
    }

    3 页面广告分析

      电商网站的市场营销商业指标中,除了自身的 APP 推广,还会考虑到页面上的广告投放(包括自己经营的产品和其它网站的广告)。所以广告相关的统计分析,
    也是市场营销的重要指标。
      对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信
    息。更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。

    3.1 页面广告点击量统计

      接下来我们就进行页面广告按照省份划分的点击量的统计。在 src/main/scala 下创建 AdStatisticsByGeo.scala 文件。同样由于没有现成的数据,我们定义一些测试数
    据,放在 AdClickLog.csv 中,用来生成用户点击广告行为的事件流。
      在代码中我们首先定义源数据的样例类 AdClickEvent,以及输出统计数据的样例类 AdCountByProvince。主函数中先以 province 进行 keyBy,然后开一小时的时间窗口,
    滑动距离为 5 秒,统计窗口内的点击事件数量。具体代码实现如下:
    package com.atguigu.market_analysis
    
    
    
    import com.sun.jmx.snmp.Timestamp
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    // Input record: one ad click parsed from a comma-separated log line
    // (user id, ad id, province, city, timestamp).
    // NOTE(review): the job multiplies timestamp by 1000 before using it as event time,
    // so it is presumably in seconds - confirm against the data file.
    case class AdClickEvent(userId:Long, adId:Long, province:String, city: String, timestamp:Long )
    
    // Output record: click count for one province in the window ending at windowEnd.
    case class AdCountByProvince(province:String, windowEnd:String, count:Long)
    
    /**
      * Job entry point: reads ad-click log lines, counts clicks per province in one-hour
      * windows sliding every five seconds, and prints the results.
      */
    object AdAnalysisByProvince {
      /**
        * @param args optional; when present, the first argument is the path of the click log.
        *             Otherwise AdClickLog.csv is looked up at the root of the classpath.
        */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Resolve the input instead of hard-coding a machine-specific Windows path: the former
        // literal contained unescaped backslashes (invalid escape sequences such as \U) and
        // therefore did not compile. The leading "/" makes the lookup relative to the
        // classpath root rather than to this object's package.
        val inputPath: String = args.headOption.getOrElse {
          Option(getClass.getResource("/AdClickLog.csv"))
            .map(_.getPath)
            .getOrElse(throw new IllegalStateException(
              "AdClickLog.csv not found on the classpath; pass its path as the first program argument"))
        }

        val adLogStream:DataStream[AdClickEvent] = env.readTextFile(inputPath)
          .map(data => {
            // Expected line layout: userId,adId,province,city,timestamp
            val dataArray = data.split(",")
            AdClickEvent(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
          })
          // Event time is expected in milliseconds; the log value is scaled by 1000
          // (presumably seconds in the file - confirm against the data).
          .assignAscendingTimestamps(_.timestamp*1000L)

        // Windowed count per province.
        val adCountStream:DataStream[AdCountByProvince] = adLogStream
          .keyBy(_.province)
          .timeWindow(Time.hours(1),Time.seconds(5))
          .aggregate(new  AdCountAgg(), new AdCountResult())

        adCountStream.print()
        env.execute("ad analysis job")
      }
    }
    
    /** Incremental aggregate that counts the ad-click events falling into a window. */
    class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long]{
      // Counting starts from zero.
      override def createAccumulator(): Long = {
        0L
      }

      // Every click contributes exactly one, whatever its content.
      override def add(in: AdClickEvent, acc: Long): Long = {
        acc + 1
      }

      // Two partial counts combine by addition.
      override def merge(acc: Long, acc1: Long): Long = {
        acc + acc1
      }

      // The accumulator already is the final count.
      override def getResult(acc: Long): Long = {
        acc
      }
    }
    
    /** Wraps the pre-aggregated click count of a window into an [[AdCountByProvince]]. */
    class AdCountResult() extends WindowFunction[Long, AdCountByProvince, String, TimeWindow]{
      /**
        * @param key    province the window belongs to
        * @param window window whose end is reported
        * @param input  aggregated values for the window; the first one is taken as the count
        * @param out    collector receiving the single result record
        */
      override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdCountByProvince]): Unit = {
        // java.sql.Timestamp is referenced by its full name on purpose: the simple name
        // `Timestamp` in this file is bound to the JDK-internal com.sun.jmx.snmp.Timestamp,
        // which is not a date-time formatter and is not available on newer JDKs.
        val windowEnd = new java.sql.Timestamp(window.getEnd).toString
        out.collect( AdCountByProvince(key, windowEnd, input.head))
      }
    }
    3.2 黑名单过滤
     
      上节我们进行的点击量统计,同一用户的重复点击是会叠加计算的。在实际场景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣;
    但是如果用户在一段时间非常频繁地点击广告,这显然不是一个正常行为,有刷点击量的嫌疑。所以我们可以对一段时间内(比如一天内)的用户点击行为进行约束,
    如果对同一个广告点击超过一定限额(比如 100 次),应该把该用户加入黑名单并报警,此后其点击行为不应该再统计。
    package com.atguigu.market_analysis
    
    import com.atguigu.market_analysis.AdAnalysisByProvince.getClass
    import com.sun.jmx.snmp.Timestamp
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    // Input record: one ad click parsed from a comma-separated log line
    // (user id, ad id, province, city, timestamp).
    case class AdClickEvent(userId:Long, adId:Long, province:String, city: String, timestamp:Long )
    // Output record: click count for one province in the window ending at windowEnd.
    case class AdCountByProvince(province:String, windowEnd:String, count:Long)
    
    // Side-output record: warning emitted when a user exceeds the click limit for an ad.
    case class BlackListWarning( userId:Long, adId:Long, msg:String )
    
    
    /**
      * Job entry point: reads ad-click log lines, drops clicks of (user, ad) pairs that
      * exceeded the click limit, counts the remaining clicks per province in one-hour
      * windows sliding every five seconds, and prints both the counts and the warnings.
      */
    object AdAnalysisByProvinceFilter {
      /**
        * @param args optional; when present, the first argument is the path of the click log.
        *             Otherwise AdClickLog.csv is looked up at the root of the classpath.
        */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Resolve the input instead of hard-coding a machine-specific Windows path: the former
        // literal contained unescaped backslashes (invalid escape sequences such as \U) and
        // therefore did not compile. The leading "/" makes the lookup relative to the
        // classpath root rather than to this object's package.
        val inputPath: String = args.headOption.getOrElse {
          Option(getClass.getResource("/AdClickLog.csv"))
            .map(_.getPath)
            .getOrElse(throw new IllegalStateException(
              "AdClickLog.csv not found on the classpath; pass its path as the first program argument"))
        }

        val adLogStream:DataStream[AdClickEvent] = env.readTextFile(inputPath)
          .map(data => {
            // Expected line layout: userId,adId,province,city,timestamp
            val dataArray = data.split(",")
            AdClickEvent(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
          })
          // Event time is expected in milliseconds; the log value is scaled by 1000
          // (presumably seconds in the file - confirm against the data).
          .assignAscendingTimestamps(_.timestamp*1000L)

        // Filter out click-fraud behavior, tracked per (user, ad) pair with a limit of 100.
        val filterBlackListStream: DataStream[AdClickEvent] = adLogStream
          .keyBy(data => (data.userId, data.adId))
          .process(new FilterBlackListt(100))

        // Windowed count per province on the filtered stream.
        val adCountStream:DataStream[AdCountByProvince] = filterBlackListStream
          .keyBy(_.province)
          .timeWindow(Time.hours(1),Time.seconds(5))
          .aggregate(new  AdCountAgg(), new AdCountResult())

        adCountStream.print()
        // Warnings travel on the side output tagged "blacklist".
        filterBlackListStream.getSideOutput(new OutputTag[BlackListWarning]("blacklist")).print("blacklist")
        env.execute("ad analysis job")
      }
    }
    
    
    /** Incremental aggregate that counts the ad-click events falling into a window. */
    class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long]{
      // Counting starts from zero.
      override def createAccumulator(): Long = {
        0L
      }

      // Every click contributes exactly one, whatever its content.
      override def add(in: AdClickEvent, acc: Long): Long = {
        acc + 1
      }

      // Two partial counts combine by addition.
      override def merge(acc: Long, acc1: Long): Long = {
        acc + acc1
      }

      // The accumulator already is the final count.
      override def getResult(acc: Long): Long = {
        acc
      }
    }
    
    /** Wraps the pre-aggregated click count of a window into an [[AdCountByProvince]]. */
    class AdCountResult() extends WindowFunction[Long, AdCountByProvince, String, TimeWindow]{
      /**
        * @param key    province the window belongs to
        * @param window window whose end is reported
        * @param input  aggregated values for the window; the first one is taken as the count
        * @param out    collector receiving the single result record
        */
      override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdCountByProvince]): Unit = {
        // java.sql.Timestamp is referenced by its full name on purpose: the simple name
        // `Timestamp` in this file is bound to the JDK-internal com.sun.jmx.snmp.Timestamp,
        // which is not a date-time formatter and is not available on newer JDKs.
        val windowEnd = new java.sql.Timestamp(window.getEnd).toString
        out.collect( AdCountByProvince(key, windowEnd, input.head))
      }
    }
    
    /**
      * Click filter keyed by (userId, adId): forwards clicks while the stored count is below
      * `maxClickCount`; once the limit is reached, further clicks are dropped and a single
      * [[BlackListWarning]] is emitted to the "blacklist" side output.
      */
    class FilterBlackListt(maxClickCount: Long) extends  KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]{
      // Number of clicks forwarded so far for the current (user, ad) key.
      lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
      // Flag recording whether the warning was already emitted for the current key.
      lazy val isSentState:ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-sent", classOf[Boolean]))

      override def processElement(value: AdClickEvent, context: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, collector: Collector[AdClickEvent]): Unit = {
        val clicksSoFar = countState.value()

        // On the first click for this key, schedule a processing-time timer at the next
        // whole-day boundary (next multiple of 24h since the epoch) to reset the state.
        if (clicksSoFar == 0) {
          val millisPerDay = 1000 * 60 * 60 * 24
          val nextDayStart = (context.timerService().currentProcessingTime() / millisPerDay + 1) * millisPerDay
          context.timerService().registerProcessingTimeTimer(nextDayStart)
        }

        if (clicksSoFar < maxClickCount) {
          // Still under the limit: count the click and pass it downstream.
          countState.update(clicksSoFar + 1)
          collector.collect(value)
        } else if (!isSentState.value()) {
          // Limit reached and no warning sent yet: warn once; the click itself is dropped.
          val warning = BlackListWarning(value.userId, value.adId, s"click over $maxClickCount times today")
          context.output(new OutputTag[BlackListWarning]("blacklist"), warning)
          isSentState.update(true)
        }
        // Otherwise the limit was reached and the warning already sent: drop silently.
      }

      // Timer callback: forget the count and the warning flag for the key.
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
        isSentState.clear()
        countState.clear()
      }
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13491886.html

  • 相关阅读:
    【BZOJ-4422】Cow Confinement 线段树 + 扫描线 + 差分 (优化DP)
    【BZOJ-2521】最小生成树 最小割
    mtools使用-1
    关于nodejs DeprecationWarning: current URL string parser is deprecated, and will be removed in a future version. To use the new parser, pass option { useNewUrlParser: true } to MongoClient.connect.
    学习RUNOOB.COM进度二
    学习RUNOOB.COM进度一
    深入了解jQuery Mobile-3装载器
    深入了解jQuery Mobile-1
    mongodb的学习之旅一
    支付回调内容
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13491886.html
Copyright © 2011-2022 走看看