zoukankan      html  css  js  c++  java
  • spark streaming 读取kafka数据保存到parquet文件,redis存储offset

     Spark Streaming 读取 Kafka topic 上的 JSON 格式数据,存储为 Parquet 文件,并使用 Redis 存储 offset。由于数据写入文件与 offset 写入 Redis 无法放在同一个事务中,本文的实现不能保证 exactly once 语义(只能做到 at least once);从幂等的角度,可以考虑为每条数据设置唯一标志,在下游进行 merge 去重,从而实现 exactly once。

    package com.abc.etl
    
    
    package spark
    
    import java.util.{HashSet => JavaHashSet, Set => JavaSet}
    
    import cn.hutool.core.util.StrUtil
    import com.alibaba.fastjson.JSON
    import com.alibaba.fastjson.parser.Feature
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext, TaskContext}
    import redis.clients.jedis.{Jedis, JedisSentinelPool}
    
    import scala.collection.mutable.ListBuffer
    
    object MoreTopic {
    
      /**
        * Loads the starting offsets that were previously saved in Redis.
        *
        * Data model in Redis: one string key per partition, named `<topic>_<partition>`,
        * whose value is the offset to resume from.
        *
        * The partition number is read from the text that follows the `<topic>_` prefix, so
        * topic names that themselves contain underscores are handled. Keys whose suffix is
        * not a plain non-negative number (for example keys belonging to another topic that
        * merely shares the prefix) are ignored.
        *
        * @param topics topics whose saved offsets should be loaded
        * @param jedis  open Redis connection; it is not closed by this method
        * @return the saved offset for every TopicPartition found; empty when nothing is stored
        * @throws NumberFormatException if a stored offset value is not a valid Long
        */
      def getRedisOffset(topics: collection.Set[String], jedis: Jedis): collection.Map[TopicPartition, Long] = {
        import scala.collection.JavaConverters._

        // Accepts only an all-digit suffix that fits into an Int.
        def partitionOf(suffix: String): Option[Int] =
          if (suffix.nonEmpty && suffix.forall(c => c >= '0' && c <= '9')) scala.util.Try(suffix.toInt).toOption
          else None

        // NOTE(review): KEYS walks the whole keyspace; consider SCAN if the Redis instance is large.
        val savedOffsets = for {
          topic <- topics.toSeq
          keyPrefix = topic + StrUtil.UNDERLINE
          key <- jedis.keys(keyPrefix + "*").asScala.toSeq
          if key.startsWith(keyPrefix)
          partition <- partitionOf(key.substring(keyPrefix.length)).toSeq
          // The key may have been removed between KEYS and GET; skip it instead of failing on null.
          storedOffset <- Option(jedis.get(key)).toSeq
        } yield (new TopicPartition(topic, partition), storedOffset.toLong)

        savedOffsets.toMap
      }
    
      /**
        * Entry point: consumes JSON messages from Kafka, appends them to a parquet data set
        * and records the consumed offsets in Redis.
        *
        * For every non-empty micro-batch the whole batch is first written as parquet and only
        * afterwards are the batch's offsets stored in Redis, both driven from the driver.
        * A failure between the two steps makes the batch be read again on restart, so the
        * pipeline is at-least-once, not exactly-once.
        *
        * @param args command line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val duration = 20
        val appName = "sparkstreamingkafka2"

        // Fall back to a local master only when none was supplied (e.g. through spark-submit).
        val conf = SparkConfSingleton.getInstance().setAppName(appName).setIfMissing("spark.master", "local[2]")
        val ssc = new StreamingContext(SparkContextSingleton.getInstance(conf), Seconds(duration))

        val kafkaParams = Map[String, Object](
          "auto.offset.reset" -> "earliest",
          "value.deserializer" -> classOf[StringDeserializer],
          "key.deserializer" -> classOf[StringDeserializer],
          "bootstrap.servers" -> "zk1:9092,zk2:9092,zk3:9092",
          "group.id" -> "hellokitty",
          // Offsets are tracked in Redis, so Kafka must not commit them on its own.
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val sentinels = new JavaHashSet[String](java.util.Arrays.asList("zk2:26379", "zk3:26379"))
        val master = "mymaster"
        val topics = Set("Kafka2Hdfs")

        // Resume from the offsets saved by a previous run; the connection is always released.
        val startOffsets = withJedis(master, sentinels)(jedis => getRedisOffset(topics, jedis))

        val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, startOffsets)
        )

        val schema = StructType(
          List(
            StructField("key1", StringType, true),
            StructField("value1", StringType, true),
            StructField("key1.type", StringType, true)
          )
        )

        stream.foreachRDD { rdd =>
          if (!rdd.isEmpty()) {
            // Must be read from the RDD handed over by the direct stream, before any transformation.
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

            // Rows are built inside the tasks; the DataFrame itself is created on the driver
            // because SparkContext/SQLContext cannot be used from executor code.
            val rowRdd = rdd.map { consumerRecord =>
              val json = consumerRecord.value()
              println("json is : " + json)
              jsonToRow(json)
            }
            val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
            sqlContext.createDataFrame(rowRdd, schema)
              .write.format("parquet").mode("append").save("sparkStreamingKafka2HdfsData")

            // Store offsets only after the complete batch has been written successfully.
            withJedis(master, sentinels) { jedis =>
              offsetRanges.foreach { offsetRange =>
                println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
                val topicPartitionKey = offsetRange.topic + StrUtil.UNDERLINE + offsetRange.partition
                jedis.set(topicPartitionKey, offsetRange.untilOffset.toString)
              }
            }
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }

      /** Parses one JSON object and returns its values, in field order, as a Row of strings. */
      private def jsonToRow(json: String): Row = {
        val jsonObject = JSON.parseObject(json, Feature.OrderedField)
        // Every column of the target schema is a string, so non-string JSON values are rendered as text.
        val values = jsonObject.values().toArray().map(value => Option(value).map(_.toString).orNull)
        Row.fromSeq(values.toSeq)
      }

      /** Borrows a connection from the sentinel pool, runs `body` with it and always gives it back. */
      private def withJedis[T](master: String, sentinels: JavaHashSet[String])(body: Jedis => T): T = {
        val jedis = JedisSentinelPoolSingleton.getInstance(master, sentinels).getResource
        try body(jedis) finally jedis.close()
      }
    
    
      /** Holds the single, lazily created JedisSentinelPool shared by all callers in this JVM. */
      object JedisSentinelPoolSingleton {
        @transient private var instance: JedisSentinelPool = _

        /**
          * Returns the shared pool, creating it on the first call.
          *
          * The method is synchronized so that callers running on different threads cannot
          * each build (and leak) their own pool.
          *
          * @param master    name of the Redis master monitored by the sentinels
          * @param sentinels sentinel addresses in `host:port` form
          * @return the shared pool; the arguments are only used by the call that creates it
          */
        def getInstance(master: String, sentinels: JavaHashSet[String]): JedisSentinelPool = synchronized {
          if (instance == null) {
            val poolConfig = new GenericObjectPoolConfig()
            poolConfig.setMaxIdle(10)
            poolConfig.setMaxTotal(10)
            // NOTE(review): a borrower waits at most 10 ms for a free connection - confirm this is intended.
            poolConfig.setMaxWaitMillis(10)
            poolConfig.setJmxEnabled(true)
            instance = new JedisSentinelPool(master, sentinels, poolConfig)
          }
          instance
        }
      }
    
    
      /** Holds the single, lazily created SQLContext shared by all callers in this JVM. */
      object SQLContextSingleton {
        @transient private var instance: SQLContext = _

        /**
          * Returns the shared SQLContext, creating it on the first call.
          *
          * Synchronized so that concurrent first calls cannot create two contexts.
          *
          * @param sparkContext context used to build the SQLContext; only used by the creating call
          * @return the shared SQLContext
          */
        def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
          if (instance == null) {
            instance = new SQLContext(sparkContext)
          }
          instance
        }
      }
    
    
      /** Caches one SparkContext for the lifetime of the JVM. */
      object SparkContextSingleton {
        @transient private var instance: SparkContext = _

        /**
          * Hands out the cached SparkContext, building it from `sparkConf` the first time.
          *
          * @param sparkConf configuration for the context; ignored once a context is cached
          * @return the cached SparkContext
          */
        def getInstance(sparkConf: SparkConf): SparkContext =
          Option(instance).getOrElse {
            val created = new SparkContext(sparkConf)
            instance = created
            created
          }
      }
    
      /** Caches one SparkConf for the lifetime of the JVM. */
      object SparkConfSingleton {
        @transient private var instance: SparkConf = _

        /**
          * Hands out the cached SparkConf, creating a default one the first time.
          *
          * @return the cached SparkConf
          */
        def getInstance(): SparkConf =
          Option(instance).getOrElse {
            val created = new SparkConf()
            instance = created
            created
          }

      }
    
    }
    

      

    特别依赖:

                   
                    <dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    			<version>2.3.2</version>
    		</dependency>
                    <!-- fastjson-->
    		<dependency>
    			<groupId>com.alibaba</groupId>
    			<artifactId>fastjson</artifactId>
    			<version>1.2.37</version>
    		</dependency>
    		<!-- redis客户端-->
    		<dependency>
    			<groupId>redis.clients</groupId>
    			<artifactId>jedis</artifactId>
    			<version>3.1.0</version>
    		</dependency>
    		<!--hutool -->
    		<dependency>
    			<groupId>cn.hutool</groupId>
    			<artifactId>hutool-all</artifactId>
    			<version>4.5.10</version>
    		</dependency>        

    参考:https://blog.csdn.net/xianpanjia4616/article/details/81709075

  • 相关阅读:
    后缀树(suffix tree)
    哈希表(Hash Table)
    ansible报错Using a SSH password instead of a key is not possible because Host Key checking is enabled and sshpass does not support this
    windows上python上传下载文件到linux服务器指定路径【转】
    MySQL参数最大连接数max_connections
    linux服务器last查看关机记录
    /etc/fstab文件详解【转】
    MySQL5.7更改用户名密码
    awk对列/行进行统计求和【转】
    passwd: Have exhausted maximum number of retries for service【转】
  • 原文地址:https://www.cnblogs.com/mylittlecabin/p/11580053.html
Copyright © 2011-2022 走看看