  • No output operations registered, so nothing to execute

    Spark Streaming combined with Kafka threw an error. The code before the fix was:

     1 object KafkaWordCount{
     2    val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])])=>{
     3       iter.flatMap{case(x,y,z) => Some(y.sum+z.getOrElse(0)).map(i => (x,i))}
     4    }
     5    def main(args: Array[String]): Unit = {
     6       val Array(zkQuorum, group, topics, numThreads) = args
     7       val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
     8       val ssc = new StreamingContext(sparkConf, Seconds(5))
     9       ssc.checkpoint("c://ck2")
    10       val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    11       val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    12       val words = data.map(_._2).flatMap(_.split(" "))
    13       val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    14       println(wordCounts)  // bug: prints the DStream reference, registers no output operation
    15       ssc.start()
    16       ssc.awaitTermination()
    17    }
    18 }

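        Note: line 14 should be wordCounts.print(). Cause: println(wordCounts) only prints the string representation of the DStream object and registers no output operation. Spark Streaming needs at least one output operation, such as print(), registered on a DStream before the context starts; otherwise it fails with the following error: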

    17/07/28 17:11:59 ERROR StreamingContext: Error starting the context, marking it as stopped
    java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
        at scala.Predef$.require(Predef.scala:233)
        at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
        at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
        at org.bianqi.spark.day5.KafkaWordCount$.main(KafkaWordCount.scala:24)
        at org.bianqi.spark.day5.KafkaWordCount.main(KafkaWordCount.scala)
    Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
        at scala.Predef$.require(Predef.scala:233)

       The corrected code:

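    import org.apache.spark.{HashPartitioner, SparkConf}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // Receiver-based Kafka API from the spark-streaming-kafka (0-8) module.
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaWordCount {
      // For each word x, add this batch's counts (y) to the previous running total (z).
      val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
        iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
      }

      def main(args: Array[String]): Unit = {
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        // updateStateByKey requires a checkpoint directory.
        ssc.checkpoint("c://ck2")
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
        val words = data.map(_._2).flatMap(_.split(" "))
        val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
        // print() is an output operation; registering it lets ssc.start() pass validation.
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }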
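
    print() is not the only call that satisfies this check; any DStream output operation should do, for example foreachRDD. The sketch below is only an illustration and is not part of the original program. It assumes spark-streaming (1.3 or later) is on the classpath, and it replaces Kafka with a hypothetical in-memory queue (queueStream) so that it can run without ZooKeeper or a broker. The object name OutputOperationSketch and the sample input are made up for the example.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example object; not taken from the original post.
object OutputOperationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OutputOperationSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Assumed in-memory input standing in for the Kafka stream.
    val queue = new mutable.Queue[RDD[String]]()
    queue += ssc.sparkContext.parallelize(Seq("a b a", "b c"))

    val words = ssc.queueStream(queue).flatMap(_.split(" "))
    val counts = words.map((_, 1)).reduceByKey(_ + _)

    // println(counts) would register nothing and start() would fail validation.
    // foreachRDD is an output operation, so the graph has something to execute.
    counts.foreachRDD { rdd =>
      rdd.collect().foreach { case (word, n) => println(s"$word -> $n") }
    }

    ssc.start()
    // Wait at most about 5 seconds instead of blocking forever, then shut down.
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

    Commenting out the foreachRDD call should bring back the "No output operations registered" failure at ssc.start(), because the transformations alone (flatMap, map, reduceByKey) only describe the computation.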

    However, according to a question on Stack Overflow, this error can also have a different cause. The link:

       https://stackoverflow.com/questions/34188274/spark-no-output-operations-registered-so-nothing-to-execute

  • Original article: https://www.cnblogs.com/bianqi/p/7251446.html