zoukankan      html  css  js  c++  java
  • Spark Streaming metadata checkpoint

    Checkpointing

    一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要checkpoint足够的信息到容错存储系统中, 以使系统从故障中恢复。

    • Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括
      • Configuration :创建Spark Streaming应用程序的配置信息
      • DStream operations :定义Streaming应用程序的操作集合
      • Incomplete batches:操作存在队列中的未完成的批
    • Data checkpointing :保存生成的RDD到可靠的存储系统中,这在有状态transformation(如结合跨多个批次的数据)中是必须的。在这样一个transformation中,生成的RDD依赖于之前 批的RDD,随着时间的推移,这个依赖链的长度会持续增长。在恢复的过程中,为了避免这种无限增长。有状态的transformation的中间RDD将会定时地存储到可靠存储系统中,以截断这个依赖链。

    元数据checkpoint主要是为了从driver故障中恢复数据。如果transformation操作被用到了,数据checkpoint即使在简单的操作中都是必须的。

     

    Metadata checkpointing

    相关代码:

     
    def createContext(checkpointDirectory: String)
    : StreamingContext = {
        // If you do not see this printed, that means the StreamingContext has been loaded
        // from the new checkpoint
        println("Creating new context")
        val sparkConf = new SparkConf().setAppName("DynamicRange")
        // Create the context with a 1 second batch size
        val ssc = new StreamingContext(sparkConf, Seconds(8))
        ssc.checkpoint(checkpointDirectory)
        ...
        //你的kafka streaming的相关代码最好放在这里,不然有可能抛异常:spark checkpoint KafkaInputDStream has not been initialize
        //create kafka stream
        val fullLines = KafkaUtils.createStream(ssc, SystemConfig.config.kafkaZkQuorum, SystemConfig.config.kafkaGroup, topicMap);
        //parse data string
        val valueLines = fullLines.map(_._2)
        ..
        ssc
    }
    
    def main(args: Array[String]) {
        var ssc: StreamingContext = null
        try {
            ssc = StreamingContext.getOrCreate(".", () => {
                println("get context fail, try to create a new one.")
                createContext(".")
            })
        } catch{
            case e:Exception =>{
                println("get context exception, try to create a new one.")
                ssc = createContext(".")
            }
        }
    
        ssc.start()
        ssc.awaitTermination()
    }
     

    注意:

    1. kafka streaming的相关代码最好放在createContext里面,不然有可能抛异常:spark checkpoint KafkaInputDStream has not been initialize。
    2. 不同版本之间的Spark Driver是不能从文件中恢复的,所以这里我用try catch如果有异常就新建一个context。
  • 相关阅读:
    高性能分布式计算与存储系统设计概要
    .NET核心代码保护策略
    Web 通信 之 长连接、长轮询(long polling)
    C++数据结构之二叉查找树(BST)
    T4:T4 笔记 + Trait 示例
    腾讯2014软件开发
    CSS选择器从右向左的匹配规则
    Js面向对象编程
    Js杂谈-正则的测试与回溯次数
    Microsoft Message Analyzer (微软消息分析器,“网络抓包工具
  • 原文地址:https://www.cnblogs.com/hd-zg/p/5831235.html
Copyright © 2011-2022 走看看