zoukankan      html  css  js  c++  java
  • SparkStreaming与Kafka整合

    代码示例:

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import org.apache.spark.streaming.StreamingContext

    import org.apache.spark.streaming.Seconds

    import org.apache.spark.streaming.kafka.KafkaUtils

       

    object Driver {

     

    def main(args: Array[String]): Unit = {

     

    //--启动线程数,至少是两个。一个线程用于监听数据源,其他线程用于消费或打印。至少是2

    val conf=new SparkConf().setMaster("local[5]").setAppName("kafkainput")

     

    val sc=new SparkContext(conf)

     

    val ssc=new StreamingContext(sc,Seconds(5))

    ssc.checkpoint("d://check1801")

     

    //--连接kafka,并消费数据

    val zkHosts="192.168.150.137:2181,192.168.150.138:2181,192.168.150.139:2181"

    val groupName="gp1"

    //--Mapkey是消费的主题名,value是消费的线程数。也可以消费多个主题,比如:Map("parkx"->1,"enbook"->2)

    val topic=Map("parkx"->1)

     

    //--获取kafka的数据源

    //--SparkStreaming作为Kafka消费的数据源,即从kafka中消费的偏移量(offset)存到zookeeper

    val kafkaStream=KafkaUtils.createStream(ssc, zkHosts, groupName, topic).map{data=>data._2}

     

    val wordcount=kafkaStream.flatMap { line =>line.split(" ") }.map { word=>(word,1) }

    .updateStateByKey{(seq,op:Option[Int])=>Some(seq.sum+op.getOrElse(0))}

     

    wordcount.print()

     

    ssc.start()

     

    //--保持SparkStreaming线程一直开启

    ssc.awaitTermination()

    }

    }

       

  • 相关阅读:
    让 ijkplayer 支持兼容armv7 armv7s
    以太网私网建立:同一台电脑,不同电脑运行多个节点。
    solidity 语言总结笔记
    web.js 方法详解
    Fabric 环境搭建
    浅谈区块链1
    以太坊私链建立和geth的使用
    搭建联盟链
    fabric 网络 合约部署 和 测试
    【Advanced Windows Phone Programming】番外篇 WP8与WP7
  • 原文地址:https://www.cnblogs.com/shuzhiwei/p/11323142.html
Copyright © 2011-2022 走看看