zoukankan      html  css  js  c++  java
  • SparkStreaming以Direct的方式对接Kafka

    package SparkStreaming
    
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
    
    /**
     * Created by 古城小巷少年 on 2020-01-03 10:03
     */
    
    /**
     * Spark Streaming job that consumes a Kafka topic through the direct
     * (receiver-less) API, prints every message value, and stores the consumed
     * offsets in ZooKeeper after each batch so a restart can resume from them.
     */
    object KafkaDirectWordCount {

      /**
       * Entry point: builds the streaming context, creates the direct stream
       * (resuming from offsets stored in ZooKeeper when any exist), prints each
       * message value and writes the batch's end offsets back to ZooKeeper.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {

        // Consumer group name; each group tracks its own offsets for a topic.
        val group = "g001"

        val conf = new SparkConf().setAppName("kafkaDirectWordCount").setMaster("local[2]")
        // Streaming context with a 5 second batch interval.
        val ssc = new StreamingContext(conf, Seconds(5))

        // Topic to consume.
        val topic = "wwcc"

        // Kafka broker addresses.
        val brokerList = "hadoop102:9092,hadoop103:9092,hadoop104:9092"

        // ZooKeeper quorum, used to read and update the consumed offsets.
        val zkQuorum = "hadoop102:2181,hadoop103:2181,hadoop104:2181"

        // The direct stream accepts a set of topics; only one is used here.
        val topics: Set[String] = Set(topic)

        // Helper that yields the ZooKeeper directory where this group's offsets
        // for the topic are kept (presumably /consumers/g001/offsets/wwcc --
        // confirm against the Kafka version in use).
        val topicDirs = new ZKGroupTopicDirs(group, topic)
        val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

        val kafkaParams: Map[String, String] = Map(
          "metadata.broker.list" -> brokerList,
          "group.id" -> group,
          // Start from the earliest available offset when none is supplied.
          "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
        )

        // ZooKeeper client used both to read stored offsets and to update them.
        val zkClient = new ZkClient(zkQuorum)

        // Number of child nodes under the offset directory; one child per
        // partition whose offset has been stored.
        val children: Int = zkClient.countChildren(zkTopicPath)

        // The stream is the value of the if/else expression. Declaring a new
        // `val` inside each branch would only shadow an outer variable and leave
        // it unassigned.
        val kafkaStream: InputDStream[(String, String)] =
          if (children > 0) {
            // Offsets were stored previously: resume from them.
            val fromOffsets = readStoredOffsets(zkClient, zkTopicPath, topic, children)

            // Map each Kafka record to a (key, value) pair.
            val messageHandler =
              (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
              ssc, kafkaParams, fromOffsets, messageHandler)
          } else {
            // No stored offsets: let "auto.offset.reset" decide where to start.
            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
              ssc, kafkaParams, topics)
          }

        kafkaStream.foreachRDD { rdd =>
          // The RDDs produced directly by the Kafka direct stream expose the
          // offset range of every partition in the batch. The cast must be done
          // on this RDD, before any transformation is applied to it.
          val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          // Print the value of every message in the batch.
          rdd.map(_._2).foreachPartition(partition => partition.foreach(x => println(x)))

          // After the batch has been processed, store each partition's end offset.
          offsetRanges.foreach { o =>
            val zPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
            ZkUtils.updatePersistentPath(zkClient, zPath, o.untilOffset.toString)
          }
        }

        ssc.start()
        ssc.awaitTermination()

      }

      /**
       * Reads the offsets stored in ZooKeeper for partitions 0 until `partitionCount`.
       *
       * Assumes the children of `zkTopicPath` are named by consecutive partition
       * ids starting at 0 -- TODO confirm for topics whose offsets were only
       * partially stored.
       *
       * @param zkClient       ZooKeeper client used to read the stored values
       * @param zkTopicPath    directory holding one node per partition
       * @param topic          topic the offsets belong to
       * @param partitionCount number of partition nodes to read
       * @return the stored offset of each partition, keyed by topic and partition
       */
      private def readStoredOffsets(
          zkClient: ZkClient,
          zkTopicPath: String,
          topic: String,
          partitionCount: Int): Map[TopicAndPartition, Long] =
        (0 until partitionCount).map { partition =>
          val storedOffset: String = zkClient.readData[String](s"$zkTopicPath/$partition")
          // Offsets are stored as text and must be parsed back into a Long.
          TopicAndPartition(topic, partition) -> storedOffset.toLong
        }.toMap

    }
    
  • 相关阅读:
    【crontab】误删crontab及其恢复
    New Concept English there (7)
    New Concept English there (6)
    New Concept English there (5)
    New Concept English there (4)
    New Concept English there (3)
    New Concept English there (2)Typing speed exercise
    New Concept English there (1)Typing speed exercise
    New Concept English Two 34 game over
    New Concept English Two 33 94
  • 原文地址:https://www.cnblogs.com/lucas-zhao/p/12144457.html
Copyright © 2011-2022 走看看