zoukankan      html  css  js  c++  java
  • 066 基于checkpoint的HA机制实现

    1.说明

      针对需要恢复的应用场景,提供了HA的的机制

      内部实现原理:基于checkpoint的

      当程序被kill的时候,下次恢复的时候,会从checkpoint对用的文件中进行数据的恢复

    2.HA原理

      当job执行的时候,将数据同步到checkpoint设置的对应文件夹中
      同步的数据包括:
        类的信息(包名 + 类名)
        Job DAG执行图(在运行后,代码的DAG图不能进行任何修改,否则下次执行的时候会报错<类型不匹配>; 只要DAG图不变,其它API内部的代码执行逻辑可以随便更改)
      Job执行的源数据

    二:程序

    1.程序

     1 package com.stream.it
     2 
     3 import kafka.serializer.StringDecoder
     4 import org.apache.spark.storage.StorageLevel
     5 import org.apache.spark.streaming.kafka.KafkaUtils
     6 import org.apache.spark.streaming.{Seconds, StreamingContext}
     7 import org.apache.spark.{SparkConf, SparkContext}
     8 
     9 object HAKafkaWordcount {
    10   def main(args: Array[String]): Unit = {
    11     val conf=new SparkConf()
    12         .setAppName("spark-streaming-wordcount")
    13           .setMaster("local[*]")
    14     val sc=SparkContext.getOrCreate(conf)
    15     val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir02"
    16 
    17 
    18     /**
    19       * 构造StreamingContext对象
    20       *
    21       * @return
    22       */
    23     def createStreamingContextFunc(): StreamingContext = {
    24       val ssc = new StreamingContext(sc, Seconds(5))
    25       ssc.checkpoint(checkpointDir)
    26       val kafkaParams=Map("group.id"->"stream-sparking-0",
    27         "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka",
    28         "auto.offset.reset"->"smallest"
    29       )
    30       val topics=Map("beifeng"->1)
    31       val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
    32         ssc,             //给定sparkStreaming的上下文
    33         kafkaParams,     //kafka的参数信息,通过kafka HightLevelComsumerApi连接
    34         topics,          //给定读取对应的topic的名称以及读取数据的线程数量
    35         StorageLevel.MEMORY_AND_DISK_2     //数据接收器接收到kafka的数据后的保存级别
    36       ).map(_._2)
    37 
    38 
    39       val resultWordcount=dStream
    40         .filter(line=>line.nonEmpty)
    41         .flatMap(line=>line.split(" ").map((_,1)))
    42         .reduceByKey(_+_)
    43       resultWordcount.foreachRDD(rdd=>{
    44         rdd.foreachPartition(iter=>iter.foreach(println))
    45       })
    46       ssc
    47     }
    48 
    49     val ssc = StreamingContext.getOrCreate(
    50       checkpointPath = checkpointDir,
    51       creatingFunc = createStreamingContextFunc
    52     )
    53 
    54     //启动
    55     ssc.start()
    56     //等到
    57     ssc.awaitTermination()
    58   }
    59 }

    2.注意点

      HA第一次执行后,以后如果代码进行改动(创建StreamingContext的代码改动),不会得到反应(会直接从checkpoint中读取数据进行StreamingContext的恢复) ===> 解决SparkStreaming和Kafka集成的时候offset偏移量管理的问题

  • 相关阅读:
    Hbase性能调优(一)
    文章标题
    JDBC的PreparedStatement启动事务使用批处理executeBatch()
    java.lang.OutOfMemoryError: PermGen space及其解决方法
    linux 关机命令总结
    oracle 启动关闭以及监听启动关闭命令
    bash: sqlplus: command not found 解决方法
    VMware 虚拟机 linux执行 ifconfig 命令 eth0没有IP地址(intet addr、Bcast、Mask) UP BROADCAST MULTICAST 问题
    Linux的文本编辑和文本内容查看命令
    RHEL6服务器网络配置 修改linux RHEL6系统下的ip方法
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9464277.html
Copyright © 2011-2022 走看看