zoukankan      html  css  js  c++  java
  • Spark Streaming集成Kafak的问题之Ran out of messages

    Caused by: java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 264251742 for topic topic partition 1 start 264245135. This should not happen, and indicates that messages may have been lost
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:165)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
     
     
    此处异常是由于kafka在读取指定offset日志时(此处是264245135到264251742),由于日志过大,导致日志的总大小超过 fetch.message.max.bytes的设定的值(默认为1024*1024),导致此错误。解决方法是,在kafka client的参数中加大fetch.message.max.bytes的值。
     
    比如:
    //kafka的配置文件
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,"fetch.message.max.bytes"->"10485760")
    //工作流
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  • 相关阅读:
    ASP.NET ZERO 学习 JTable的ChildTable用法
    ASP.NET ZERO Core Application 学习笔记
    uploadify ASP.net 使用笔记
    金额的加减乘除运算
    利用autoit自动关闭指定标题窗口
    Struts2源代码解读之Action调用
    利用btrace工具监控在线运行java程序
    自己实现的简单MVC框架(类似Struts2+Spring)
    简单实用后台任务执行框架(Struts2+Spring+AJAX前端web界面可以获取进度)
    mybatis源代码分析:mybatis延迟加载机制改进
  • 原文地址:https://www.cnblogs.com/luckuan/p/4968919.html
Copyright © 2011-2022 走看看