zoukankan      html  css  js  c++  java
  • 066 基于checkpoint的HA机制实现

    1.说明

      针对需要恢复的应用场景,提供了HA的的机制

      内部实现原理:基于checkpoint的

      当程序被kill的时候,下次恢复的时候,会从checkpoint对用的文件中进行数据的恢复

    2.HA原理

      当job执行的时候,将数据同步到checkpoint设置的对应文件夹中
      同步的数据包括:
        类的信息(包名 + 类名)
        Job DAG执行图(在运行后,代码的DAG图不能进行任何修改,否则下次执行的时候会报错<类型不匹配>; 只要DAG图不变,其它API内部的代码执行逻辑可以随便更改)
      Job执行的源数据

    二:程序

    1.程序

     1 package com.stream.it
     2 
     3 import kafka.serializer.StringDecoder
     4 import org.apache.spark.storage.StorageLevel
     5 import org.apache.spark.streaming.kafka.KafkaUtils
     6 import org.apache.spark.streaming.{Seconds, StreamingContext}
     7 import org.apache.spark.{SparkConf, SparkContext}
     8 
     9 object HAKafkaWordcount {
    10   def main(args: Array[String]): Unit = {
    11     val conf=new SparkConf()
    12         .setAppName("spark-streaming-wordcount")
    13           .setMaster("local[*]")
    14     val sc=SparkContext.getOrCreate(conf)
    15     val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir02"
    16 
    17 
    18     /**
    19       * 构造StreamingContext对象
    20       *
    21       * @return
    22       */
    23     def createStreamingContextFunc(): StreamingContext = {
    24       val ssc = new StreamingContext(sc, Seconds(5))
    25       ssc.checkpoint(checkpointDir)
    26       val kafkaParams=Map("group.id"->"stream-sparking-0",
    27         "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka",
    28         "auto.offset.reset"->"smallest"
    29       )
    30       val topics=Map("beifeng"->1)
    31       val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
    32         ssc,             //给定sparkStreaming的上下文
    33         kafkaParams,     //kafka的参数信息,通过kafka HightLevelComsumerApi连接
    34         topics,          //给定读取对应的topic的名称以及读取数据的线程数量
    35         StorageLevel.MEMORY_AND_DISK_2     //数据接收器接收到kafka的数据后的保存级别
    36       ).map(_._2)
    37 
    38 
    39       val resultWordcount=dStream
    40         .filter(line=>line.nonEmpty)
    41         .flatMap(line=>line.split(" ").map((_,1)))
    42         .reduceByKey(_+_)
    43       resultWordcount.foreachRDD(rdd=>{
    44         rdd.foreachPartition(iter=>iter.foreach(println))
    45       })
    46       ssc
    47     }
    48 
    49     val ssc = StreamingContext.getOrCreate(
    50       checkpointPath = checkpointDir,
    51       creatingFunc = createStreamingContextFunc
    52     )
    53 
    54     //启动
    55     ssc.start()
    56     //等到
    57     ssc.awaitTermination()
    58   }
    59 }

    2.注意点

      HA第一次执行后,以后如果代码进行改动(创建StreamingContext的代码改动),不会得到反应(会直接从checkpoint中读取数据进行StreamingContext的恢复) ===> 解决SparkStreaming和Kafka集成的时候offset偏移量管理的问题

  • 相关阅读:
    [IndiaHacks 2016
    [Northern Eurasia Finals Online 2020]D. Down We Dig(记忆化搜索,博弈)
    2018 ICPC Asia Nakhon Pathom Regional Contest-F.Lucky Pascal Triangle(杨辉三角,递归,分治,规律)
    The 2020 ICPC Asia Taipei-Hsinchu Site Programming Contest-C题 Pyramid (思维,递推,规律)
    [2020-2021 ACM-ICPC Brazil Subregional Programming Contest] K. Between Us (高斯消元解异或方程组)
    HDU-6252
    [AtCoder Regular Contest 109]-C
    [2016-2017 ACM-ICPC CHINA-Final]Problem C. Mr. Panda and Strips(DP+线段树)
    失败经1
    [2016-2017 ACM-ICPC CHINA-Final]-Problem H. Great Cells(贡献,组合数学)
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9464277.html
Copyright © 2011-2022 走看看