Spark1.3中新增DirectStream处理Kafka的消息。使用方法如下:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
ssc:StreamContext
kafkaParams:Kafka的参数,包括Kafka的Brokers等
topicsSet:要读取的topic。
此方法创建一个input steam,通过直接从Kafka的Brokers读取消息,而不是创建任何的receiver(即不通过kafka的高级api来读取消息)
此stream保证Kafka的消息只会被处理一次。
* - 无需receivers:该Steam不创建任何receiver。它直接查询Kafka的offset(偏移量),不需要通过zookeeper来保存已经消费过的offset。
当然,现有很多kafka的监控工具都会读取Zookeeper的数据,因此,如果你想继续使用Kafka 的监控工具,需要自己实现代码来更新Zookeeper的offset。
可以参考 org.apache.spark.streaming.kafka.HasOffsetRanges
* - 故障恢复:启用checkpoint机制后,当Driver失败后,可以快速的从故障中恢复。
Driver失败时,当前Kafka的读取的offset也会保存下来,恢复的时候会从此offset继续处理,保证消息不会丢失,并且读取处理一次。
def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)] = {
//创建消息处理
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val kc = new KafkaCluster(kafkaParams)
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
(for {
topicPartitions <- kc.getPartitions(topics).right
leaderOffsets <- (if (reset == Some("smallest")) {
//根据topic和partition得到相应的leader节点,并且读取offset的最小值(最小值可能不是0)
kc.getEarliestLeaderOffsets(topicPartitions)
} else {
//根据topic和partition得到相应的leader节点,并且读取offset的最大值,然后此stream只会处理kafka的新增消息,有点想tail命令
kc.getLatestLeaderOffsets(topicPartitions)
}).right
} yield {
val fromOffsets = leaderOffsets.map { case (tp, lo) =>
(tp, lo.offset)
}
//根据ssc、offsets等来创建stream
new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}).fold(
errs => throw new SparkException(errs.mkString("
")),
ok => ok
)
}
生成的DirectKafkaInputDStream
class DirectKafkaInputDStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
@transient ssc_ : StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
) extends InputDStream[R](ssc_) with Logging {
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)
//创建checkpoint的数据
protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData
...
//当前kafka的偏移量,每批消息处理完毕之后,都将此offset进行更新。,此offset也会保存在checkpointData中,如果Driver失败,可以恢复,保证消息不会被遗漏。
protected var currentOffsets = fromOffsets
//获取kafka每个topic的每个partition的leader,并且读取每个leader已经最大的offset。
//如果kafka的leader已经发生变更(由于机器故障灯),也可以尽快的发现
@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
// Either.fold would confuse @tailrec, do it manually
if (o.isLeft) {
val err = o.left.get.toString
if (retries <= 0) {
throw new SparkException(err)
} else {
log.error(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
} else {
o.right.get
}
}
...
//stream的计算,根据currentOffsets和untilOffsets生成RDD
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
Some(rdd)
}
...
private[streaming]
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
def batchForTime = data.asInstanceOf[mutable.HashMap[
Time, Array[OffsetRange.OffsetRangeTuple]]]
override def update(time: Time) {
batchForTime.clear()
generatedRDDs.foreach { kv =>
val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
batchForTime += kv._1 -> a
}
}
override def cleanup(time: Time) { }
//从失败中恢复的时候,需要重新计算generatedRDDs
override def restore() {
// this is assuming that the topics don't change during execution, which is true currently
val topics = fromOffsets.keySet
val leaders = kc.findLeaders(topics).fold(
errs => throw new SparkException(errs.mkString("
")),
ok => ok
)
batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
}
}
}
}