zoukankan      html  css  js  c++  java
  • spark2.1消费kafka0.8的数据 Recevier && Direct

    官网案例:

    http://spark.apache.org/docs/2.1.1/streaming-kafka-0-8-integration.html

    pom.xml依赖

        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>2.1.1</version>
          <!--      <scope>provided</scope>   -->
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          <version>2.1.1</version>
        </dependency>

    Receiver  方式代码:

    package SpartStreamingaiqiyi
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    object kafkaReading {
      def main(args: Array[String]): Unit = {
        if (args.length != 4){
          println("usage: SpartStreamingaiqiyi.test <zkQuorm> <group> <topics> <numthreads>")
        }
        val Array(zkQuorm,group,topics,numthreads) = args
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        val topicMap=topics.split(",").map((_,numthreads.toInt)).toMap
        val messages = KafkaUtils.createStream(ssc,
          zkQuorm, group,topicMap)
        messages.map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    Direct模式代码:

    package SpartStreamingaiqiyi
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.kafka.common.serialization.StringDeserializer
    object kafkaReading {
      def main(args: Array[String]): Unit = {
        if (args.length != 2){
          System.err.print("usage: SpartStreamingaiqiyi.test <brockers>  <topics> ")
          System.exit(1)
        }
        val Array(brokers,topics) = args
    
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        val kafkaParams=Map[String, String]("bootstrap.servers" -> brokers)
        val topicSet=topics.split(",").toSet
        val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
        messages.map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
  • 相关阅读:
    小程序文档
    display: flex;
    时间戳格式化
    transition-分栏按钮动画
    animation与transition区别
    放大镜
    原生js实现瀑布流效果
    Javascript获取数组中最大和最小值
    scss基础
    C/C++ XMPP/Jabber 客户端类库对比/点评 (转)
  • 原文地址:https://www.cnblogs.com/students/p/12035376.html
Copyright © 2011-2022 走看看