zoukankan      html  css  js  c++  java
  • SparkStreaming与Kafka整合

    代码示例:

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import org.apache.spark.streaming.StreamingContext

    import org.apache.spark.streaming.Seconds

    import org.apache.spark.streaming.kafka.KafkaUtils

       

    object Driver {

     

    def main(args: Array[String]): Unit = {

     

    //--启动线程数,至少是两个。一个线程用于监听数据源,其他线程用于消费或打印。至少是2

    val conf=new SparkConf().setMaster("local[5]").setAppName("kafkainput")

     

    val sc=new SparkContext(conf)

     

    val ssc=new StreamingContext(sc,Seconds(5))

    ssc.checkpoint("d://check1801")

     

    //--连接kafka,并消费数据

    val zkHosts="192.168.150.137:2181,192.168.150.138:2181,192.168.150.139:2181"

    val groupName="gp1"

    //--Mapkey是消费的主题名,value是消费的线程数。也可以消费多个主题,比如:Map("parkx"->1,"enbook"->2)

    val topic=Map("parkx"->1)

     

    //--获取kafka的数据源

    //--SparkStreaming作为Kafka消费的数据源,即从kafka中消费的偏移量(offset)存到zookeeper

    val kafkaStream=KafkaUtils.createStream(ssc, zkHosts, groupName, topic).map{data=>data._2}

     

    val wordcount=kafkaStream.flatMap { line =>line.split(" ") }.map { word=>(word,1) }

    .updateStateByKey{(seq,op:Option[Int])=>Some(seq.sum+op.getOrElse(0))}

     

    wordcount.print()

     

    ssc.start()

     

    //--保持SparkStreaming线程一直开启

    ssc.awaitTermination()

    }

    }

       

  • 相关阅读:
    YOLO2 (2) 测试自己的数据
    Ubuntu 14.04服务器配置 (1) 安装和配置
    window10+linux双系统安装
    机械纪元 尼奥
    如何标数据
    usb-cam (3)摄像机标定文件-ORB-SLAM标定文件
    ORB-SLAM2(3) ROS下实时跑ORB_SLAM2
    usb-cam(1)安装
    usb-cam (2)摄像机标定
    Linux下的压缩zip,解压缩unzip命令详解及实例
  • 原文地址:https://www.cnblogs.com/shuzhiwei/p/11323142.html
Copyright © 2011-2022 走看看