zoukankan      html  css  js  c++  java
  • SparkStreaming与Kafka整合

    代码示例:

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import org.apache.spark.streaming.StreamingContext

    import org.apache.spark.streaming.Seconds

    import org.apache.spark.streaming.kafka.KafkaUtils

       

    object Driver {

     

    def main(args: Array[String]): Unit = {

     

    //--启动线程数,至少是两个。一个线程用于监听数据源,其他线程用于消费或打印。至少是2

    val conf=new SparkConf().setMaster("local[5]").setAppName("kafkainput")

     

    val sc=new SparkContext(conf)

     

    val ssc=new StreamingContext(sc,Seconds(5))

    ssc.checkpoint("d://check1801")

     

    //--连接kafka,并消费数据

    val zkHosts="192.168.150.137:2181,192.168.150.138:2181,192.168.150.139:2181"

    val groupName="gp1"

    //--Mapkey是消费的主题名,value是消费的线程数。也可以消费多个主题,比如:Map("parkx"->1,"enbook"->2)

    val topic=Map("parkx"->1)

     

    //--获取kafka的数据源

    //--SparkStreaming作为Kafka消费的数据源,即从kafka中消费的偏移量(offset)存到zookeeper

    val kafkaStream=KafkaUtils.createStream(ssc, zkHosts, groupName, topic).map{data=>data._2}

     

    val wordcount=kafkaStream.flatMap { line =>line.split(" ") }.map { word=>(word,1) }

    .updateStateByKey{(seq,op:Option[Int])=>Some(seq.sum+op.getOrElse(0))}

     

    wordcount.print()

     

    ssc.start()

     

    //--保持SparkStreaming线程一直开启

    ssc.awaitTermination()

    }

    }

       

  • 相关阅读:
    Go 命令行参数,JSON 序列化与反序列化
    Go 文件操作
    Go 多态
    Go 接口
    Go 面向对象三大特性
    Go 面向对象编程应用
    Go 结构体方法
    Go 面向对象之结构体
    Go Map
    vue安装 vue-cli安装
  • 原文地址:https://www.cnblogs.com/shuzhiwei/p/11323142.html
Copyright © 2011-2022 走看看