zoukankan      html  css  js  c++  java
  • SparkStreaming from Kafka Demo

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object KafkaSparkStreaming extends App{
      //1.初始化Spark配置信息
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
      //2.初始化 Spark  StreamingContext
      private val streamContext = new StreamingContext(sparkConf,Seconds(5))
    
    //  //2.定义kafka参数
    //  val brokers = "hadoop102:9092,hadoop103:9092,hadoop101:9092"
    //  val topic = "source"
    //  val consumerGroup = "spark"
    
      //TODO 保存数据的状态,需要设置检查点路径
      streamContext.sparkContext.setCheckpointDir("checkpoint1")
    
      private val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
        streamContext,
        "hadoop101:2181",
        "sparkstream",
        Map("stream" -> 3)
      )
      private val wordStreams: DStream[String] = kafkaDStream.flatMap(t=>t._2.split(" "))
    
      //将单词映射成元组(word,1)
      val wordAndOneStreams = wordStreams.map((_, 1))
    
    //  //将相同的单词次数做统计 ,无状态统计
    //  val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
    //  //打印
    //    wordAndCountStreams.print()
    
      // TODO 使用有状态统计更新
      private val stateDStream: DStream[(String, Int)] = wordAndOneStreams.updateStateByKey {
        case (seq, buffer) => {
          val sum = buffer.getOrElse(0) + seq.sum
          Option(sum)
        }
      }
      stateDStream.print()
    
    
      //启动SparkStreamingContext
      streamContext.start()
      streamContext.awaitTermination()
    }
    
    <人追求理想之时,便是坠入孤独之际.> By 史泰龙
  • 相关阅读:
    石头汤
    win8激活 DNS名称不存在
    IE系列hack
    CSS缩写
    MSDN资源
    项目管理之软件文档知多少
    jqModal
    Microsoft Security Essentials 不升级方法
    XP没有声音服务解决方案
    office2003安全模式启动,默认模板问题/打开word就显示“无法装载这个对象”
  • 原文地址:https://www.cnblogs.com/jason-Gan/p/13644619.html
Copyright © 2011-2022 走看看