  • No output operations registered, so nothing to execute

    Spark Streaming combined with Kafka throws an error. The code before the fix was as follows:

     1 object KafkaWordCount{
     2    val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])])=>{
     3       iter.flatMap{case(x,y,z) => Some(y.sum+z.getOrElse(0)).map(i => (x,i))}
     4    }
     5    def main(args: Array[String]): Unit = {
     6       val Array(zkQuorum, group, topics, numThreads) = args
     7       val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
     8       val ssc = new StreamingContext(sparkConf, Seconds(5))
     9       ssc.checkpoint("c://ck2")
    10       val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    11       val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    12       val words = data.map(_._2).flatMap(_.split(" "))
    13       val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    14       println(wordCounts)
    15       ssc.start()
    16       ssc.awaitTermination()
    17    }
    18 }

        Note: line 14 should be wordCounts.print(). Cause of the error: Spark Streaming requires at least one output operation (such as print) to be registered before start() is called; otherwise the following error occurs:

     1 17/07/28 17:11:59 ERROR StreamingContext: Error starting the context, marking it as stopped
     2 java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
     3     at scala.Predef$.require(Predef.scala:233)
     4     at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
     5     at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
     6     at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
     7     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
     8     at org.bianqi.spark.day5.KafkaWordCount$.main(KafkaWordCount.scala:24)
     9     at org.bianqi.spark.day5.KafkaWordCount.main(KafkaWordCount.scala)
    10 Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    11     at scala.Predef$.require(Predef.scala:233)

       The corrected code is as follows:

```scala
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  // For each key: sum this batch's counts and add any previous state.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("c://ck2")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()  // registers an output operation, so the streaming job has work to execute
    ssc.start()
    ssc.awaitTermination()
  }
}
```
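The updateFunc passed to updateStateByKey can be exercised on its own, without Spark. The following standalone sketch simulates carrying word counts across two micro-batches; the batch contents and the `step` helper are invented for illustration, and unlike the real updateStateByKey it only visits keys present in the current batch (Spark also invokes the function for stateful keys with no new values).

```scala
// Pure-Scala simulation of updateStateByKey semantics (no Spark needed).
object StatefulCountSim {
  // Same function as in KafkaWordCount: batch sum plus prior state.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }

  // Hypothetical helper: apply one micro-batch of words against the previous state.
  def step(state: Map[String, Int], words: Seq[String]): Map[String, Int] = {
    val grouped = words.groupBy(identity).toSeq.map { case (w, ws) =>
      (w, ws.map(_ => 1), state.get(w))  // (key, this batch's ones, prior count)
    }
    updateFunc(grouped.iterator).toMap
  }

  def main(args: Array[String]): Unit = {
    val s1 = step(Map.empty, Seq("spark", "spark", "kafka"))
    val s2 = step(s1, Seq("spark", "streaming"))
    println(s1)  // counts after batch 1: spark=2, kafka=1
    println(s2)  // counts after batch 2: spark=3, streaming=1
  }
}
```

This shows why the Option is needed: a key's first appearance has no prior state, so z is None and getOrElse(0) starts the count from zero.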

    However, according to Stack Overflow, this error can also have a different cause; see:

       https://stackoverflow.com/questions/34188274/spark-no-output-operations-registered-so-nothing-to-execute

  • Original post: https://www.cnblogs.com/bianqi/p/7251446.html