zoukankan      html  css  js  c++  java
  • 周期性清除Spark Streaming流状态的方法

    在Spark Streaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子:

     val productPvStream = stream.mapPartitions(records => {
        var result = new ListBuffer[(String, Int)]
          for (record <- records) {
            result += Tuple2(record.key(), 1)
          }
        result.iterator
      }).reduceByKey(_ + _).mapWithState(
        StateSpec.function((productId: String, pv: Option[Int], state: State[Int]) => {
          val sum = pv.getOrElse(0) + state.getOption().getOrElse(0)
          state.update(sum)
          (productId, sum)
      })).stateSnapshots()

    现在的问题是,PV并不是一直累加的,而是每天归零,重新统计数据。要达到在凌晨0点清除状态的目的,有以下两种方法。

    编写脚本重启Streaming程序

    用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本:

    stream_app_name='com.xyz.streaming.MallForwardStreaming'
    cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
    
    if [ ${cnt} -eq 1 ]; then
      pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`
      kill -9 ${pid}
      sleep 20
      cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
      if [ ${cnt} -eq 0 ]; then
        nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1
      fi
    fi

    这种方式最简单,也不需要对程序本身做任何改动。但随着同时运行的Streaming任务越来越多,就会显得越来越累赘了。

    给StreamingContext设置超时

    在程序启动之前,先计算出当前时间点距离第二天凌晨0点的毫秒数:

    def msTillTomorrow = {
      val now = new Date()
      val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
      tomorrow.getTime - now.getTime
    }

    然后将Streaming程序的主要逻辑写在while(true)循环中,并且不像平常一样调用StreamingContext.awaitTermination()方法,而改用awaitTerminationOrTimeout()方法,即:

    while (true) {
        val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
        ssc.checkpoint(CHECKPOINT_DIR)
    
        // ...处理逻辑...
    
        ssc.start()
        ssc.awaitTerminationOrTimeout(msTillTomorrow)
        ssc.stop(false, true)
        Thread.sleep(BATCH_INTERVAL * 1000)
      }

    在经过msTillTomorrow毫秒之后,StreamingContext就会超时,再调用其stop()方法(注意两个参数,stopSparkContext表示是否停止关联的SparkContext,stopGracefully表示是否优雅停止),就可以停止并重启StreamingContext。

    以上两种方法都是仍然采用Spark Streaming的机制进行状态计算的。如果其他条件允许的话,我们还可以抛弃mapWithState(),直接借助外部存储自己维护状态。比如将Redis的Key设计为product_pv:[product_id]:[date],然后在Spark Streaming的每个批次中使用incrby指令,就能方便地统计PV了,不必考虑定时的问题。

    转自大数据技术与架构公众号 作者 LittleMagic 

  • 相关阅读:
    log输出到日志和控制台
    CRM--搜索功能
    CRM--对数据进行排序
    CRM-注意的小事情
    CRM--modelform之instance
    CRM--保留原搜索条件
    crm系统
    Django多个app情况下静态文件的配置
    测试
    题库
  • 原文地址:https://www.cnblogs.com/zz-ksw/p/12486822.html
Copyright © 2011-2022 走看看