  • Error: overloaded method value createDirectStream (Spark Streaming build error)

    Here is the code:

        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}
        import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

        StreamingExamples.setStreamingLogLevels() // helper from Spark's streaming examples; just quiets the logs
        val Array(brokers, topics) = args

        // Create the conf; Spark Streaming needs at least two threads:
        // one to receive data and one to process it
        val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")

        // Create the StreamingContext with a 2-second batch interval
        val ssc = new StreamingContext(conf, Seconds(2))

        val topicsSet = topics.split(",").toSet

        // Kafka parameters
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

        // Read from Kafka with the direct approach, tracking offsets in Kafka
        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent, // location strategy: if Kafka and the Spark program run on the same machines, data is read from the best (local) location
          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // subscription strategy: a pattern such as topic-* can also be used to select topics

        //==================== business logic below ====================
        val lines = messages.map(_.value())
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        wordCounts.print()
        //==================== business logic above ====================

        ssc.start()
        ssc.awaitTermination()
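
    For context, the direct-stream API used above comes from the spark-streaming-kafka-0-10 module, so the build needs that dependency alongside spark-streaming. A minimal build.sbt sketch; the versions here are assumptions and should be aligned with your Spark and Scala versions:

        // build.sbt (sketch; versions are assumptions, match them to your cluster)
        libraryDependencies ++= Seq(
          "org.apache.spark" %% "spark-streaming"            % "2.4.3" % "provided",
          "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.3"
        )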
    

  Packaging the project then fails with the following error:

    Error:(44, 49) overloaded method value createDirectStream with alternatives:
      (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String],perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
      (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
      (ssc: org.apache.spark.streaming.StreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String],perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
      (ssc: org.apache.spark.streaming.StreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
     cannot be applied to (org.apache.spark.streaming.StreamingContext, org.apache.spark.streaming.kafka010.LocationStrategy, org.apache.spark.streaming.kafka010.ConsumerStrategy[Nothing,Nothing])
        val messages = KafkaUtils.createDirectStream[String, String](

    This is a long-winded way of saying that topicsSet needs to be a Set[String], not a Set[Char].
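
    As a quick illustration of how a Set[Char] can sneak in (a hypothetical REPL-style sketch, not taken from the original code): calling .toSet directly on a String yields its individual characters, whereas splitting first yields strings.

        val topics = "topicA,topicB"          // hypothetical input string
        val chars   = topics.toSet            // Set[Char]: the individual characters, the wrong element type
        val strings = topics.split(",").toSet // Set[String]: Set(topicA, topicB), what Subscribe expects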

    The best way I can see to fix this is:

    val topicsSet = topics.toString.split(",").toSet

    But if you really only have a single topic, just use Set(topics), since splitting as above would break the string into a set of individual characters.
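
    With that change in place, the direct-stream call should compile with the expected String types. A sketch of the corrected lines, reusing the variables from the snippet above:

        val topicsSet = topics.toString.split(",").toSet // now a Set[String]

        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    Note that this only resolves the compile-time overload error; at runtime the 0-10 consumer also expects settings such as bootstrap.servers and the key/value deserializers in kafkaParams, which is a separate concern from the error above.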

  • Original post: https://www.cnblogs.com/RzCong/p/11400749.html