zoukankan      html  css  js  c++  java
  • spark2.1消费kafka0.8的数据 Receiver && Direct

    官网案例:

    http://spark.apache.org/docs/2.1.1/streaming-kafka-0-8-integration.html

    pom.xml依赖

        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>2.1.1</version>
          <!--      <scope>provided</scope>   -->
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          <version>2.1.1</version>
        </dependency>

    Receiver  方式代码:

    package SpartStreamingaiqiyi
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    /**
     * Receiver-based Kafka 0.8 consumer example.
     *
     * Expects exactly four arguments: `<zkQuorm> <group> <topics> <numthreads>`.
     * `<topics>` is a comma-separated list; `<numthreads>` is parsed as an integer and
     * used as the value for every topic in the map handed to `KafkaUtils.createStream`
     * (per the Spark Kafka 0.8 integration guide, the number of consumer threads per topic).
     *
     * Prints the value of each received message for every 5-second batch and runs until
     * the streaming context terminates.
     */
    object kafkaReading {
      def main(args: Array[String]): Unit = {
        if (args.length != 4) {
          // Abort after printing usage; otherwise the destructuring below fails with a
          // scala.MatchError when the argument count is wrong.
          System.err.println("usage: SpartStreamingaiqiyi.test <zkQuorm> <group> <topics> <numthreads>")
          System.exit(1)
        }
        val Array(zkQuorm, group, topics, numthreads) = args

        // NOTE(review): the master is hard-coded to local[2]. The Spark Streaming guide
        // says a receiver occupies one thread, so a local master needs at least two.
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))

        // Every listed topic gets the same numthreads value.
        val topicMap = topics.split(",").map((_, numthreads.toInt)).toMap
        val messages = KafkaUtils.createStream(ssc, zkQuorm, group, topicMap)

        // Stream elements are pairs; only the second component (the message value) is printed.
        messages.map(_._2).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

    Direct模式代码:

    package SpartStreamingaiqiyi
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.kafka.common.serialization.StringDeserializer
    /**
     * Direct (receiver-less) Kafka 0.8 consumer example.
     *
     * Expects exactly two arguments: a broker list and a comma-separated topic list.
     * With any other argument count it writes a usage line to stderr and exits with
     * status 1. Otherwise it prints the value of each message for every 5-second batch
     * and runs until the streaming context terminates.
     */
    object kafkaReading {
      def main(args: Array[String]): Unit = {
        if (args.length == 2) {
          consume(args(0), args(1))
        } else {
          System.err.print("usage: SpartStreamingaiqiyi.test <brockers>  <topics> ")
          System.exit(1)
        }
      }

      /** Builds the direct stream for `topics` against `brokers` and blocks until termination. */
      private def consume(brokers: String, topics: String): Unit = {
        val sparkConf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("NetworkWordCount")
        val streamingContext = new StreamingContext(sparkConf, Seconds(5))

        // The broker list is passed to Kafka under the "bootstrap.servers" key.
        val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)
        val topicSet = topics.split(",").toSet

        val stream =
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            streamingContext, kafkaParams, topicSet)

        // Each element is a pair; only the second component (the message value) is printed.
        stream.map { case (_, value) => value }.print()

        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
  • 相关阅读:
    .Net/C# 应用程序直接读取本地 Cookies 文件(WinXP SP2 调用 API: InternetGetCookie 无果)
    wininet.dll函数库:不会过期的cookie
    WinForm中TextBox控件循环自动滚动示例
    JScript中Date.getTime转.Net中的DateTime
    js gettime c# ticks
    mysql查看整库个表详情
    rds分区实践
    mysql5.7.21源码安装
    EXPLAIN详解
    C#基础温习(4):C#中string数组和list的相互转换
  • 原文地址:https://www.cnblogs.com/students/p/12035376.html
Copyright © 2011-2022 走看看