zoukankan      html  css  js  c++  java
  • scala spark(2.1.0)读取kafka(kafka_2.11 1.0.0)示例

    1、在 pom.xml 中添加依赖（jar 包）

    <!-- Spark Streaming core (Scala 2.11 build): StreamingContext, Seconds, DStream operations. -->
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
    </dependency>
    <!-- Spark SQL: the example builds its SparkContext through org.apache.spark.sql.SparkSession,
         which lives in this artifact. Without it, the SparkSession import does not resolve. -->
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
    </dependency>
    <!-- Kafka 1.0.0 (Scala 2.11 build).
         NOTE(review): the code only uses client classes (org.apache.kafka.common.serialization.StringDeserializer);
         presumably kafka-clients alone would suffice - confirm before trimming. -->
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
    </dependency>
    <!-- Spark integration for the Kafka 0.10+ consumer API (org.apache.spark.streaming.kafka010.KafkaUtils). -->
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
    </dependency>


    2、代码
    package cn.piesat
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Spark Streaming word count over Kafka, using the spark-streaming-kafka-0-10 direct stream.
     *
     * For every 3-second batch, the values of the records read from the subscribed topics are split
     * on whitespace, and that batch's per-word counts are printed to stdout.
     */
    object App {
      /** Default Kafka bootstrap servers, used when no broker list is given on the command line. */
      private val brokers = "hadoop01:9092"

      /** Default topic subscription, used when no topic list is given on the command line. */
      private val defaultTopics = Array("lj01")

      /**
       * Starts the streaming job and blocks until it terminates.
       *
       * @param args optional overrides:
       *             `args(0)` = Kafka bootstrap servers (default: [[brokers]]);
       *             `args(1)` = comma-separated topic names (default: `lj01`).
       *             With no arguments, the job uses the defaults above.
       */
      def main(args: Array[String]): Unit = {
        // Needed only for the manual offset commit at the end of each batch.
        import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

        val bootstrapServers = args.lift(0).getOrElse(brokers)
        val topics = args.lift(1)
          .map(_.split(",").map(_.trim).filter(_.nonEmpty))
          .filter(_.nonEmpty) // an empty/blank topic argument falls back to the default
          .getOrElse(defaultTopics)

        val spark = getSparkSession()
        val ssc = new StreamingContext(spark.sparkContext, Seconds(3))

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> bootstrapServers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          // Used only when the group has no committed offset (or it is no longer valid).
          "auto.offset.reset" -> "latest",
          // Offsets are committed manually below, after the batch's output has run.
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )

        // StringDeserializer returns null for records with a null value (e.g. tombstones).
        // Skip them instead of throwing a NullPointerException in split.
        val lines = messages.map(_.value()).filter(_ != null)

        val wordCounts = lines
          .flatMap(_.split("\\s+"))
          .filter(_.nonEmpty) // leading whitespace or blank lines would otherwise yield "" tokens
          .map(word => (word, 1))
          .reduceByKey(_ + _)
        wordCounts.print()

        // With auto-commit disabled, nothing records this group's progress unless we commit.
        // By default, output operations run one at a time in definition order, so for a given
        // batch this commit is issued after print(). commitAsync gives at-least-once semantics:
        // a batch may be reprocessed after a failure.
        messages.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }

        ssc.start()
        ssc.awaitTermination()
      }

      /**
       * Builds (or reuses, via getOrCreate) the SparkSession whose SparkContext backs the
       * streaming context.
       *
       * NOTE(review): a master set in code takes precedence over `spark-submit --master`, so this
       * always runs with `local[4]`.
       */
      def getSparkSession(): SparkSession = {
        SparkSession
          .builder()
          .appName("sparkSql")
          // Presumably the placeholder option from the Spark SQL getting-started example - confirm and drop.
          .config("spark.some.config.option", "some-value")
          .master("local[4]")
          .getOrCreate()
      }
    }
  • 相关阅读:
    【动态规划】CDOJ1651 Uestc的命运之旅
    【动态规划】【二分】CDOJ1006 最长上升子序列
    【动态规划】CDOJ1271 Search gold
    【概率dp】【滚动数组】CDOJ1652 都市大飙车
    【混合背包】CDOJ1606 难喝的饮料
    【状压dp】CDOJ1608 暑假集训
    【构造】CDOJ1607 大学生足球联赛
    【二分】Codeforces Round #417 (Div. 2) C. Sagheer and Nubian Market
    【动态规划】Codeforces Round #417 (Div. 2) B. Sagheer, the Hausmeister
    HBase简介
  • 原文地址:https://www.cnblogs.com/runnerjack/p/8604410.html
Copyright © 2011-2022 走看看