Scala Spark Streaming integration with Kafka (Spark 2.3, Kafka 0.10)

The Maven dependency is as follows:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>

The example code from the official documentation is as follows:

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements. See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License. You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    // scalastyle:off println
    package org.apache.spark.examples.streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka010._

    /**
     * Consumes messages from one or more topics in Kafka and does wordcount.
     * Usage: DirectKafkaWordCount <brokers> <topics>
     *   <brokers> is a list of one or more Kafka brokers
     *   <topics> is a list of one or more kafka topics to consume from
     *
     * Example:
     *   $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
     *     topic1,topic2
     */
    object DirectKafkaWordCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println(s"""
            |Usage: DirectKafkaWordCount <brokers> <topics>
            |  <brokers> is a list of one or more Kafka brokers
            |  <topics> is a list of one or more kafka topics to consume from
            |
            """.stripMargin)
          System.exit(1)
        }

        StreamingExamples.setStreamingLogLevels()

        val Array(brokers, topics) = args

        // Create context with 2 second batch interval
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // Create direct kafka stream with brokers and topics
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

        // Get the lines, split them into words, count the words and print
        val lines = messages.map(_.value)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        wordCounts.print()

        // Start the computation
        ssc.start()
        ssc.awaitTermination()
      }
    }
    // scalastyle:on println

Running this code produces the following error:

     Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

As the error shows, the required Kafka consumer parameters were not set. The 0-10 integration is built on the new Kafka consumer API, which requires bootstrap.servers (the metadata.broker.list key used above belongs to the old 0.8 API), plus key/value deserializers and a group id.
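
As a side note, the Kafka client library ships these configuration keys as constants on org.apache.kafka.clients.consumer.ConsumerConfig, which avoids typos in the string keys. A minimal sketch of the parameters the 0-10 integration needs (broker address and group id are placeholder assumptions here):

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer

    // Sketch: the minimal consumer configuration for the 0-10 integration
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092", // replaces metadata.broker.list
      ConsumerConfig.GROUP_ID_CONFIG -> "group1",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )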

The official example can be modified as follows:

    package cn.xdf.userprofile.stream

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    import scala.collection.mutable

    object DirectKafka {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println(
            s"""
              |Usage: DirectKafkaWordCount <brokers> <topics>
              |  <brokers> is a list of one or more Kafka brokers
              |  <topics> is a list of one or more kafka topics to consume from
              |
              """.stripMargin)
          System.exit(1)
        }
        val Array(brokers, topics) = args

        val conf = new SparkConf()
          .setAppName("DirectKafka")
          .setMaster("local[2]")

        val ssc = new StreamingContext(conf, Seconds(2))

        val topicsSet = topics.split(",").toSet
        val kafkaParams = mutable.HashMap[String, String]()
        // The following parameters are required; without them the job fails to start
        kafkaParams.put("bootstrap.servers", brokers)
        kafkaParams.put("group.id", "group1")
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

        // Get the lines, split them into words, count the words and print
        val lines = messages.map(_.value)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        wordCounts.print()

        // Start the computation
        ssc.start()
        ssc.awaitTermination()
      }
    }
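
One thing this example leaves out is offset management: with the direct stream, offsets are committed by the Kafka consumer's auto-commit unless handled explicitly. A sketch of manual commits using the integration's documented HasOffsetRanges / CanCommitOffsets pattern, assuming "enable.auto.commit" -> "false" has been added to kafkaParams (this is not part of the original program):

    // Sketch: commit offsets back to Kafka after each batch's output completes
    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... write the batch's results somewhere durable here ...
      messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }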

The run proceeds as follows:

Start Kafka:

       bin/kafka-server-start ./etc/kafka/server.properties &

    [2018-10-22 11:24:14,748] INFO [GroupCoordinator 0]: Stabilized group group1 generation 1 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
    [2018-10-22 11:24:14,761] INFO [GroupCoordinator 0]: Assignment received from leader for group group1 for generation 1 (kafka.coordinator.group.GroupCoordinator)
    [2018-10-22 11:24:14,779] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __consumer_offsets-40. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
    [2018-10-22 11:28:19,010] INFO [GroupCoordinator 0]: Preparing to rebalance group group1 with old generation 1 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
    [2018-10-22 11:28:19,013] INFO [GroupCoordinator 0]: Group group1 with generation 2 is now empty (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
    [2018-10-22 11:29:29,424] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 11 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    [2018-10-22 11:39:29,414] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    [2018-10-22 11:49:29,414] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
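
If the test topic does not exist yet, create it before submitting the job. With the Apache Kafka 1.0 distribution used for the producer below (and assuming ZooKeeper on localhost:2181), a typical command is:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test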

Run Spark:

    /usr/local/spark-2.3.0/bin/spark-submit --class cn.xdf.userprofile.stream.DirectKafka --master yarn --driver-memory 2g --num-executors 1 --executor-memory 2g --executor-cores 1 userprofile2.0.jar localhost:9092 test

Note that because the code hard-codes .setMaster("local[2]"), and properties set directly on the SparkConf take precedence over spark-submit flags, the --master yarn flag is overridden and the job actually runs in local mode (the "executor driver" entries in the log below confirm this).

    2018-10-22 11:28:16 INFO  DAGScheduler:54 - Submitting 1 missing tasks from ResultStage 483 (ShuffledRDD[604] at reduceByKey at DirectKafka.scala:46) (first 15 tasks are for partitions Vector(1))
    2018-10-22 11:28:16 INFO  TaskSchedulerImpl:54 - Adding task set 483.0 with 1 tasks
    2018-10-22 11:28:16 INFO  TaskSetManager:54 - Starting task 0.0 in stage 483.0 (TID 362, localhost, executor driver, partition 1, PROCESS_LOCAL, 7649 bytes)
    2018-10-22 11:28:16 INFO  Executor:54 - Running task 0.0 in stage 483.0 (TID 362)
    2018-10-22 11:28:16 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 1 blocks
    2018-10-22 11:28:16 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 0 ms
    2018-10-22 11:28:16 INFO  Executor:54 - Finished task 0.0 in stage 483.0 (TID 362). 1091 bytes result sent to driver
    2018-10-22 11:28:16 INFO  TaskSetManager:54 - Finished task 0.0 in stage 483.0 (TID 362) in 4 ms on localhost (executor driver) (1/1)
    2018-10-22 11:28:16 INFO  TaskSchedulerImpl:54 - Removed TaskSet 483.0, whose tasks have all completed, from pool 
    2018-10-22 11:28:16 INFO  DAGScheduler:54 - ResultStage 483 (print at DirectKafka.scala:47) finished in 0.008 s
    2018-10-22 11:28:16 INFO  DAGScheduler:54 - Job 241 finished: print at DirectKafka.scala:47, took 0.009993 s
    -------------------------------------------
    Time: 1540178896000 ms
    -------------------------------------------

Start a console producer:

    [root@master kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
    > hello you
    > hello me

Check the output:

    (hello,2)
    (me,1)
    (you,1)
    2018-10-22 11:57:08 INFO  JobScheduler:54 - Finished job streaming job 1540180628000 ms.0 from job set of time 1540180628000 ms
    2018-10-22 11:57:08 INFO  JobScheduler:54 - Total delay: 0.119 s for time 1540180628000 ms (execution: 0.072 s)
    2018-10-22 11:57:08 INFO  ShuffledRDD:54 - Removing RDD 154 from persistence list
    2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 153 from persistence list
    2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 153
    2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 154
    2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 152 from persistence list
    2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 152
    2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 151 from persistence list
    2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 151
    2018-10-22 11:57:08 INFO  KafkaRDD:54 - Removing RDD 150 from persistence list
    2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 150