zoukankan      html  css  js  c++  java
  • SparkStreaming以Direct的方式对接Kafka

    package SparkStreaming
    
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
    
    /**
     * Created by 古城小巷少年 on 2020-01-03 10:03
     */
    
    object KafkaDirectWordCount {
    
      def main(args: Array[String]): Unit = {
    
        // 指定消费者组名,多个消费者组消费同一个topic,每个组记录各自的offset
        // 一个topic的分区只能被消费者组内的一个消费者消费。
        val group = "g001"
    
        val conf = new SparkConf().setAppName("kafkaDirectWordCount").setMaster("local[2]")
        // 创建SparkSteaming
        val ssc = new StreamingContext(conf, Seconds(5))
    
        // 指定消费的topic名字
        val topic = "wwcc"
    
        // 指定kafka的broker地址
        val brokerList = "hadoop102:9092,hadoop103:9092,hadoop104:9092"
    
        // 指定zk的地址,后期更细消费的偏移量时使用
        val zkQuorum = "hadoop102:2181,hadoop103:2181,hadoop104:2181"
    
        // 创建stream时使用的topic名字集合,sparkStreaming可以消费多个topic
        val topics: Set[String] = Set(topic)
    
        // 创建一个ZKGroupTopicDirs对象,其实是指往zk中写入数据的目录,用于保存偏移量
        val topicDirs = new ZKGroupTopicDirs(group, topic)
        // 获取zookeeper中的路径"/g001/offsets/wwcc/"
        val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
    
        // 准备kafka的参数
        val kafkaParams: Map[String, String] = Map(
          "metadata.broker.list" -> brokerList,
          "group.id" -> group,
          // 从头开始读取数据
          "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
        )
    
        // 创建zookeeper客户端,可以从zk中读取偏移量数据,并更新偏移量
        val zkClient = new ZkClient(zkQuorum)
    
        // 查询该路径下是否有字节点
        val children: Int = zkClient.countChildren(zkTopicPath)
    
        var kafkaStream: InputDStream[(String, String)] = null
    
        // 如果zookeeper中保存有offset,则利用这个offset作为kafkaStream的起始位置
        var fromOffsets: Map[TopicAndPartition, Long] = Map()
    
        // 如果保存过offset
        if(children > 0) {
          for (i<-0 until children){
            // 读取偏移量
            val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/${i}")
            // 将topic和partition封装到ip对象里 wordcount/0
            val tp = TopicAndPartition(topic, i)
            // 将key:TopicAndPartition, value:partitionOffset写入fromOffsets Map中
            fromOffsets += (tp -> partitionOffset)
          }
    
          // 将Kafka的消息进行转换成(key,value)形式,value是消息内容,key是元数据信息
          val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
    
          // 通过kafkaUtils创建直连的Dstream
          // fromOffsets的作用是按照前面计算好的偏移量继续消费数据
    
          val kafkaStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)
    
        } else {
          // 如果未保存
          val kafkaStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        }
    
        // 偏移量的范围
        var offsetRanges: Array[OffsetRange] = Array[OffsetRange]()
    
        // 从kafka读取数据,DSteam的transform方法可以将当前批次的RDD获取出来
        // 该transform方法计算获取到当前批次RDD,然后将RDD的偏移量取出来,然后将RDD返回到DStream
        val transform: DStream[(String, String)] = kafkaStream.transform(rdd => {
          // 得到该RDD对应的kafka的消息的offset
          // 该RDD是一个kafkaRDD,可以获得偏移量的范围
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        })
    
        val messages: DStream[String] = transform.map(_._2)
    
        // 依次迭代DSteam中的RDD
        messages.foreachRDD( rdd => {
          rdd.foreachPartition(partition => {
            partition.foreach(x=> {
              println(x)
            })
          })
    
          for (o <- offsetRanges){
            val zPath = s"${topicDirs.consumerOffsetDir}/${(o.partition)}"
            // 将该partition的offset保存到zookeeper中
            ZkUtils.updatePersistentPath(zkClient, zPath, o.untilOffset.toString)
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    
    }
    
  • 相关阅读:
    【技术贴】【技术贴】每次双击都会跳出来打开方式的解决办法。。。选择你想用来打开此文件的程序。。
    【技术贴】xp更改登录头像,打开“用户账户”时显示:Automation服务器不能创建对象。的解决办
    【技术贴】关于惠普在郑州建立全球云计算服务中心的解析。。来自大河报
    【技术贴】如何删除卡巴斯基的日志?占C盘了好多空间....
    2007 Office 产品版本号
    SharePoint Workflow 基础
    重装SPS 2003的一点经验
    列出有空应该看一下的要点
    WinDBG命令概览(下) 扩展命令
    Content Deployment入门(下)
  • 原文地址:https://www.cnblogs.com/lucas-zhao/p/12144457.html
Copyright © 2011-2022 走看看