  • Spark 2.1 consuming data from Kafka 0.8: Receiver && Direct

    The Receiver approach uses Kafka's high-level consumer API (offsets tracked in ZooKeeper) and buffers records through a receiver, while the Direct approach reads from the brokers directly and computes an explicit offset range for each batch.

    Official documentation example:

    http://spark.apache.org/docs/2.1.1/streaming-kafka-0-8-integration.html

    pom.xml dependencies

        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>2.1.1</version>
          <!--      <scope>provided</scope>   -->
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          <version>2.1.1</version>
        </dependency>
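
    Note: the <scope>provided</scope> element is left commented out so the Kafka connector gets bundled into the application jar. If you restore it, ship the connector at submit time instead, e.g. via spark-submit's --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1.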

    Receiver-based code:

    package SpartStreamingaiqiyi

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object kafkaReading {
      def main(args: Array[String]): Unit = {
        if (args.length != 4) {
          System.err.println("usage: SpartStreamingaiqiyi.kafkaReading <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
        val Array(zkQuorum, group, topics, numThreads) = args
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Map each topic to the number of consumer threads reading it
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        // Receiver-based stream: the high-level consumer tracks offsets in ZooKeeper
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
        // Records arrive as (key, value) pairs; print the values
        messages.map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
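
    The example above only prints the message payloads. As a minimal sketch of doing real work on the stream, the lines below could replace the messages.map(_._2).print() call to produce a per-batch word count (tokenizing on single spaces is an assumption):

    // Count words in each 5-second batch; records are (key, value) pairs
    val words = messages.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()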

    Direct mode code:

    package SpartStreamingaiqiyi

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object kafkaReading {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("usage: SpartStreamingaiqiyi.kafkaReading <brokers> <topics>")
          System.exit(1)
        }
        val Array(brokers, topics) = args

        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        // The 0-8 direct connector accepts "bootstrap.servers" or "metadata.broker.list"
        val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)
        val topicSet = topics.split(",").toSet
        // Direct stream: no receiver; each batch reads an explicit offset range from the brokers
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicSet)
        // Records arrive as (key, value) pairs; print the values
        messages.map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
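
    A practical advantage of the direct stream is that each of its RDDs carries the Kafka offset ranges it read, which you can log or persist for your own offset bookkeeping. A minimal sketch (it would sit before ssc.start() in the program above; printing to stdout is just for illustration):

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    messages.foreachRDD { rdd =>
      // RDDs produced by the direct stream implement HasOffsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"${o.topic} partition ${o.partition}: offsets ${o.fromOffset} to ${o.untilOffset}")
      }
    }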

  • Original post: https://www.cnblogs.com/students/p/12035376.html