zoukankan      html  css  js  c++  java
  • 使用sbt构建spark 程序

    今日在学习scala和spark相关的知识。之前在eclipse下编写了wordcount程序。但是关于导出jar包这块还是很困惑。于是学习sbt构建scala。

    关于sbt的介绍网上有很多的资料,这里就不解释了。参考:http://wiki.jikexueyuan.com/project/sbt-getting-started/install-sbt.html

    关于linux下(centos)安装sbt: 依次执行

    curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
    sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
    sudo yum install sbt

    使用sbt需要按照sbt的要求生成相关的目录:

    其中,kafka是项目根目录,build.sbt的内容如下:

    name := "test"
    
    version := "1.0"
    
    scalaVersion := "2.10.5"
    
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2" % "provided"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2" % "provided"
    
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.2" % "provided"
    
    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"  //注意版本
    

     scalaVersion是指定编译程序的scala版本,因为这里用的是spark1.6.2,所以对应的scala版本为2.10.5

    libraryDependencies 是指程序的库依赖,最后的provided 的意思是,spark内已经提供了这几个库,打包时,无需考虑这几个。

    src是项目源代码所在位置:

    KafkaWordCount.scala内容如下:

    import java.util.HashMap
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    
    object KafkaWordCount {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
    
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")
    
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

     在项目的根目录,即kafka目录下面,运行: sbt compile 对项目进行编译。sbt package 导出jar包。

    在spark目录运行:

    ./bin/spark-submit --master spark://192.168.1.241:7077 --class KafkaWordCount /root/kafka/target/scala-2.10/test_2.10-1.0.jar 127.0.0.1:2181 2 2 2

    这样运行出错,提示没有kafkaUtil这个类,网上查了下,是使用package打包时,并没有将依赖的jar包打成一个,因此需要使用assembly插件

    并且运行的时候,会有类似错误:

    Exception in thread "main" org.apache.spark.SparkException:
    Checkpoint RDD CheckpointRDD[85] at foreachRDD at WebPagePopularityValueCalculator.scala:68(0)
    has different number of partitions than original RDD MapPartitionsRDD[81] at updateStateByKey at WebPagePopularityValueCalculator.scala:62(2)

    这是因为在集群模式运行时,需要将checkpoint文件夹设置为hdfs类似的路径。解决方法为:使用hdfs的路径: hdfs:ip:9000/data

    关于assembly的介绍: http://blog.csdn.net/beautygao/article/details/32306637

     参考:http://stackoverflow.com/questions/27198216/sbt-assembly-deduplicate-error-exclude-error

         http://blog.csdn.net/ldds_520/article/details/51443606

       http://www.cnblogs.com/scnu-ly/p/5106726.html

  • 相关阅读:
    明暗文切换(密码输入框)遇到的坑
    iOS11适配tableView顶部空白
    macOS升级到high Sierra后, Cocoapods不能使用解决办法
    Xcode插件失效以后的处理方法
    iOS正确使用const,static,extern
    centos7安装magento随记 这就是个坑,果断放弃
    关于迅雷试用短租日租会员的一些渠道收集
    json中含有Unicode的处理办法 C#
    c#中奖算法的实现
    2016年最新mac下vscode配置golang开发环境支持debug
  • 原文地址:https://www.cnblogs.com/missmzt/p/6002858.html
Copyright © 2011-2022 走看看