zoukankan      html  css  js  c++  java
  • Spark状态管理State的应用

    场景描述

    如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。那么我就需要一个东西保存历史状态State。


    首先区分一下两个概念,state一般指一个具体的task/operator的状态。而checkpoint则表示了一个Job,在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。我们在这里讨论的是state。

    Spark的状态更新

    updateStateByKey

    updateStateByKey会统计全局的key的状态,不管又没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateByKey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数操作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会被删除(state可以是任意类型的数据的结构)。

    mapWithState

    mapWithState也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,类似于增量的感觉。

    updateStateByKey和mapWithState的区别

    updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。

    mapWithState只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。

    updateStateByKey示例:

    def updateFunction(currValues:Seq[Int],preValue:Option[Int]): Option[Int] = {
           val currValueSum = currValues.sum
            //上面的Int类型都可以用对象类型替换
            Some(currValueSum + preValue.getOrElse(0)) //当前值的和加上历史值
        }
        kafkaStream.map(r => (r._2,1)).updateStateByKey(updateFunction _)

    这里的updateFunction方法就是需要我们自己去实现的状态跟新的逻辑,currValues就是当前批次的所有值,preValue是历史维护的状态,updateStateByKey返回的是包含历史所有状态信息的DStream。


    mapWithState示例:

    val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
        //自定义mappingFunction,累加单词出现的次数并更新状态
       val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
       val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
       val output = (word, sum)
       state.update(sum)
       output
        }
        //调用mapWithState进行管理流数据的状态
      kafkaStream.map(r => (r._2,1)).mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)).print()

    这里的initialRDD就是初始化状态,updateStateByKey也有对应的API。这里的mappingFun也是需要我们自己实现的状态跟新逻辑,调用state.update()就是对状态的跟新,output就是通过mapWithState后返回的DStream中的数据形式。注意这里不是直接传入的mappingFunc函数,而是一个StateSpec 的对象,其实也是对函数的一个包装而已。

    整理自:

    大数据技术与架构公众号,作者wzw

  • 相关阅读:
    Haproxy 配置项及配置实例-Haproxy入门教程
    Spring Boot 配置-Spring Boot教程深入浅出系列
    RMI 接口和类概述-RMI快速入门教程
    分布式和非分布式模型对比-RMI快速入门教程
    RMI分布式对象模型-RMI快速入门教程
    RMI介绍-RMI快速入门教程
    Qt编写可视化大屏电子看板系统17-柱状堆积图
    Qt开发经验小技巧161-165
    Qt编写安防视频监控系统60-子模块4云台控制
    MyBatis的关联映射,resultMap元素之collection子元素,实现一对多关联关系(节选自:Java EE企业级应用开发教程)
  • 原文地址:https://www.cnblogs.com/zz-ksw/p/12486541.html
Copyright © 2011-2022 走看看