zoukankan      html  css  js  c++  java
  • Flink之广告点击黑名单统计

    1、数据格式(每行依次为:用户ID,广告ID,省份,城市,点击时间戳(单位:秒))

    543462,1715,北京,北京,1511658000
    662867,2244074,广东,广州,1511658060
    561558,3611281,广东,深圳,1511658120
    894923,1715,北京,北京,1511658180
    834377,2244074,上海,上海,1511658240
    625915,3611281,广东,珠海,1511658300
    578814,1715,广东,深圳,1511658330
    873335,1256540,上海,上海,1511658540
    429984,2244074,广东,深圳,1511658600
    937166,1715,北京,北京,1511661601
    937166,1715,北京,北京,1511661602
    937166,1715,北京,北京,1511661603
    937166,1715,北京,北京,1511661604
    937166,1715,北京,北京,1511661605
    937166,1715,北京,北京,1511661606
    937166,1715,北京,北京,1511661607
    937166,1715,北京,北京,1511661608
    161501,36156,江苏,南京,1511661608
    937166,1715,北京,北京,1511661609
    937166,1715,北京,北京,1511661610
    937166,1715,北京,北京,1511661611
    937166,1715,北京,北京,1511661612
    937166,1715,北京,北京,1511661613
    937166,1715,北京,北京,1511661614
    937166,1715,北京,北京,1511661615
    937166,1715,北京,北京,1511661616
    937166,1715,北京,北京,1511661617
    937166,1715,北京,北京,1511661618
    2315,36156,zhejiang,杭州,1511661618
    10236,5614,henan,郑州,1511661619
    937166,1715,北京,北京,1511661619
    937166,1715,北京,北京,1511661620
    937166,1715,北京,北京,1511661621
    937166,1715,北京,北京,1511661622
    937166,1715,北京,北京,1511661623
    937166,1715,北京,北京,1511661624
    937166,1715,北京,北京,1511661625
    937166,1715,北京,北京,1511661626
    937166,1715,北京,北京,1511661627
    937166,1715,北京,北京,1511661628
    937166,1715,北京,北京,1511661629
    937166,1715,北京,北京,1511661630
    937166,1715,北京,北京,1511661631
    937166,1715,北京,北京,1511661632
    937166,1715,北京,北京,1511661633
    937166,1715,北京,北京,1511661634
    937166,1715,北京,北京,1511661635
    937166,1715,北京,北京,1511661636
    937166,1715,北京,北京,1511661637
    937166,1715,北京,北京,1511661638
    937166,1715,北京,北京,1511661639
    937166,1715,北京,北京,1511661640
    2315,36237,zhejiang,杭州,1511661641
    10236,2914,henan,郑州,1511661641
    2315,7156,zhejiang,杭州,1511661641
    10236,5389,hebei,石家庄,1511661641
    937166,1715,北京,北京,1511661641
    937166,1715,北京,北京,1511661642
    937166,1715,北京,北京,1511661643
    937166,1715,北京,北京,1511661644
    937166,1715,北京,北京,1511661645
    937166,1715,北京,北京,1511661646
    937166,1715,北京,北京,1511661647
    937166,1715,北京,北京,1511661648
    937166,1715,北京,北京,1511661649
    937166,1715,北京,北京,1511661650
    937166,1715,北京,北京,1511661651
    937166,1715,北京,北京,1511661652
    161501,36156,江苏,南京,1511661652
    937166,1715,北京,北京,1511661653
    937166,1715,北京,北京,1511661654
    161501,36156,江苏,南京,1511661654
    161501,36156,江苏,南京,1511661655
    937166,1715,北京,北京,1511661655
    937166,1715,北京,北京,1511661656
    937166,1715,北京,北京,1511661657
    937166,1715,北京,北京,1511661658
    937166,1715,北京,北京,1511661659
    937166,1715,北京,北京,1511661660
    937166,1715,北京,北京,1511661661
    937166,1715,北京,北京,1511661662
    937166,1715,北京,北京,1511661663
    937166,1715,北京,北京,1511661664
    937166,1715,北京,北京,1511661665
    937166,1715,北京,北京,1511661666
    937166,1715,北京,北京,1511661667
    937166,1715,北京,北京,1511661668
    937166,1715,北京,北京,1511661669
    937166,1715,北京,北京,1511661670
    937166,1715,北京,北京,1511661671
    937166,1715,北京,北京,1511661672
    937166,1715,北京,北京,1511661673
    937166,1715,北京,北京,1511661674
    937166,1715,北京,北京,1511661675
    937166,1715,北京,北京,1511661676
    937166,1715,北京,北京,1511661677
    937166,1715,北京,北京,1511661678
    937166,1715,北京,北京,1511661679
    937166,1715,北京,北京,1511661680
    937166,1715,北京,北京,1511661681
    937166,1715,北京,北京,1511661682
    937166,1715,北京,北京,1511661683
    937166,1715,北京,北京,1511661684
    937166,1715,北京,北京,1511661685
    937166,1715,北京,北京,1511661686
    937166,1715,北京,北京,1511661687
    937166,1715,北京,北京,1511661688
    937166,1715,北京,北京,1511661689
    937166,1715,北京,北京,1511661690
    937166,1715,北京,北京,1511661691
    937166,1715,北京,北京,1511661692
    2315,36237,zhejiang,杭州,1511661692
    10236,2914,henan,郑州,1511661693
    2315,7156,zhejiang,杭州,1511661693
    937166,1715,北京,北京,1511661693
    937166,1715,北京,北京,1511661694
    937166,1715,北京,北京,1511661695
    937166,1715,北京,北京,1511661696
    937166,1715,北京,北京,1511661697
    937166,1715,北京,北京,1511661698
    937166,1715,北京,北京,1511661699
    937166,1715,北京,北京,1511661700
    937166,1715,北京,北京,1511661701
    937166,1715,北京,北京,1511661702
    937166,1715,北京,北京,1511661703
    937166,1715,北京,北京,1511661704
    937166,1715,北京,北京,1511661705
    937166,1715,北京,北京,1511661706
    937166,1715,北京,北京,1511661707
    937166,1715,北京,北京,1511661708
    937166,1715,北京,北京,1511661709
    937166,1715,北京,北京,1511661710
    937166,1715,北京,北京,1511661711
    937166,1715,北京,北京,1511661712
    937166,1715,北京,北京,1511661713
    937166,1715,北京,北京,1511661714
    937166,1715,北京,北京,1511661715
    937166,1715,北京,北京,1511661716
    937166,1715,北京,北京,1511661717
    937166,1715,北京,北京,1511661718
    937166,1715,北京,北京,1511661719
    937166,1715,北京,北京,1511661710
    937166,1715,北京,北京,1511661711
    View Code

    2、处理主类

    package service
    
    /**
     * @program: demo
     * @description: ${description}
     * @author: yang
     * @create: 2020-12-30 14:29
     */
    
    import java.sql.Timestamp
    
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, KeyedProcessFunction}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    import utils.Utils
    
    
    /**
     * Input record describing a single ad-click event.
     *
     * @param userId    ID of the user who clicked
     * @param adId      ID of the ad that was clicked
     * @param province  province associated with the click
     * @param city      city associated with the click
     * @param timestamp time at which the user clicked the ad
     */
    case class AdClickEvent(
      userId: Long,
      adId: Long,
      province: String,
      city: String,
      timestamp: Long
    )
    
    /**
     * Real-time ad-click blacklist job.
     *
     * Reads ad-click records from a text file, keys them by (userId, adId) and
     * counts clicks per key. Once a key has reached the configured maximum, its
     * further clicks are dropped and a single warning is written to the
     * "blacklist" side output.
     */
    object AdClickCount {
      // Side-output tag for blacklist warnings; shared by main() and CountBlackListUser.
      private val outputBlackList = new OutputTag[String]("blacklist")

      // One day in milliseconds, used to round the current time up to the next day boundary.
      private val MillisPerDay: Long = 24L * 60 * 60 * 1000

      def main(args: Array[String]): Unit = {
        // Step 1: obtain the execution environment and configure it.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Step 2: read and parse the click log, assign timestamps/watermarks,
        // key by (userId, adId) and run the per-key click counter.
        val countedClicks = env.readTextFile(Utils.adClickLogPath)
          .map(Utils.string2ClickEvent(_))
          .assignTimestampsAndWatermarks(new AdClickEventTimeExtractor())
          .keyBy(data => (data.userId, data.adId))
          .process(new CountBlackListUser(100))

        // Step 3: print the blacklist warnings taken from the side output.
        countedClicks.getSideOutput(outputBlackList).print()

        env.execute("AdClickCount")
      }

      /**
       * Aggregate function that counts ad-click events.
       *
       * It is not referenced by main() in this file.
       */
      class AdClickCount extends AggregateFunction[AdClickEvent, Long, Long] {
        // The accumulator starts at zero.
        override def createAccumulator(): Long = 0L

        // Every incoming event adds one to the accumulator.
        override def add(in: AdClickEvent, acc: Long): Long = acc + 1

        // The result is the accumulated count itself.
        override def getResult(acc: Long): Long = acc

        // Merging two accumulators sums their counts.
        override def merge(acc: Long, acc1: Long): Long = acc + acc1
      }

      /**
       * Per-key click counter that filters out blacklisted traffic.
       *
       * While the stored count for the current key is below `maxCount`, each click
       * is counted and forwarded downstream. Once the count has reached `maxCount`,
       * further clicks are dropped; the first dropped click emits one warning on
       * the blacklist side output.
       *
       * @param maxCount maximum number of clicks forwarded per key before blacklisting
       */
      class CountBlackListUser(maxCount: Int)
        extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent] {

        // Number of clicks counted so far for the current (userId, adId) key.
        lazy val clickCountState: ValueState[Long] = getRuntimeContext.getState(
          new ValueStateDescriptor[Long]("count-click-state", classOf[Long]))

        // Whether the blacklist warning has already been emitted for the current key.
        lazy val isSetBlackList: ValueState[Boolean] = getRuntimeContext.getState(
          new ValueStateDescriptor[Boolean]("is-sent-state", classOf[Boolean]))

        // Timestamp of the reset timer registered for the current key.
        lazy val saveTimerState: ValueState[Long] = getRuntimeContext.getState(
          new ValueStateDescriptor[Long]("reset-time-state", classOf[Long]))

        /**
         * Counts one click for the current key and forwards it unless the key has
         * already reached `maxCount`.
         *
         * @param value the incoming click event
         * @param ctx   context giving access to the timer service and side outputs
         * @param out   collector for clicks that are still below the limit
         */
        override def processElement(value: AdClickEvent,
                                    ctx: KeyedProcessFunction[(Long, Long),
                                      AdClickEvent, AdClickEvent]#Context,
                                    out: Collector[AdClickEvent]): Unit = {
          val currentCount = clickCountState.value()

          // A count of 0 is treated as the first click for this key: register a
          // timer that clears the day's statistics at the next day boundary.
          // NOTE(review): the timer uses processing time although the job runs with
          // event-time characteristics, and the boundary is a multiple of 24h of the
          // processing-time clock (presumably midnight UTC, not local midnight) — confirm
          // both are intended.
          if (currentCount == 0) {
            val now = ctx.timerService().currentProcessingTime()
            val nextDayBoundary = (now / MillisPerDay + 1) * MillisPerDay
            saveTimerState.update(nextDayBoundary)
            ctx.timerService().registerProcessingTimeTimer(nextDayBoundary)
          }

          if (currentCount >= maxCount) {
            // Limit reached: drop the click and emit the warning only once per key.
            if (!isSetBlackList.value()) {
              isSetBlackList.update(true)
              ctx.output(outputBlackList,
                s"用户${value.userId} 对广告:${value.adId} 点击超过 $maxCount 次")
            }
          } else {
            // Still below the limit: count the click and forward it.
            clickCountState.update(currentCount + 1)
            out.collect(value)
          }
        }

        /**
         * Clears all per-key state when the stored reset timer fires.
         *
         * @param timestamp timestamp of the firing timer
         * @param ctx       timer context
         * @param out       collector (unused)
         */
        override def onTimer(timestamp: Long,
                             ctx: KeyedProcessFunction[(Long, Long),
                               AdClickEvent, AdClickEvent]#OnTimerContext,
                             out: Collector[AdClickEvent]): Unit = {
          // Only react to the timer whose timestamp was saved for this key.
          if (timestamp == saveTimerState.value()) {
            isSetBlackList.clear()
            clickCountState.clear()
            saveTimerState.clear()
          }
        }

      }

    }
    
    /**
     * Periodic watermark assigner for [[AdClickEvent]] records.
     *
     * Event timestamps are multiplied by 1000 before being handed to Flink, and
     * the watermark trails the largest timestamp seen so far by
     * `maxOufOfOrderness` (in the same unit as `AdClickEvent.timestamp`).
     */
    class AdClickEventTimeExtractor extends AssignerWithPeriodicWatermarks[AdClickEvent] {
      // Largest event timestamp observed so far, in the unit of AdClickEvent.timestamp.
      var currentMaxEventTime = 0L
      // Maximum tolerated out-of-orderness (10, described as 10s by the original author).
      val maxOufOfOrderness = 10

      /** Returns the watermark: (largest timestamp seen - allowed lateness) * 1000. */
      override def getCurrentWatermark: Watermark = {
        val watermarkBase = currentMaxEventTime - maxOufOfOrderness
        new Watermark(watermarkBase * 1000)
      }

      /**
       * Tracks the largest timestamp seen and returns the element's timestamp
       * multiplied by 1000.
       *
       * @param element                  the click event being timestamped
       * @param previousElementTimestamp timestamp previously attached to the element (unused)
       * @return `element.timestamp * 1000`
       */
      override def extractTimestamp(element: AdClickEvent, previousElementTimestamp: Long): Long = {
        val eventTime = element.timestamp
        if (eventTime > currentMaxEventTime) {
          currentMaxEventTime = eventTime
        }
        eventTime * 1000
      }
    }

    3、Utils工具类

    package utils
    
    /**
     * @program: demo
     * @description: ${description}
     * @author: yang
     * @create: 2020-12-30 14:26
     */
    import java.text.SimpleDateFormat
    
    import service.{AdClickEvent, ApacheLogEvent, UserBehavior}
    
    
    /**
     * Shared helpers: input file locations and line parsers for the demo jobs.
     */
    object Utils {

      // Backslashes in Windows paths must be escaped inside a regular Scala string
      // literal; an unescaped "\j", "\d", "\s" or "\m" is an invalid escape and does
      // not compile, while "\r" and "\f" would silently become control characters.

      // Path of the event (access) log.
      val eventLogPath = "E:\\java\\demo\\src\\main\\resources\\file\\data2.log"
      // Path of the ad-click log.
      val adClickLogPath = "E:\\java\\demo\\src\\main\\resources\\file\\data3.csv"

      // Path of the user-behavior log.
      val userBehaviorLogPath = "E:\\java\\demo\\src\\main\\resources\\file\\data1.csv"


      /**
       * Parses one space-separated log line into an [[ApacheLogEvent]].
       *
       * Field 3 is parsed with the pattern `dd/MM/yyyy:HH:mm:ss` into epoch
       * milliseconds; fields 0, 1, 5 and 6 are passed through trimmed.
       *
       * @param line one raw log line
       * @return the parsed event
       */
      def string2ApacheLogEvent(line: String): ApacheLogEvent = {
        val fields = line.split(" ")
        // A new formatter is created per call, so no formatter instance is shared between calls.
        val dateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timeStamp = dateFormat.parse(fields(3).trim).getTime
        ApacheLogEvent(fields(0).trim, fields(1).trim, timeStamp,
          fields(5).trim, fields(6).trim)
      }

      /**
       * Parses one comma-separated line into an [[AdClickEvent]].
       *
       * Columns, in order: userId, adId, province, city, timestamp. All values are
       * trimmed; userId, adId and timestamp are converted with `toLong`.
       *
       * @param line one raw CSV line
       * @return the parsed click event
       */
      def string2ClickEvent(line: String): AdClickEvent = {
        val dataArray = line.split(",")
        AdClickEvent(
          dataArray(0).trim.toLong,
          dataArray(1).trim.toLong,
          dataArray(2).trim,
          dataArray(3).trim,
          dataArray(4).trim.toLong
        )
      }

      /**
       * Parses one comma-separated line into a [[UserBehavior]].
       *
       * Six columns are read; columns 0, 1, 2 and 4 are converted with `toLong`,
       * columns 3 and 5 are kept as trimmed strings.
       *
       * @param line one raw CSV line
       * @return the parsed user-behavior record
       */
      def string2UserBehavior(line: String): UserBehavior = {
        val fields = line.split(",")
        UserBehavior(fields(0).trim.toLong,
          fields(1).trim.toLong,
          fields(2).trim.toLong,
          fields(3).trim,
          fields(4).trim.toLong,
          fields(5).trim
        )

      }

    }
  • 相关阅读:
    dwSun带你选Python的编辑器/IDE
    ubuntu中文乱码解决
    解决matplotlib中文显示
    1506.01186-Cyclical Learning Rates for Training Neural Networks
    1503.02531-Distilling the Knowledge in a Neural Network.md
    1804.03235-Large scale distributed neural network training through online distillation.md
    mysql导入太慢解决方法
    已某个时间单位(日月周年)来分割时间段
    阿里云邮件推送
    阿里云短信推送服务
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14234924.html
Copyright © 2011-2022 走看看