zoukankan      html  css  js  c++  java
  • Flink统计日活

    
    .keyBy(0)
          .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
          .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
          .evictor(TimeEvictor.of(Time.seconds(0), true))
          .process(new ProcessWindowFunction[(String, String), (String, String, Long), Tuple, TimeWindow] {
            /*
            这是使用state是因为,窗口默认只会在创建结束的时候触发一次计算,然后数据结果,
            如果长时间的窗口,比如:一天的窗口,要是等到一天结束在输出结果,那还不如跑批。
            所有大窗口会添加trigger,以一定的频率输出中间结果。
            加evictor 是因为,每次trigger,触发计算是,窗口中的所有数据都会参与,所以数据会触发很多次,比较浪费,加evictor 驱逐已经计算过的数据,就不会重复计算了
            驱逐了已经计算过的数据,导致窗口数据不完全,所以需要state 存储我们需要的中间结果
             */
            var wordState: MapState[String, String] = _
            var pvCount: ValueState[Long] = _
    
            override def open(parameters: Configuration): Unit = {
              // new MapStateDescriptor[String, String]("word", classOf[String], classOf[String])
              wordState = getRuntimeContext.getMapState(new MapStateDescriptor[String, String]("word", classOf[String], classOf[String]))
              pvCount = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pvCount", classOf[Long]))
            }
    
            override def process(key: Tuple, context: Context, elements: Iterable[(String, String)], out: Collector[(String, String, Long)]): Unit = {
    
    
              var pv = 0;
              val elementsIterator = elements.iterator
              // 遍历窗口数据,获取唯一word
              while (elementsIterator.hasNext) {
                pv += 1
                val word = elementsIterator.next()._2
                wordState.put(word, null)
              }
              // add current
              pvCount.update(pvCount.value() + pv)
              var count: Long = 0
              val wordIterator = wordState.keys().iterator()
              while (wordIterator.hasNext) {
                wordIterator.next()
                count += 1
              }
              // uv
              out.collect((key.getField(0), "uv", count))
              out.collect(key.getField(0), "pv", pv)
    
            }
          })
    
  • 相关阅读:
    PHP学习笔记:APACHE配置虚拟目录、一个站点使用多域名配置方式
    转载:分页原理+分页代码+分页类制作
    PHP学习笔记:数据库学习心得
    PHP学习笔记:用mysqli连接数据库
    PHP学习笔记:MySQL数据库的操纵
    PHP学习笔记:利用时间和mt_rand函数获取随机名字
    PHP学习笔记:等比例缩放图片
    前端学习(一) html介绍和head标签
    Python 协程
    Python 线程----线程方法,线程事件,线程队列,线程池,GIL锁,协程,Greenlet
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034205.html
Copyright © 2011-2022 走看看