zoukankan      html  css  js  c++  java
  • Spark Streaming读取Kafka数据的两种方式

    Kafka在0.80.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8spark-streaming-kafka-0-10。在使用时应注意以下几点:

    1. spark-streaming-kafka-0-8兼容Kafka 0.8.2.1及以后的版本, 从Spark 2.3.0开始,对Kafka 0.8支持已被标记为过时。

    2. spark-streaming-kafka-0-10兼容Kafka 0.10.0及以后的版本, 从Spark 2.3.0开始, 此API是稳定版。

    3. 如果Kafka版本大于等于0.10.0,且Spark版本大于等于Spark 2.3.0,应使用spark-streaming-kafka-0-10

    本文总结spark-streaming-kafka-0-8中两种读取Kafka数据的方式:createStreamcreateDirectStream

    基于Receiver方式

    POM依赖

     1 <dependencies>
     2      <!--spark-streaming-->
     3      <dependency>
     4          <groupId>org.apache.spark</groupId>
     5          <artifactId>spark-streaming_2.11</artifactId>
     6          <version>2.2.2</version>
     7      </dependency>
     8 
     9      <!--spark-streaming-kafka-plugin-->
    10      <dependency>
    11          <groupId>org.apache.spark</groupId>
    12          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    13          <version>2.2.2</version>
    14      </dependency>
    15  </dependencies>

    示例一

     1 // 1、Kafka配置
     2  // 配置zookeeper集群、消费者组
     3  val kafkaParams = Map(
     4    "zookeeper.connect" -> "localhost:2181",
     5    "group.id" -> groupID)
     6 
     7  // 2、topic_name与numThreads的映射
     8  // topic有几个partition,就写几个numThreads。
     9  // 每个partition对应一个单独线程从kafka取数据到Spark Streaming
    10  val topics = Map(topicName -> numThreads)
    11 
    12  // 3、ReceiverInputDStream
    13  // 注意:应先import kafka.serializer.StringDecoder再import org.apache.spark.streaming._
    14  val kafkaStream= KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    15    ssc,
    16    kafkaParams,
    17    topics,
    18    StorageLevel.MEMORY_AND_DISK_SER_2)

    示例二

     1 // 1、topic_name与numThreads的映射
     2  // topic有几个partition,就写几个numThreads。
     3  // 每个partition对应一个单独线程从kafka取数据到Spark Streaming
     4  val topics = Map(topicName -> numThreads)
     5 
     6  // 2、ReceiverInputDStream
     7  // 底层先根据zkQuorum、groupId 构造kafkaParams,
     8  // 然后再调用createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel)
     9  val kafkaStream=KafkaUtils.createStream(
    10    ssc=ssc,
    11    zkQuorum="localhost:2181",
    12    groupId = groupID,
    13    topics,
    14    StorageLevel.MEMORY_AND_DISK_SER_2
    15  )

    特点

    1. 需要使用单独的Receiver线程来异步获取Kafka数据。

    2. Receiver底层实现中使用了Kafka高级消费者API,因此,不需要自己管理Offset,只需指定Zookeeper和消费者组GroupID,系统便会自行管理。

    3. 执行过程: Spark Streaming启动时,会在Executor中同时启动Receiver异步线程用于从Kafka持续获取数据,获取的数据先存储在Receiver中(存储方式由StorageLevel决定),后续,当Batch Job触发后,这些数据会被转移到剩下的Executor中被处理。处理完毕后,Receiver会自动更新Zookeeper中的Offset。

    4. 默认情况下,程序失败或Executor宕掉后可能会丢失数据,为避免数据丢失,可启用预写日志(Write Ahead Log,WAL)。将Receiver收到的数据再备份一份到更可靠的系统如HDFS分布式文件中,以冗余的数据来换取数据不丢失。

    5. 生产下,为保证数据完全不丢失,一般需要启用WAL。启用WAL,在数据量较大,网络不好情况下,会严重降低性能。


    基于Direct(No Receiver)方式

    POM依赖

     1 <dependencies>
     2      <!--spark-streaming-->
     3      <dependency>
     4          <groupId>org.apache.spark</groupId>
     5          <artifactId>spark-streaming_2.11</artifactId>
     6          <version>2.3.1</version>
     7      </dependency>
     8 
     9      <!--spark-streaming-kafka-plugin-->
    10      <dependency>
    11          <groupId>org.apache.spark</groupId>
    12          <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    13          <version>2.3.1</version>
    14      </dependency>
    15  </dependencies>

    示例一

     1 // 1、Kafka配置
     2  // auto.offset.reset=latest 无提交的offset时,从最新的开始消费
     3  // enable.auto.commit=false 禁用后台自动提交offset,自己手动管理
     4  val kafkaParams = Map[String, Object](
     5    "bootstrap.servers" -> "localhost:9092",
     6    "key.deserializer" -> classOf[StringDeserializer],
     7    "value.deserializer" -> classOf[StringDeserializer],
     8    "auto.offset.reset" -> "latest",
     9    "enable.auto.commit" -> (false: java.lang.Boolean),
    10    "group.id" -> groupID)
    11 
    12  // 2、DirectKafkaInputDStream
    13  // LocationStrategies:本地策略。为提升性能,可指定Kafka Topic Partition的消费者所在的Executor。
    14  // LocationStrategies.PreferConsistent:一致性策略。一般情况下用这个策略就OK。将分区尽可能分配给所有可用Executor。
    15  // LocationStrategies.PreferBrokers:特殊情况,如果Executor和Kafka Broker在同一主机,则可使用此策略。
    16  // LocationStrategies.PreferFixed:特殊情况,当Kafka Topic Partition负荷倾斜,可用此策略,手动指定Executor来消费特定的Partition.
    17  // ConsumerStrategies:消费策略。
    18  // ConsumerStrategies.Subscribe/SubscribePattern:可订阅一类Topic,且当新Topic加入时,会自动订阅。一般情况下,用这个就OK。
    19  // ConsumerStrategies.Assign:可指定要消费的Topic-Partition,以及从指定Offset开始消费。
    20  val kafkaStream=KafkaUtils.createDirectStream[String,String](
    21    ssc,
    22    LocationStrategies.PreferConsistent,
    23    ConsumerStrategies.Subscribe[String,String](List(topicName),kafkaParams)
    24  )

    示例二

     1 // 1、Kafka配置
     2  // auto.offset.reset=latest 无提交的offset时,从最新的开始消费
     3  // enable.auto.commit=false 禁用后台自动提交offset,自己手动管理
     4  val kafkaParams = Map[String, Object](
     5    "bootstrap.servers" -> "localhost:9092",
     6    "key.deserializer" -> classOf[StringDeserializer],
     7    "value.deserializer" -> classOf[StringDeserializer],
     8    "auto.offset.reset" -> "latest",
     9    "enable.auto.commit" -> (false: java.lang.Boolean),
    10    "group.id" -> groupID)
    11 
    12  // 2、DirectKafkaInputDStream
    13  // LocationStrategies.PreferConsistent:一致性策略。
    14  // ConsumerStrategies.Assign:从指定Topic-Partition的Offset开始消费。
    15  val initOffset=Map(new TopicPartition(topicName,0)->10L)
    16  val kafkaStream=KafkaUtils.createDirectStream[String,String](
    17    ssc,
    18    LocationStrategies.PreferConsistent,
    19    ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
    20  )

    特点

    1. 不需要使用单独的Receiver线程从Kafka获取数据。

    2. 使用Kafka简单消费者API,不需要ZooKeeper参与,直接从Kafka Broker获取数据。

    3. 执行过程:Spark Streaming Batch Job触发时,Driver端确定要读取的Topic-Partition的OffsetRange,然后由Executor并行从Kafka各Partition读取数据并计算。

    4. 为保证整个应用EOS, Offset管理一般需要借助外部存储实现。如Mysql、HBase等。

    5. 由于不需要WAL,且Spark Streaming会创建和Kafka Topic Partition一样多的RDD Partition,且一一对应,这样,就可以并行读取,大大提高了性能。

    6. Spark Streaming应用启动后,自己通过内部currentOffsets变量跟踪Offset,避免了基于Receiver的方式中Spark Streaming和Zookeeper中的Offset不一致问题。

  • 相关阅读:
    应用图标大小
    AndroidStudio使用笔记
    shell 三剑客之 sed 命令详解
    shell 三剑客之 sed pattern 详解
    shell 文本处理三剑客之 grep 和 egrep
    Shell 编程中的常用工具
    shell 函数的高级用法
    shell 数学运算
    shell 变量的高级用法
    nginx 之 https 证书配置
  • 原文地址:https://www.cnblogs.com/fnlingnzb-learner/p/13429762.html
Copyright © 2011-2022 走看看