package SparkStreaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

/**
 * Created by 古城小巷少年 on 2020-01-03 10:03
 */
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {
    // Consumer group name. Several groups may consume the same topic, and each group
    // tracks its own offsets; within a group, a partition is consumed by only one consumer.
    val group = "g001"

    val conf = new SparkConf().setAppName("kafkaDirectWordCount").setMaster("local[2]")
    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))

    // Topic to consume
    val topic = "wwcc"
    // Kafka broker list
    val brokerList = "hadoop102:9092,hadoop103:9092,hadoop104:9092"
    // ZooKeeper quorum, used later to read and update the consumed offsets
    val zkQuorum = "hadoop102:2181,hadoop103:2181,hadoop104:2181"
    // Set of topics used when creating the stream; Spark Streaming can consume multiple topics
    val topics: Set[String] = Set(topic)

    // ZKGroupTopicDirs points at the ZooKeeper directory where the offsets are stored
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    // Offsets path in ZooKeeper, e.g. "/consumers/g001/offsets/wwcc"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    // Kafka parameters
    val kafkaParams: Map[String, String] = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      // start from the earliest offset when no offset has been saved
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // ZooKeeper client, used to read the saved offsets and to update them later
    val zkClient = new ZkClient(zkQuorum)
    // Count the child nodes under the offsets path (one per partition with a saved offset)
    val children: Int = zkClient.countChildren(zkTopicPath)

    var kafkaStream: InputDStream[(String, String)] = null
    // If ZooKeeper already holds offsets, use them as the starting positions of the stream
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    // Offsets were saved before
    if (children > 0) {
      for (i <- 0 until children) {
        // Read the saved offset of partition i
        val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/$i")
        // Wrap topic and partition into a TopicAndPartition object, e.g. wwcc/0
        val tp = TopicAndPartition(topic, i)
        // Put key: TopicAndPartition, value: partitionOffset into the fromOffsets map
        fromOffsets += (tp -> partitionOffset.toLong)
      }
      // Turn each Kafka message into a (key, value) pair: the value is the message body,
      // the key comes from the message metadata
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
      // Create the direct DStream through KafkaUtils;
      // fromOffsets makes consumption resume from the previously saved offsets
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // No offsets saved yet: start according to auto.offset.reset
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
    }

    // Offset ranges of the current batch
    var offsetRanges: Array[OffsetRange] = Array[OffsetRange]()

    // transform exposes the RDD of each batch; the underlying KafkaRDD carries its offset
    // ranges, which are captured here before the RDD is returned unchanged to the DStream
    val transform: DStream[(String, String)] = kafkaStream.transform(rdd => {
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    })
    val messages: DStream[String] = transform.map(_._2)

    // Iterate over the RDDs of the DStream
    messages.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        partition.foreach(x => {
          println(x)
        })
      })
      // After processing the batch, persist each partition's untilOffset back to ZooKeeper
      for (o <- offsetRanges) {
        val zPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
        ZkUtils.updatePersistentPath(zkClient, zPath, o.untilOffset.toString)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}