  • Example: reading Kafka (kafka_2.11, 1.0.0) from Spark Streaming 2.1.0 in Scala

    1. Add the dependencies to pom.xml (an equivalent sbt configuration is sketched after the XML)

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- spark-sql provides the SparkSession used in the code below -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
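
    If you build with sbt instead of Maven, a minimal build.sbt sketch with the same coordinates looks like this (the scalaVersion value is an assumption; any 2.11.x release matching the _2.11 artifact suffix works):

    // build.sbt -- sbt equivalent of the Maven coordinates above (sketch)
    scalaVersion := "2.11.8" // must match the _2.11 suffix of the Spark artifacts

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"            % "2.1.0",
      "org.apache.kafka" %% "kafka"                      % "1.0.0",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0",
      "org.apache.spark" %% "spark-sql"                  % "2.1.0" // for SparkSession
    )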


    2. Code
    package cn.piesat

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object App {
      private val brokers = "hadoop01:9092"

      def main(args: Array[String]): Unit = {
        val spark = getSparkSession()
        val sc = spark.sparkContext
        // Micro-batch interval: one batch every 3 seconds
        val ssc = new StreamingContext(sc, Seconds(3))

        val topics = Array("lj01")
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> brokers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          // With no committed offset for this group, start from the latest records
          "auto.offset.reset" -> "latest",
          // Disable auto-commit; offsets can be committed by hand (see the sketch below)
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        // Direct stream: each Spark partition reads one Kafka partition
        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )

        // Word count over each micro-batch: record value -> words -> (word, count)
        val lines = messages.map(_.value())
        val wordCounts = lines
          .flatMap(_.split(" ").map(word => (word, 1)))
          .reduceByKey(_ + _)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }

      def getSparkSession(): SparkSession =
        SparkSession
          .builder()
          .appName("sparkSql")
          .config("spark.some.config.option", "some-value")
          .master("local[4]")
          .getOrCreate()
    }
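
    Because enable.auto.commit is false, this job never records its progress in Kafka; after a restart it falls back to auto.offset.reset. Below is a minimal sketch of committing offsets back to Kafka yourself, using the HasOffsetRanges and CanCommitOffsets traits from the same spark-streaming-kafka-0-10 package. It would sit inside main() after createDirectStream; the processing step is a placeholder:

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    messages.foreachRDD { rdd =>
      // Read the offset ranges from the RDD produced by the direct stream,
      // before any shuffle breaks the 1:1 Kafka-partition mapping
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd here ...
      // Commit on the original stream object, not on the RDD
      messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }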
  • Original post: https://www.cnblogs.com/runnerjack/p/8604410.html