zoukankan      html  css  js  c++  java
  • SparkStreaming操作Kafka

    Kafka为一个分布式的消息队列,spark流操作kafka有两种方式:

    一种是利用接收器(receiver)和kafaka的高层API实现。

    一种是不利用接收器,直接用kafka底层的API来实现(spark1.3以后引入)。 

    Receiver方式

    基于Receiver方式实现会利用Kakfa的高层消费API,和所有的其他Receivers一样,接受到的数据会保存到excutors中,然后由spark Streaming 来启动Job进行处理这些数据。

    在默认的配置下,这种方式在失败的情况下,会丢失数据,如果要保证零数据丢失,需要启用WAL(Write Ahead Logs)。它同步将接受到数据保存到分布式文件系统上比如HDFS。 所以数据在出错的情况下可以恢复出来。

    使用两个步骤:

    1、添加依赖:spark-streaming-kafka_2.10-1.3.0

    2、编程:import org.apache.spark.streaming.kafka._

    val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

    注意:

    • kafka的分区数和Spark的RDD的分区不是一个概念。所以在上述函数中增加特定主题的分区数,仅仅增加了一个receiver中消费topic的线程数,并不难增加spark并行处理数据的数量。

    (那是不是多少个paratition最好对应多少个receiver的消费线程啊?)

    • 对于不同的group和topic,可以使用多个recivers创建多个DStreams来并行处理数据(如果是同一个topic如何保证数据不被重复消费?)
    • 如果启用了WAL,接收到的数据会被持久化一份到日志中,因此需要将storage_lever设置成StorgeLevel.MEMORY_AND_DISK_SER
      开启:
       

      val conf = new SparkConf()
      conf.set("spark.streaming.receiver.writeAheadLog.enable","true")
      val sc= new SparkContext(conf)
      val ssc = new StreamingContext(sc,Seconds(5))
      ssc.checkpoint("checkpoint")
      val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
       
      //开启在强行终止的情况下,数据仍然会丢失,解决办法:
      sys.addShutdownHook({
        ssc.stop(true,true)
      )})

     3、运行

    运行提交代码的时候,需要添加以下基本Jar包依赖:

     --jars lib/spark-streaming-kafka_2.10-1.3.0.jar,

        lib/spark-streaming_2.10-1.3.0.jar,

        lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,

     

    4、例子

    object KafkaWordCount {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
      
        StreamingExamples.setStreamingLogLevels()
      
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc =  new StreamingContext(sparkConfSeconds(2))
        //保证元数据恢复,就是Driver端挂了之后数据仍然可以恢复
        ssc.checkpoint("checkpoint")
      
        val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()
      
        ssc.start()
        ssc.awaitTermination()
      }
    }

    5、图示:

    <接收示意图>

    <元数据恢复>

    直接操作方式

    不同于Receiver接收数据方式,这种方式定期从kafka的topic下对应的partition中查询最新偏移量,并在每个批次中根据相应的定义的偏移范围进行处理。Spark通过调用kafka简单的消费者API读取一定范围的数据。

    相比基于Receiver方式有几个优点:

    • 简单的并发:

    不需要创建多个kafka输入流,然后Union他们,而使用DirectStream,spark Streaming将会创建和kafka分区一样的RDD的分区数,而且会从kafka并行读取数据,Spark的分区数和Kafka的分区数是一一对应的关系。

    • 高效

    第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次:一次是被Kafka复制;另一次是写入到WAL中,没有Receiver消除了这个问题。

    • 仅一次语义:

    Receiver方式读取kafka,使用的是高层API将偏移量写入ZK中,虽然这种方法可以通过数据保存在WAL中保证数据的不对,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次,

    第二种方式不采用ZK保存偏移量,消除了两者的不一致,保证每个记录只被Spark Streaming操作一次,即使是在处理失败的情况下。如果想更新ZK中的偏移量数据,需要自己写代码来实现。

    1、引入依赖

    同第一种方式。

    2、编程

    import org.apache.spark.streaming.kafka._
     
     
    val directKafkaStream = KafkaUtils.createDirectStream[[key class], [value class], [key decoder class], [value decoder class] ](streamingContext, [map of Kafka parameters], [set of topics to consume])

    如果想获得每个topic中每个分区的在spark streaming中的偏移量,可以通过以下代码:

    directKafkaStream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
        // offsetRanges.length = # of Kafka partitions being consumed
        ...
    }
    //例子:
    val ssc = new StreamingContext(sc, Seconds(2))
    val kafkaParams = Map("zookeeper.connect" -> zkConnect,
          "group.id" -> kafkaGroupId,
          "metadata.broker.list" -> "10.15.42.23:8092,10.15.42.22:8092",
          "auto.offset.reset" -> "smallest"
        )
    val topics = Set(topic)
     
    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)
     
    //KafkaCluster 需要从源码拷贝,此类是私有类。
    directKafkaStream.foreachRDD(
     rdd => {
     val offsetLists = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
     val kc = new KafkaCluster(kafkaParams)
     for (offsets <- offsetLists) {
     val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
     val o = kc.setConsumerOffsets(kafkaGroupId, Map((topicAndPartition, offsets.untilOffset)))
     if (o.isLeft) {
     println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
     }
     }
     }
    )
     
     
     

    3、部署:

    同第一种方式。

    4、图示:

     
  • 相关阅读:
    常用排序算法及java语言实现
    机器学习实战笔记(python3实现)01--概述
    笔试错题--(字符串常量池和JVM运行时数据区)
    笔试错题(典型题)
    java进阶--java网络编程
    01_Java基础_第1天(Java概述、环境变量、注释、关键字、标识符、常量)
    数据库3(DBUtils)
    数据库2(JDBC、DBUtils)
    数据库1(数据库、表及表数据、SQL语句)
    Linux的基本命令
  • 原文地址:https://www.cnblogs.com/seaspring/p/5920414.html
Copyright © 2011-2022 走看看