zoukankan      html  css  js  c++  java
  • 066 基于checkpoint的HA机制实现

    1.说明

      针对需要恢复的应用场景,提供了HA的的机制

      内部实现原理:基于checkpoint的

      当程序被kill的时候,下次恢复的时候,会从checkpoint对用的文件中进行数据的恢复

    2.HA原理

      当job执行的时候,将数据同步到checkpoint设置的对应文件夹中
      同步的数据包括:
        类的信息(包名 + 类名)
        Job DAG执行图(在运行后,代码的DAG图不能进行任何修改,否则下次执行的时候会报错<类型不匹配>; 只要DAG图不变,其它API内部的代码执行逻辑可以随便更改)
      Job执行的源数据

    二:程序

    1.程序

     1 package com.stream.it
     2 
     3 import kafka.serializer.StringDecoder
     4 import org.apache.spark.storage.StorageLevel
     5 import org.apache.spark.streaming.kafka.KafkaUtils
     6 import org.apache.spark.streaming.{Seconds, StreamingContext}
     7 import org.apache.spark.{SparkConf, SparkContext}
     8 
     9 object HAKafkaWordcount {
    10   def main(args: Array[String]): Unit = {
    11     val conf=new SparkConf()
    12         .setAppName("spark-streaming-wordcount")
    13           .setMaster("local[*]")
    14     val sc=SparkContext.getOrCreate(conf)
    15     val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir02"
    16 
    17 
    18     /**
    19       * 构造StreamingContext对象
    20       *
    21       * @return
    22       */
    23     def createStreamingContextFunc(): StreamingContext = {
    24       val ssc = new StreamingContext(sc, Seconds(5))
    25       ssc.checkpoint(checkpointDir)
    26       val kafkaParams=Map("group.id"->"stream-sparking-0",
    27         "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka",
    28         "auto.offset.reset"->"smallest"
    29       )
    30       val topics=Map("beifeng"->1)
    31       val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
    32         ssc,             //给定sparkStreaming的上下文
    33         kafkaParams,     //kafka的参数信息,通过kafka HightLevelComsumerApi连接
    34         topics,          //给定读取对应的topic的名称以及读取数据的线程数量
    35         StorageLevel.MEMORY_AND_DISK_2     //数据接收器接收到kafka的数据后的保存级别
    36       ).map(_._2)
    37 
    38 
    39       val resultWordcount=dStream
    40         .filter(line=>line.nonEmpty)
    41         .flatMap(line=>line.split(" ").map((_,1)))
    42         .reduceByKey(_+_)
    43       resultWordcount.foreachRDD(rdd=>{
    44         rdd.foreachPartition(iter=>iter.foreach(println))
    45       })
    46       ssc
    47     }
    48 
    49     val ssc = StreamingContext.getOrCreate(
    50       checkpointPath = checkpointDir,
    51       creatingFunc = createStreamingContextFunc
    52     )
    53 
    54     //启动
    55     ssc.start()
    56     //等到
    57     ssc.awaitTermination()
    58   }
    59 }

    2.注意点

      HA第一次执行后,以后如果代码进行改动(创建StreamingContext的代码改动),不会得到反应(会直接从checkpoint中读取数据进行StreamingContext的恢复) ===> 解决SparkStreaming和Kafka集成的时候offset偏移量管理的问题

  • 相关阅读:
    此查询使用的不是 ANSI 外部联接运算符
    centos重启命令
    updatePanel 加载完成后回调JS
    建站推荐十个免费的CMS内容管理系统(Php+mysql)
    [转]最值得拥有的免费Bootstrap后台管理模板
    Got a packet bigger than 'max_allowed_packet' bytes”
    ECshop商城程序常见的96个小问题汇总
    linux 命令
    mysql 存储过程
    千万级记录的Discuz论坛导致MySQL CPU 100%的优化笔记
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9464277.html
Copyright © 2011-2022 走看看