zoukankan      html  css  js  c++  java
  • Kafka集成SparkStreaming

    Spark Streaming + Kafka集成指南

    Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用。请选择正确的包, 请注意,0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容。

    注意:从Spark 2.3.0开始,不推荐使用Kafka 0.8支持。

    Spark Streaming从Kafka接收数据,转换为spark streaming中的数据结构Dstream。数据接收方式有两种 :1 使用Receiver接收的旧方法:2使用Direct拉取的新方法(在Spark 1.3中引入)。

    https://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html

    https://spark.apache.org/docs/2.3.1/streaming-kafka-0-10-integration.html

    Receiver方式

         Received是使用Kafka高级Consumer API实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark Executor的内存中,然后由Spark Streaming启动的job来处理数据。然而默认配置下,这种方式可能会因为底层的失败而丢失数据(请参阅接收器可靠性)。如果要启用高可靠机制,确保零数据丢失,要启用Spark Streaming的预写日志机制(Write Ahead Log,(已引入)在Spark 1.2)。该机制会同步地将接收到的Kafka数据保存到分布式文件系统(比如HDFS)上的预写日志中,以便底层节点在发生故障时也可以使用预写日志中的数据进行恢复。

    如下图:

    接下来,我们将讨论如何在流应用程序中使用此方法。

    1 链接 

    对于使用Maven项目定义的Scala / Java应用程序时,我们需要添加相应的依赖包:

    <dependency><!-- Spark Streaming Kafka -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>

    2 编程 

    在流应用程序代码中,导入KafkaUtils并创建输入DStream,如下所示。

    Scala编程:

    import org.apache.spark.streaming.kafka._
    
       val kafkaStream = KafkaUtils.createStream(streamingContext, 
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

    Java编程

     import org.apache.spark.streaming.kafka.*;
    
     JavaPairReceiverInputDStream<String, String> kafkaStream = 
         KafkaUtils.createStream(streamingContext,
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);

    还有几个需要注意的点:

    • Kafka中topic的partition与Spark Streaming中生成的RDD的partition无关,因此,在KafkaUtils.createStream()中,增加某个topic的partition的数量,只会增加单个Receiver消费topic的线程数,也就是读取Kafka中topic partition的线程数量,它不会增加Spark在处理数据时的并行性。
    • 可以使用不同的consumer group和topic创建多个Kafka输入DStream,以使用多个receiver并行接收数据。
    • 如果已使用HDFS等复制文件系统启用了“预读日志”,则接收的数据已在日志中复制。因此,输入流的存储级别的存储级别StorageLevel.MEMORY_AND_DISK_SER(即,使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER))。

    3 部署

    与任何Spark应用程序一样,spark-submit用于启动应用程序。但是,Scala / Java应用程序和Python应用程序的细节略有不同。

    对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将spark-streaming-kafka_2.11其及其依赖项打包到应用程序JAR中。确保spark-core_2.10spark-streaming_2.10标记为providedSpark安装中已存在的依赖项。然后使用spark-submit启动应用程序

    对于缺少SBT / Maven项目管理的Python应用程序,spark-streaming-kafka_2.11可以直接将其依赖项添加到spark-submit使用中--packages那是,

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3 ...

    另外,您也可以下载Maven构件的JAR spark-streaming-kafka-assembly从 Maven仓库,并将其添加到spark-submit--jars

    Direct方式

    在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图:

    这种方法相较于Receiver方式的优势在于:

    • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
    • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
    • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

    请注意,此方法的一个缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具将不会显示进度。但是,您可以在每个批处理中访问此方法处理的偏移量,并自行更新Zookeeper。

    接下来,我们将讨论如何在流应用程序中使用此方法。

    1 链接

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>

    2 编程

    请注意,导入的命名空间包括版本org.apache.spark.streaming.kafka010

    Scala编程

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node21:9092,node22:9092,node23:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    
    val topics = Array("topicA", "topicB")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    
    stream.map(record => (record.key, record.value))

    流中的每个项目都是ConsumerRecord,有关可能的kafkaParams,请参阅Kafka使用者配置文档。如果Spark批处理持续时间大于默认的Kafka心跳会话超时(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批次,这将需要在代理上更改group.max.session.timeout.ms。请注意,该示例将enable.auto.commit设置为false,有关讨论,请参阅存储偏移

    3 Direct方式案例

    package com.xyg.spark
     
    import kafka.serializer.{StringDecoder, Decoder}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkContext, SparkConf}
     
    import scala.reflect.ClassTag
     
    /**
      * Created by Administrator on 2018/7/28.
      */
    object SparkStreamDemo {
      def main(args: Array[String]) {
     
        val conf = new SparkConf()
        conf.setAppName("spark_streaming")
        conf.setMaster("local[*]")
     
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("D:/checkpoints")
        sc.setLogLevel("ERROR")
     
        val ssc = new StreamingContext(sc, Seconds(5))
     
        // val topics = Map("spark" -> 2)
     
        val kafkaParams = Map[String, String](
          "bootstrap.servers" -> "node21:9092,node22:9092,node23:9092",
          "group.id" -> "spark",
          "auto.offset.reset" -> "smallest"
        )
        // 直连方式拉取数据,这种方式不会修改数据的偏移量,需要手动的更新
        val lines =  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("spark")).map(_._2)
        // val lines = KafkaUtils.createStream(ssc, "node21:2181,node22:2181,node23:2181", "spark", topics).map(_._2)
     
        val ds1 = lines.flatMap(_.split(" ")).map((_, 1))
     
        val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => {
          Some(x.sum + y.getOrElse(0))
        })
     
        ds2.print()
     
        ssc.start()
        ssc.awaitTermination()
     
      }
    }

    Spark向kafka中写入数据

    上文阐述了Spark如何从Kafka中流式的读取数据,下面我整理向Kafka中写数据。与读数据不同,Spark并没有提供统一的接口用于写入Kafka,所以我们需要使用底层Kafka接口进行包装。
    最直接的做法我们可以想到如下这种方式:

    input.foreachRDD(rdd =>
      // 不能在这里创建KafkaProducer
      rdd.foreachPartition(partition =>
        partition.foreach{
          case x:String=>{
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            println(x)
            val producer = new KafkaProducer[String,String](props)
            val message=new ProducerRecord[String, String]("output",null,x)
            producer.send(message)
          }
        }
      )
    ) 

    但是这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接。如何解决呢?

    1.首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:

    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
    class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
      /* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
      lazy val producer = createProducer()
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    }
    
    object KafkaSink {
      import scala.collection.JavaConversions._
      def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
          producer
        }
        new KafkaSink(createProducerFunc)
      }
      def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    }

    2.之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:

    // 广播KafkaSink
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", Conf.brokers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      log.warn("kafka producer init done!")
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    这样我们就能在每个executor中愉快的将数据输入到kafka当中:

    //输出到kafka
    segmentedStream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        rdd.foreach(record => {
          kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
          // do something else
        })
      }
    })

    Spark streaming+Kafka应用

    一般Spark Streaming进行流式处理,首先利用上文我们阐述的Direct方式从Kafka拉取batch,之后经过分词、统计等相关处理,回写到DB上(一般为Hbase或者Mysql),由此高效实时的完成每天大量数据的词频统计任务。

    Spark streaming+Kafka调优

    Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。

    1 合理的批处理时间(batchDuration)

    几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,如下图:

    2 合理的Kafka拉取量(maxRatePerPartition重要)

    对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的,配置参数为:spark.streaming.kafka.maxRatePerPartition。这个参数默认是没有上线的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time,如下图:

    3 缓存反复使用的Dstream(RDD)

    Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数,如下图:

    4 设置合理的GC

    长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

    5 设置合理的CPU资源数

    CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

    6 设置合理的parallelism

    partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
    在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

    7 使用高性能的算子

    这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

    • 使用reduceByKey/aggregateByKey替代groupByKey
    • 使用mapPartitions替代普通map
    • 使用foreachPartitions替代foreach
    • 使用filter之后进行coalesce操作
    • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

    8 使用Kryo优化序列化性能

    这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。
    在Spark中,主要有三个地方涉及到了序列化:

    • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
    • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
    • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

    // 创建SparkConf对象。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    // 设置序列化器为KryoSerializer。
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    结果

    经过种种调试优化,我们最终要达到的目的是,Spark Streaming能够实时的拉取Kafka当中的数据,并且能够保持稳定,如下图所示:

    当然不同的应用场景会有不同的图形,这是本文词频统计优化稳定后的监控图,我们可以看到Processing Time这一柱形图中有一Stable的虚线,而大多数Batch都能够在这一虚线下处理完毕,说明整体Spark Streaming是运行稳定的。

  • 相关阅读:
    华为 简单OSPF实验
    华为 基于MAC地址的VLAN划分
    完全背包
    01背包问题
    90. 子集 II
    Java去除字符串中的特殊符号或者指定的字符
    Java查找指定文件夹下的所有文件
    Java面试基础
    Spring获取ApplicationContext
    JSP & EL & JSTL
  • 原文地址:https://www.cnblogs.com/frankdeng/p/9308585.html
Copyright © 2011-2022 走看看