写在前面:
在使用SparkStreaming 整合 Kafka 0.8版本的时候, spark-streaming-kafka-0-8 是不提供offset的管理的。为了保证数据零丢失,我们需要自己来管理这个偏移量。
参照:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
我们是将偏移量储存在MySQL中进行管理,
快速入门使用scalikejdbc 操作MySQL:
1.导入依赖
<!--scalikejdbc-config_2.11-->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>2.5.0</version>
</dependency>
2.resource文件中新建application.conf文件,配置如下
# MySQL example 本地
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8"
db.default.user="root"
db.default.password="123456"
3.Scalikejdbc Demo
package com.csylh.logAnalysis.scalikejdbc
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs
/**
* scala中连接mysql,使用ScaLikeJdbc框架进行数据的增删改查
*/
object ScaLikeJdbcApp {
def main(args: Array[String]): Unit = {
//解析application.conf的文件
DBs.setup()
//DBs.setupAll()
DB.autoCommit {
implicit session =>
SQL("insert into people(name,age,fv) values(?,?,?)")
.bind("留歌", 22, 88)
.update().apply()
}
}
def delete() = {
DB.autoCommit {
implicit session =>
SQL("delete from people where name = ?")
.bind("留歌")
.update().apply()
}
}
def update() {
DB.autoCommit { implicit session =>
SQL("update people set age = ? where name = ?")
.bind(18, "留歌")
.update().apply()
}
}
/**
* select查询到数据之后会产生一个rs的对象集,然后可以得到这个对象集里面的数据
*/
def select() = {
DB.readOnly {
implicit session =>
val sql = SQL("select * from people ").map(rs =>
(rs.string("name"), rs.int("age"))
).toList().apply()
}
}
}
零丢失数据解决方案代码如下:
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.config.DBs
/**
* Description: Spark Streaming 整合Kafka的 偏移量 管理
* 其实可以在 MySQL/zk/kafka/hbase/redis ...中存储我们的的offset偏移量数据的
* 这里选用MySQL + scalikejdbc
*
* @Author: 留歌36
* @Date: 2019/8/8 11:24
*/
object OffsetApp {
def main(args:Array[String]){
val conf= new SparkConf().setAppName("OffsetApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map(
"metadata.broker.list"->"192.168.1.116:9092",
"group.id"->"liuge.group.id" ,// 消费的时候是以组为单位进行消费
"auto.offset.reset" -> "smallest"
)
val topics = "test".split(",").toSet
// 步骤一:先获取offset
import scalikejdbc._
/**
* tuple 转 map
* ().list.apply().toMap
*/
DBs.setup()
val fromOffsets =
DB.readOnly {
implicit session => {
SQL("select * from offsets_storage ").map(rs =>
(TopicAndPartition(rs.string("topic"), rs.int("partitions")),rs.long("offset"))
).list().apply()
}
}.toMap
for (ele <- fromOffsets){
println("读取MySQL偏移量相关数据: " + ele._1.topic + ":" + ele._1.partition +":"+ele._2)
}
// 步骤二: Direct 模式对接Kafka ,得到InputDStream
val stream = if (fromOffsets.isEmpty){
// 第一次进来,进行消费Kafka ==> stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)
}else{ // 非第一次,进行消费Kafka ==> stream
val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(), mm.message())
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
}
// 步骤三: 业务逻辑 + 保存offset
stream.foreachRDD(rdd => {
// TODO.. 3.1这里是你的业务逻辑代码,简单count()为例
println("留歌本轮的统计结果:" + rdd.count())
// 3.2这里是保存offset数据的代码
var offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
println("消费数据从多少到多少:"+s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
DB.autoCommit{
implicit session => {
SQL("replace into offsets_storage(topic,groupid,partitions,offset) values(?,?,?,?)")
.bind(o.topic, "liuge.group.id", o.partition, o.untilOffset).update().apply()
}
}
}
})
ssc.start()
ssc.awaitTermination()
}
}
到这里,我们就能够比较好的去消费Kafka的数据了。
有任何不对的地方,欢迎指证!谢谢~~