zoukankan      html  css  js  c++  java
  • Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用

    前言

    Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。 在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。 本文的目标是写一个基于kafka的scala工程,在一个spark standalone的集群环境中运行。

    项目结构和文件说明

    说明

    这个工程包含了两个应用。 一个Consumer应用:CusomerApp - 实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。 一个Producer应用:ProducerApp - 实现了向Kafka集群发消息的功能。

    文件结构

    KafkaSampleApp   # 项目目录
    |-- build.bat    # build文件    
    |-- src
        |-- main
            |-- scala
                |-- ConsumerApp.scala  # Consumer应用
                |-- ProducerApp.scala  # Producer应用

    构建工程目录

    可以运行:

    mkdir KafkaSampleApp
    mkdir -p /KafkaSampleApp/src/main/scala

    代码

    build.sbt

    name := "kafka-sample-app"
     
    version := "1.0"
     
    scalaVersion := "2.11.8"
    
    scalacOptions += "-feature"
    scalacOptions += "-deprecation"
    scalacOptions += "-language:postfixOps"
     
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.0.0",
      "org.apache.spark" %% "spark-streaming" % "2.0.0",
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0",
      "org.apache.kafka" %% "kafka" % "0.8.2.1"
    )

    CusomerApp.scala

    这个例子中使用了Spark自带的Stream+Kafka结合的技术,有个限制的绑定了kafka的8.x版本。 我个人建议只用Kafka的技术,写一个Consomer,或者使用其自带的Consumer,来接受消息。 然后再使用Spark的技术。 这样可以跳过对kafak版本的限制。

    import java.util.Properties
    import _root_.kafka.serializer.StringDecoder
    
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    
    object ConsumerApp {
      def main(args: Array[String]) {
        val brokers = "localhost:9092"
        val topics = "test-topic"
    
        // Create context with 10 second batch interval
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(10))
    
        // Create direct kafka stream with brokers and topics
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
    
        // Get the lines, split them into words, count the words and print
        val lines = messages.map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        println("============== Start ==============")
        wordCounts.print
        println("============== End   ==============")
    
        // Start the computation
        ssc.start()
        ssc.awaitTermination()
      }
    }

    ProducerApp.scala

    import java.util.Arrays
    import java.util.List
    import java.util.Properties
    import org.apache.kafka.clients.producer._
    
    object ProducerApp {
      def main(args: Array[String]): Unit = {
    
        val props = new Properties()
        // Must-have properties
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    
        // Optional properties
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none")
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, (1024*100).toString)
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, (100).toString)
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, (5*60*1000L).toString)
        //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, (60*1000l).toString)
        props.put(ProducerConfig.ACKS_CONFIG, (0).toString)
        //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, (1500).toString)
        props.put(ProducerConfig.RETRIES_CONFIG, (3).toString)
        props.put(ProducerConfig.LINGER_MS_CONFIG, (1000).toString)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (32 * 1024 * 1024L).toString)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, (200).toString)
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-app-producer")
    
        val producer = new KafkaProducer[String, String](props)
    
        // Thread hook to close produer
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run() {
            producer.close()
          }
        })
    
        // send 10 messages
        var i = 0
        for( i <- (1 to 10)) {
          val data = new ProducerRecord[String, String]("test-topic", "test-key", s"test-message $i")
          producer.send(data)
        }
    
        // Reduce package lost
        Thread.sleep(1000)
        producer.close()
      }
    }

    构建工程

    进入目录KafkaSampleApp。运行:

    sbt package

    第一次运行时间会比较长。

    测试应用

    启动Kafka服务

    # Start zookeeper server
    gnome-terminal -x sh -c '$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties; bash'
    
    # Wait zookeeper server is started.
    sleep 8s
    
    # Start kafka server
    gnome-terminal -x sh -c '$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties; bash'
    
    # Wait kafka server is started.
    sleep 5s

    注:使用Ctrl+C可以中断服务。

    • 创建一个topic
    # Create a topic
    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
    
    # List topics
    $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181

    启动Spark服务

    • 启动spark集群master server
    $SPARK_HOME/sbin/start-master.sh

    master服务,默认会使用7077这个端口。可以通过其日志文件查看实际的端口号。

    • 启动spark集群slave server
    $SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077

    启动Consumer应用

    新起一个终端,来运行:

    $SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --master spark://$(hostname):7077 --class ConsumerApp target/scala-2.11/kafka-sample-app_2.11-1.0.jar

    注:如果定义的topic没有create,第一次运行会失败,再运行一次就好了。 如果出现java.lang.NoClassDefFoundError错误, 请参照Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境, 确保kafka的包在Spark中设置好了。

    启动Producer应用

    java -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp
    # or
    # $KAFKA_HOME/bin/kafka-run-class.sh -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp

    然后:看看Consumer应用是否收到了消息。

    总结

    建议写一个Kafka的Consumer,然后调用Spark功能,而不是使用Spark的Stream+Kafka的编程方式。 好处是可以使用最新版本的Kafka。

    Kafka的包中带有一个Sample代码,可以从中学习一些编写程序的方法。

    参照

  • 相关阅读:
    基本技能训练之线程
    关于UEditor的使用配置(图片上传配置)
    PAT 乙级练习题1002. 写出这个数 (20)
    codeforces 682C Alyona and the Tree DFS
    codeforces 681D Gifts by the List dfs+构造
    codeforces 678E Another Sith Tournament 概率dp
    codeforces 680E Bear and Square Grid 巧妙暴力
    codeforces 678D Iterated Linear Function 矩阵快速幂
    codeforces 679A Bear and Prime 100 交互
    XTUOJ 1248 TC or CF 搜索
  • 原文地址:https://www.cnblogs.com/liuys635/p/11052860.html
Copyright © 2011-2022 走看看