zoukankan      html  css  js  c++  java
  • Spark Streaming 的一些问题

    Spark Streaming 的一些问题,做选型前关注这些问题可以有效的降低使用风险。

    checkpoint


    checkpoint 是个很好的恢复机制。但是方案比较粗暴,直接通过序列化的机制写入到文件系统,导致代码变更和配置变更无法生效。实际场景是升级往往比系统崩溃的频率高太多。但是升级需要能够无缝的衔接上一次的偏移量。所以spark streaming在无法容忍数据有丢失的情况下,你需要自己记录偏移量,然后从上一次进行恢复。



    我们目前是重写了相关的代码,每次记录偏移量,不过只有在升级的时候才会读取自己记录的偏移量,其他情况都是依然采用checkpoint机制。


    Kafka

    这个和Spark Streaming相关,也不太相关。说相关是因为Spark 对很多异常处理比较简单。很多是和Kafka配置相关的。我举个例子:

    如果消息体太大了,超过 fetch.message.max.bytes=1m,那么Spark Streaming会直接抛出OffsetOutOfRangeException异常,然后停止服务。
    对应的错误会从这行代码抛出:
    if (!iter.hasNext) {
     assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
     finished = true
     null.asInstanceOf[R]
    }


    其实就是消费的完成后 实际的消费数据量和预先估计的量不一致。
    你在日志中看到的信息其实是这个代码答应出来的:
    private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
     s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
     s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +    " This should not happen, and indicates that messages may have been lost"


    解决办法自然是把 fetch.message.max.bytes 设置大些。


    如果你使用Spark Streaming去追数据,从头开始消费kafka,而Kafka因为某种原因,老数据快速的被清理掉,也会引发OffsetOutOfRangeException错误。并且使得Spark Streaming程序异常的终止。


    解决办法是事先记录kafka偏移量和时间的关系(可以隔几秒记录一次),然后根据时间找到一个较大的偏移量开始消费。

    或者你根据目前Kafka新增数据的消费速度,给smallest获取到的偏移量再加一个较大的值,避免出现Spark Streaming 在fetch的时候数据不存在的情况。


    textFileStream

    其实使用textFileStream 的人应该也不少。因为可以很方便的监控HDFS上某个文件夹下的文件,并且进行计算。这里我们遇到的一个问题是,如果底层比如是压缩文件,遇到有顺坏的文件,你是跳不过去的,直接会让Spark Streaming 异常退出。 官方并没有提供合适的方式让你跳过损坏的文件。我们目前是通过重写FileInputDStream 等相关类来修正该问题。


    内存

    Shuffle (尤其是每个周期数据量很大的情况)是Spark Streaming 不可避免的疼痛。譬如,与Kafka的集成, Kafka的分区数决定了你的并行度(我们假设你使用Direct Approach的模式集成)。你为了获得更大的并行度,则需要进行一次repatition。 为了能够避免Shuffle,并且提高Spark Streaming处理的并行度,我们重写了DirectKafkaInputDStream,KafkaRDD,KafkaUtils等类,实现可以按Kafka 分区按倍数扩大并行度。

    我们期望官方能够实现将一个Kafka的partition 映射为多个Spark 的partition,避免数据的多次移动。


    再次,如果单个Executor 并行度过大,可能也会导致对内存压力增大。在使用Spark Streaming的过程中,我们多次遇到Executor Lost 相关的问题(譬如 shuffle fetch 失败,Task失败重试等),目前比较有效的方式是:

    提高Executor 数目
    减少单个Executor的 CPU 核数

    为了保证处理的效率,请保证CPU总核数保持不变。


    监控

    Spark Streaming 的UI 上的Executors Tab缺少一个最大的监控,就是Worker内存GC详情。虽然我们可以将这些信息导入到 第三方监控中,然而终究是不如在 Spark UI上展现更加方便。 为此我们也将该功能列入研发计划。

  • 相关阅读:
    朴素贝叶斯分类模型
    进程同步机制
    Django知识点汇总
    Django框架_URLconf、Views、template、ORM
    WebSocket介绍
    Django小练习
    数据库(11)-- Hash索引和BTree索引 的区别
    MySQL数据库--练习
    Web 框架 Flask
    Django之Model操作
  • 原文地址:https://www.cnblogs.com/dailidong/p/7571139.html
Copyright © 2011-2022 走看看