spark streaming中维护kafka偏移量到外部介质
以kafka偏移量维护到redis为例。
redis存储格式
使用的数据结构为string
,其中key为topic:partition
,value为offset
。
例如bobo
这个topic
下有3个分区,则key-value结构如下:
bobo:0
的偏移量为xbobo:1
的偏移量为ybobo:2
的偏移量为z
消费时指定offset
主要是如下两个方法:
createKafkaStream()
创建kakfa流getOffsets()
从redis中获取offsets
/**
* kakfa参数
*/
private val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
// 注意这里是none。
"auto.offset.reset" -> "none",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// `bobo`topic下有3个分区
private val topicPartitions = Map[String, Int]("bobo" -> 3)
// 从redis中获取offsets
def getOffsets: Map[TopicPartition, Long] = {
val jedis = InternalRedisClient.getResource
// 设置每个分区起始的offset
val offsets = mutable.Map[TopicPartition, Long]()
topicPartitions.foreach { it =>
val topic = it._1
val partitions = it._2
// 遍历分区,设置每个topic下对应partition的offset
for (partition <- 0 until partitions) {
val topicPartitionKey = topic + ":" + partition
var lastOffset = 0L
val lastSavedOffset = jedis.get(topicPartitionKey)
if (null != lastSavedOffset) {
try {
lastOffset = lastSavedOffset.toLong
} catch {
case e: Exception =>
log.error("get lastSavedOffset error", e)
System.exit(1)
}
}
log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)
// 添加
offsets += (new TopicPartition(topic, partition) -> lastOffset)
}
}
InternalRedisClient.returnResource(jedis)
offsets.toMap
}
/**
* 创建kakfa流
*
* @param ssc StreamingContext
* @return InputDStream
*/
def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
val offsets = getOffsets
// 创建kafka stream
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
)
stream
}
其中:核心是通过ConsumerStrategies.Assign
方法来指定topic
下对应partition
的offset
信息。
更新offset到redis
最后将offset信息维护到redis即可。
/**
* 消费
*
* @param stream InputDStream
*/
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
stream.foreachRDD { rdd =>
// 获取offset信息
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 计算相关指标,这里就统计下条数了
val total = rdd.count()
val jedis = InternalRedisClient.getResource
val pipeline = jedis.pipelined()
// 会阻塞redis
pipeline.multi()
// 更新相关指标
pipeline.incrBy("totalRecords", total)
// 更新offset
offsetRanges.foreach { offsetRange =>
log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
}
// 执行,释放
pipeline.exec()
pipeline.sync()
pipeline.close()
InternalRedisClient.returnResource(jedis)
}
}
参考
spark代码
顺便贴一下自己整理的spark相关的代码。
Github地址:spark-programming
主要包括:
- RDD的基本使用
- SQL
- jdbc(读、写)
- hive(读、写、动态分区)
- Streaming
- 消费kafka(手动提交、手动维护offset)
- 写入HBase
- 写入Hive