  • Spark Streaming and Kafka Integration in Practice: WordCount

    In this exercise the Kafka console producer generates the messages and Spark Streaming consumes them. The steps and code are as follows.

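    First, start the Kafka server (ZooKeeper must already be running on localhost:2181, since the commands below connect to it)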

    .\bin\windows\kafka-server-start.bat    .\config\server.properties
    

    Create a topic

    The topic is named test in this example

    kafka-topics.bat  --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

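    Create a producer (assuming the broker listens on the default port 9092); each line typed into the console is sent to the topic as one message

    kafka-console-producer.bat  --broker-list localhost:9092  --topic test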
    

    Create a consumer (the Spark Streaming application)
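
    The consumer imports org.apache.spark.streaming.kafka._, which is not part of the core Spark Streaming artifact. A minimal build.sbt sketch is given here; the project name and the Scala and Spark versions are assumptions, not taken from the original post, so match them to your own installation. On Spark 2.x the same receiver-based API ships in the artifact spark-streaming-kafka-0-8 instead.

    ```scala
    // build.sbt (hypothetical project; name and versions are assumptions)
    name := "spark-streaming-kafka-wordcount"
    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      // core streaming API: StreamingContext, DStream, Seconds
      "org.apache.spark" %% "spark-streaming"       % "1.6.3",
      // receiver-based Kafka integration: KafkaUtils.createStream
      "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
    )
    ```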

    package spark.examples.streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._

    object SparkStreamingKakfaWordCount {
      def main(args: Array[String]): Unit = {
        println("Start to run SparkStreamingKakfaWordCount")
        // local[3]: each receiver occupies one thread; the remaining threads process the data
        val conf = new SparkConf().setMaster("local[3]").setAppName("SparkStreamingKakfaWordCount")
        // 4-second batch interval
        val ssc = new StreamingContext(conf, Seconds(4))
        // topic name -> number of consumer threads for that topic
        val topicMap = Map("test" -> 1)
        // ZooKeeper quorum server list
        val zkQuorum = "localhost:2181"
        // consumer group
        val group = "test-consumer-group01"

        // Approach 1: assumes topic test has a single partition.
        // createStream yields (key, message) pairs; keep only the message.
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        lines.print()

        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        wordCounts.print()

        // Approach 2 (use instead of approach 1): assumes topic test has 2 partitions.
        // Spark Streaming creates 2 input DStreams that read the 2 partitions in parallel,
        // then repartitions the data into 4 partitions for parallel processing, so the
        // processing parallelism is twice the read parallelism.
        // val streams = (1 to 2).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2))

        // Union the 2 streams. To simply print what was received:
        // val partitions = ssc.union(streams).repartition(4).map("DataReceived: " + _)
        // partitions.print()
        // Or, for the word count (choose the partition count to suit Spark's processing capacity):
        // val partitions = ssc.union(streams).repartition(2)
        // val words = partitions.flatMap(_.split(" "))
        // val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        // wordCounts.print()

        ssc.start()            // Start the computation
        ssc.awaitTermination() // Wait for the computation to terminate
      }
    }
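
    To see what each 4-second batch computes without starting Kafka or Spark, the same flatMap, map and reduce-by-key steps can be applied to an ordinary Scala collection. This is only a sketch: WordCountSketch and countWords are hypothetical names, and groupBy followed by a sum stands in for Spark's reduceByKey.

    ```scala
    object WordCountSketch {
      // Count the words in one batch of lines, mirroring the streaming job:
      // split each line on single spaces, pair every word with 1L, sum per word.
      def countWords(lines: Seq[String]): Map[String, Long] =
        lines
          .flatMap(_.split(" "))
          .map(word => (word, 1L))
          .groupBy(_._1) // plain-collection stand-in for reduceByKey
          .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

      def main(args: Array[String]): Unit = {
        // Two lines as they might be typed into the console producer
        val batch = Seq("hello spark", "hello kafka")
        countWords(batch).toSeq.sortBy(_._1).foreach(println)
      }
    }
    ```

    Running main prints (hello,2), (kafka,1) and (spark,1), one per line. In the streaming job the counts are computed per batch, so a word typed in two different batches is counted separately in each.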
    
  • Original article: https://www.cnblogs.com/xinlingyoulan/p/6072645.html