Spark Streaming reads JSON data from a Kafka topic and stores it as parquet files, with the consumer offsets kept in Redis. Because the data is written to plain files and no transaction covers both the parquet write and the offset commit, the job in this post cannot provide exactly-once semantics (it is at-least-once). From an idempotence angle, exactly-once can still be approximated by giving every record a unique key and merging/deduplicating on that key downstream, as sketched below.
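A minimal dedup sketch along those lines, assuming each record carries a hypothetical unique "uid" column; the column name and output paths are illustrative and not part of the streaming job that follows:

import org.apache.spark.sql.SparkSession

object DedupByUid {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dedupByUid").master("local[2]").getOrCreate()

    // Read everything written so far, possibly containing duplicates from replays ...
    val raw = spark.read.parquet("sparkStreamingKafka2HdfsData")
    // ... keep exactly one row per unique key (assumed column "uid") ...
    val deduped = raw.dropDuplicates("uid")
    // ... and write the merged, deduplicated result to a separate location.
    deduped.write.mode("overwrite").parquet("sparkStreamingKafka2HdfsDataDeduped")

    spark.stop()
  }
}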
The complete streaming job:

package com.abc.etl.spark

import java.util.{HashSet => JavaHashSet, Set => JavaSet}

import cn.hutool.core.util.StrUtil
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.parser.Feature
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import redis.clients.jedis.{Jedis, JedisSentinelPool}

import scala.collection.mutable.ListBuffer

object MoreTopic {

  /**
   * Data model of the offsets kept in Redis: key = "topic_partition", value = offset.
   *
   * @param topics topics to look up
   * @param jedis  Redis connection
   * @return offsets for every TopicPartition found in Redis
   */
  def getRedisOffset(topics: collection.Set[String], jedis: Jedis): collection.Map[TopicPartition, Long] = {
    val res = collection.mutable.HashMap.empty[TopicPartition, Long]
    for (topic <- topics) {
      val topicPartitionKeys: JavaSet[String] = jedis.keys(topic + StrUtil.UNDERLINE + "*")
      val iterator = topicPartitionKeys.iterator()
      while (iterator.hasNext) {
        val topicPartitionKey = iterator.next()
        val offset = jedis.get(topicPartitionKey)
        val topicName = topicPartitionKey.split(StrUtil.UNDERLINE)(0)
        val partition = topicPartitionKey.split(StrUtil.UNDERLINE)(1)
        res.put(new TopicPartition(topicName, partition.toInt), offset.toLong)
      }
    }
    res
  }

  def main(args: Array[String]): Unit = {
    val duration = 20
    val appName = "sparkstreamingkafka2"
    val conf = SparkConfSingleton.getInstance().setAppName(appName).setMaster("local[2]")
    val ssc = new StreamingContext(SparkContextSingleton.getInstance(conf), Seconds(duration))

    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "earliest",
      "value.deserializer" -> classOf[StringDeserializer],
      "key.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> "zk1:9092,zk2:9092,zk3:9092",
      "group.id" -> "hellokitty",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sentinels = new JavaHashSet[String]()
    sentinels.add("zk2:26379")
    sentinels.add("zk3:26379")
    val master = "mymaster"
    val jedis = JedisSentinelPoolSingleton.getInstance(master, sentinels).getResource

    val topics = Set("Kafka2Hdfs")
    // Start the direct stream from the offsets recorded in Redis; if Redis holds no
    // offsets yet, "auto.offset.reset" = "earliest" takes effect.
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, getRedisOffset(topics, jedis))
    )
    jedis.close()

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // The HasOffsetRanges cast only works on the RDD created by createDirectStream
        // (RDD[ConsumerRecord]); see https://blog.csdn.net/xianpanjia4616/article/details/85871063
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(partition => {
          // Note: using SparkContext/SQLContext inside foreachPartition only works here
          // because the master is local[2], so driver and executor share one JVM.
          val sc = SparkContextSingleton.getInstance(conf)
          val o = offsetRanges(TaskContext.get.partitionId)
          println("reach position .................: " + s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")

          // Parse each JSON record of the partition into a Row.
          val list = ListBuffer.empty[Row]
          while (partition.hasNext) {
            val consumerRecord = partition.next()
            val json = consumerRecord.value()
            println("json is : " + json)
            val jsonObject = JSON.parseObject(json, Feature.OrderedField)
            val values = jsonObject.values().toArray()
            val row = Row.apply(values: _*)
            list += row
          }

          val rowRdd = sc.makeRDD(list)
          val schema = StructType(
            List(
              StructField("key1", StringType, true),
              StructField("value1", StringType, true),
              StructField("key1.type", StringType, true)
            )
          )
          val sqlContext = SQLContextSingleton.getInstance(sc)
          val df = sqlContext.createDataFrame(rowRdd, schema)
          // Append the micro-batch as parquet; the offsets are committed to Redis only
          // after the write succeeds (at-least-once, not exactly-once).
          df.write.format("parquet").mode("append").save("sparkStreamingKafka2HdfsData")

          val jedis = JedisSentinelPoolSingleton.getInstance(master, sentinels).getResource
          offsetRanges.foreach { offsetRange =>
            println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset
              + " untilOffset: " + offsetRange.untilOffset)
            val topic_partition_key_new = offsetRange.topic + StrUtil.UNDERLINE + offsetRange.partition
            jedis.set(topic_partition_key_new, offsetRange.untilOffset + "")
          }
          jedis.close()
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  object JedisSentinelPoolSingleton {
    @transient private var instance: JedisSentinelPool = _

    def getInstance(master: String, sentinels: JavaHashSet[String]): JedisSentinelPool = {
      if (instance == null) {
        val gPoolConfig = new GenericObjectPoolConfig()
        gPoolConfig.setMaxIdle(10)
        gPoolConfig.setMaxTotal(10)
        gPoolConfig.setMaxWaitMillis(10)
        gPoolConfig.setJmxEnabled(true)
        instance = new JedisSentinelPool(master, sentinels, gPoolConfig)
      }
      instance
    }
  }

  object SQLContextSingleton {
    @transient private var instance: SQLContext = _

    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }

  object SparkContextSingleton {
    @transient private var instance: SparkContext = _

    def getInstance(sparkConf: SparkConf): SparkContext = {
      if (instance == null) {
        instance = new SparkContext(sparkConf)
      }
      instance
    }
  }

  object SparkConfSingleton {
    @transient private var instance: SparkConf = _

    def getInstance(): SparkConf = {
      if (instance == null) {
        instance = new SparkConf()
      }
      instance
    }
  }
}
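Because the offsets live in Redis under the "topic_partition -> offset" key model used by getRedisOffset, they can be inspected or reset outside the job. A minimal sketch, assuming the same sentinel setup; the key Kafka2Hdfs_0 is simply what that model produces for partition 0 of the Kafka2Hdfs topic:

import java.util.{HashSet => JavaHashSet}

import redis.clients.jedis.JedisSentinelPool

object OffsetInspect {
  def main(args: Array[String]): Unit = {
    val sentinels = new JavaHashSet[String]()
    sentinels.add("zk2:26379")
    sentinels.add("zk3:26379")
    val pool = new JedisSentinelPool("mymaster", sentinels)
    val jedis = pool.getResource

    // Reset partition 0 of topic Kafka2Hdfs to offset 0 (forces a full re-read on restart).
    jedis.set("Kafka2Hdfs_0", "0")

    // Print every stored offset for the topic, i.e. the keys getRedisOffset would read.
    val it = jedis.keys("Kafka2Hdfs_*").iterator()
    while (it.hasNext) {
      val key = it.next()
      println(key + " -> " + jedis.get(key))
    }

    jedis.close()
    pool.close()
  }
}

On the very first run no such keys exist, getRedisOffset returns an empty map, and the stream falls back to auto.offset.reset = earliest.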
Key dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<!-- fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.37</version>
</dependency>
<!-- Redis client -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.1.0</version>
</dependency>
<!-- hutool -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>4.5.10</version>
</dependency>
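The job also needs the core Spark modules on the classpath (spark-core for SparkContext, spark-streaming for StreamingContext, spark-sql for SQLContext/DataFrame and the parquet writer); a sketch assuming the same Spark 2.3.2 / Scala 2.11 line as spark-streaming-kafka-0-10 above, to be adjusted to your cluster:

<!-- assumed companion modules; versions match spark-streaming-kafka-0-10_2.11 2.3.2 above -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.2</version>
</dependency>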
Reference: https://blog.csdn.net/xianpanjia4616/article/details/81709075