zoukankan      html  css  js  c++  java
  • Scala 示例：Spark Streaming (2.1.0) 读取 Kafka (kafka_2.11, 1.0.0)

    1、在 pom.xml 中添加依赖

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
    </dependency>


    2、代码
    package cn.piesat
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Spark Streaming word count over records consumed from Kafka via the
     * spark-streaming-kafka-0-10 direct stream API.
     */
    object App {

      /** Default Kafka bootstrap servers, used when none are given on the command line. */
      private val brokers: String = "hadoop01:9092"

      /** Default topics to subscribe to, used when none are given on the command line. */
      private val defaultTopics: Array[String] = Array("lj01")

      /**
       * Entry point: every 3-second batch, splits the Kafka record values into
       * whitespace-separated words, prints the per-word counts of that batch, and then
       * commits the batch's offsets back to Kafka for the consumer group.
       *
       * @param args optional positional arguments:
       *             `args(0)` = bootstrap servers (default `hadoop01:9092`),
       *             `args(1)` = comma-separated topic list (default `lj01`)
       */
      def main(args: Array[String]): Unit = {
        // Local import: only this method uses the manual offset-commit API.
        import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

        val spark = getSparkSession()
        val ssc = new StreamingContext(spark.sparkContext, Seconds(3))

        val bootstrapServers = args.headOption.getOrElse(brokers)
        val topics = args.lift(1).map(parseList).filter(_.nonEmpty).getOrElse(defaultTopics)

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> bootstrapServers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          // Only applies when the group has no committed offset yet (e.g. the first run).
          "auto.offset.reset" -> "latest",
          // Offsets are committed manually below, after each batch's output has run.
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )

        // A Kafka record's value may be null (e.g. a tombstone); skip those instead of
        // failing the whole batch with a NullPointerException.
        val lines = messages.flatMap(record => Option(record.value()))

        val wordCounts = lines
          .flatMap(line => splitWords(line))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
        wordCounts.print()

        // Registered after print(), so within each batch the commit happens only after the
        // counts have been output. The casts must be applied to the stream/RDDs returned
        // directly by createDirectStream, before any transformation.
        messages.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }

        ssc.start()
        ssc.awaitTermination()
      }

      /** Splits a line on runs of whitespace, dropping the empty token produced by leading whitespace. */
      private def splitWords(line: String): Seq[String] =
        line.split("\\s+").filter(_.nonEmpty).toSeq

      /** Splits a comma-separated list, trimming each entry and dropping empty ones. */
      private def parseList(csv: String): Array[String] =
        csv.split(",").map(_.trim).filter(_.nonEmpty)

      /**
       * Returns the active SparkSession, creating one if none exists yet.
       *
       * @param master Spark master URL; defaults to `local[4]` (local mode with 4 threads)
       * @return the session named "sparkSql"
       */
      def getSparkSession(master: String = "local[4]"): SparkSession =
        SparkSession
          .builder()
          .appName("sparkSql")
          .master(master)
          .getOrCreate()
    }
  • 相关阅读:
    java8
    java7
    java6
    java5
    java复习4
    学习笔记
    Reflection笔记
    通过Reflection来获得方法和信息
    學習反射2
    學習反射1
  • 原文地址:https://www.cnblogs.com/runnerjack/p/8604410.html
Copyright © 2011-2022 走看看