  • Spark Streaming HA (High Availability)

    1. Stateful operations such as updateStateByKey and window operations checkpoint automatically, but a checkpoint directory must be set. A copy of the state is kept in a fault-tolerant file system, so if the data in memory is lost it can be read back from the file system instead of being recomputed.

    streamingContext.checkpoint("hdfs://ip:port/checkpoint")
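    For a concrete picture, here is a minimal Scala sketch of a stateful word count (the socket source, host/port and the placeholder checkpoint path are illustrative, not from the original project): updateStateByKey will not run until a checkpoint directory is set, and the per-key state kept there is what gets recovered after a failure.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StatefulWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Required for stateful operations: the running state is checkpointed to HDFS
        ssc.checkpoint("hdfs://ip:port/checkpoint")

        // Placeholder socket source; any receiver or direct source works the same way
        val lines = ssc.socketTextStream("localhost", 9999)
        val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

        // Running count per word; previous state is read back from the checkpoint
        // if the in-memory copy is lost
        val counts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
          Some(newValues.sum + state.getOrElse(0))
        }
        counts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }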

    2. Driver high availability

    Java version

    The first time the StreamingContext is created and started, the metadata of the real-time computation is continuously written to the checkpoint directory. If the driver node fails, the cluster manager can restart the driver automatically (the job must be submitted in cluster mode, e.g. spark-submit --deploy-mode cluster; on a standalone cluster add --supervise, on YARN the driver is retried by the application master). The program then resumes from the checkpoint with no data loss.

    private static void testDriverHA() {

      // Checkpoint directory on a fault-tolerant file system (placeholder ip:port)
      final String checkpointDir = "hdfs://ip:port/checkpoint";

      // The factory is only invoked when no checkpoint exists yet; on recovery the
      // context, including the DStream graph, is rebuilt from the checkpoint.
      JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
        @Override
        public JavaStreamingContext create() {
          SparkConf conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("AdClickRealTimeStatSpark");

          JavaStreamingContext jssc = new JavaStreamingContext(
              conf, Durations.seconds(5));
          jssc.checkpoint(checkpointDir);

          // Kafka connection parameters and topic list from the project configuration
          Map<String, String> kafkaParams = new HashMap<String, String>();
          kafkaParams.put(Constants.KAFKA_METADATA_BROKER_LIST,
            ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST));
          String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
          String[] kafkaTopicsSplited = kafkaTopics.split(",");
          Set<String> topics = new HashSet<String>();
          for (String kafkaTopic : kafkaTopicsSplited) {
            topics.add(kafkaTopic);
          }

          // Direct (receiver-less) Kafka stream of <key, value> log lines
          JavaPairInputDStream<String, String> adRealTimeLogDStream = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topics);

          // All processing logic is defined inside create() so that it is captured
          // in the checkpoint and restored after a driver restart
          JavaPairDStream<String, String> filteredAdRealTimeLogDStream =
            filterByBlacklist(adRealTimeLogDStream);
          generateDynamicBlacklist(filteredAdRealTimeLogDStream);
          JavaPairDStream<String, Long> adRealTimeStatDStream = calculateRealTimeStat(
            filteredAdRealTimeLogDStream);
          calculateProvinceTop3Ad(adRealTimeStatDStream);
          calculateAdClickCountByWindow(adRealTimeLogDStream);
          return jssc;
        }
      };

      // Creates a new context on the first run, or recovers it from the checkpoint
      JavaStreamingContext context = JavaStreamingContext.getOrCreate(
        checkpointDir, contextFactory);
      context.start();
      context.awaitTermination();

    }

    Scala version

    package cn.piesat.spark

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SparkStreamingKafka {
      private val brokers = "hadoop01:9092"
      private val topics = "lj01"
      private val checkPointPath = "hdfs://hadoop01:9000/sparkStreaming/kafka6"

      def main(args: Array[String]): Unit = {
        val spark = getSparkSession()
        // Recover the context from the checkpoint if one exists, otherwise build a new one
        val streamingContext = StreamingContext.getOrCreate(checkPointPath, () => {
          val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
          ssc.checkpoint(checkPointPath)
          val kafkaInputStream = getKafkaInputStream(ssc)
          // Word count over the Kafka record values
          val result = kafkaInputStream.map(_.value())
            .flatMap(_.split(" "))
            .map(word => (word, 1))
            .reduceByKey(_ + _)
          result.print()
          ssc
        })
        streamingContext.start()
        streamingContext.awaitTermination()
      }

      def getSparkSession(): SparkSession = {
        SparkSession.builder()
          .appName("kafka_test")
          .master("local[4]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate()
      }

      def getKafkaInputStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
        val topicArray = topics.split(",").toList
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> brokers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "lj00",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicArray, kafkaParams)
        )
      }

    }

    Note: all of the streaming processing logic must be written inside the function passed to StreamingContext.getOrCreate(). Only logic defined there is recorded in the checkpoint, so when the application is restarted and the context is recovered from the checkpoint, that is the logic that gets executed; anything defined outside the factory is not restored.
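    A minimal self-contained sketch of the structure this note demands (placeholder checkpoint path and socket source; createContext is an illustrative name, not from the original article):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object GetOrCreatePlacement {
      private val checkpointDir = "hdfs://ip:port/checkpoint"  // placeholder

      // Factory: everything the job does is defined in here, so the whole
      // DStream graph ends up in the checkpoint.
      private def createContext(): StreamingContext = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("GetOrCreatePlacement")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint(checkpointDir)
        ssc.socketTextStream("localhost", 9999).count().print()
        ssc
      }

      def main(args: Array[String]): Unit = {
        val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
        // Do not add transformations or output operations here: on a restart the
        // context is recovered from the checkpoint, createContext() is never
        // called, and logic defined at this point is not part of the recovered job.
        ssc.start()
        ssc.awaitTermination()
      }
    }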

    3. RDD high availability: enable the WAL (write-ahead log)

    In principle, receiver-based Spark Streaming ingests data through a receiver. The received data is split into blocks, the blocks are grouped into a batch, and for each batch an RDD is created and a job is launched to run the defined operators. With the write-ahead log enabled, as soon as the receiver receives data it also writes a copy to files under the checkpoint directory on a fault-tolerant file system (such as HDFS), as a redundant copy of the data.

      SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("AdClickRealTimeStatSpark")
        .set("spark.streaming.receiver.writeAheadLog.enable","true");
