  • Spark Streaming consuming Kafka: getting-started pitfalls and how they were resolved

    Kafka service commands

    # Start the Kafka broker
    bin/kafka-server-start.sh -daemon config/server.properties &
    # Create a topic
    bin/kafka-topics.sh --create --zookeeper bigdata-senior02.ibeifeng.com:2181 --replication-factor 1 --partitions 1 --topic orderTopic
    # Start a console consumer
    bin/kafka-console-consumer.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --topic orderTopic --from-beginning
    # Start a console producer
    bin/kafka-console-producer.sh --broker-list bigdata-senior02.ibeifeng.com:9092 --topic orderTopic

    # List topics
    bin/kafka-topics.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --list

    # Mark a topic for deletion
    bin/kafka-topics.sh --delete --zookeeper bigdata-senior02.ibeifeng.com:2181 --topic orderTopic
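
    Note: with default broker settings the --delete command above may only mark the topic for deletion; actually removing it generally requires setting delete.topic.enable=true in config/server.properties (the default for this flag depends on the Kafka version, so check your broker config).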

    Environment setup (I am using single-node pseudo-distributed mode)

    Start ZooKeeper first, then start the Kafka broker service.

    $ZK_HOME/bin/zkServer.sh start

    $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

    # Create a topic
    bin/kafka-topics.sh --create --zookeeper bigdata-senior02.ibeifeng.com:2181 --replication-factor 1 --partitions 1 --topic orderTopic

    # List topics
    bin/kafka-topics.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --list

    # Start a console consumer
    bin/kafka-console-consumer.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --topic orderTopic --from-beginning
    # Start a console producer
    bin/kafka-console-producer.sh --broker-list bigdata-senior02.ibeifeng.com:9092 --topic orderTopic

    The tests above confirm that Kafka is working properly.

    Writing the Spark Streaming code (Scala 2.11.8, Spark 2.0.0, Kafka 1.1)

    Maven dependencies

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <!-- Pay close attention to the Scala version suffix here; a mismatch with your Scala version leads to runtime incompatibilities -->
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.1.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>

    <!-- Spark Core -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>

    <!-- Spark SQL -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>

    <!-- Spark Streaming -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
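
    If the spark-streaming-kafka_2.11:1.6.3 artifact above causes binary-compatibility trouble against Spark 2.0.0, one alternative worth trying (an assumption on my part, not something verified in this post) is the Kafka 0.8 integration built for Spark 2.x, which exposes the same org.apache.spark.streaming.kafka.KafkaUtils API:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>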

    Producer-side data generation
    import java.util
    import java.util.Date

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    import scala.util.Random

    /**
     * A data producer for the order topic.
     *
     * Start the broker first; otherwise the producer reports that no broker can be found:
     * bin/kafka-server-start.sh -daemon config/server.properties &
     *
     * Start a console consumer to watch the messages arrive:
     * bin/kafka-console-consumer.sh --zookeeper bigdata-senior02.ibeifeng.com:2181 --topic orderTopic --from-beginning
     */
    object OrderProductor {
      def main(args: Array[String]): Unit = {

        val topic = "orderTopic"
        val brokers = "bigdata-senior02.ibeifeng.com:9092"

        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)

        // Generate 10 orders per second
        while (true) {
          (1 to 10).foreach { messageNum =>
            // region id, order id, order amount, order timestamp
            val str = messageNum + "," + Random.nextInt(10) + "," + Math.round(Random.nextDouble() * 100) + "," + new Date().getTime
            val message = new ProducerRecord[String, String](topic, null, str)
            producer.send(message)
          }

          Thread.sleep(1000)
        }
      }
    }

    // Produces messages of random digits: wordsPerMessage digits per message, messagesPerSec messages per second.
    object KafkaWordCountProducer {

      def main(args: Array[String]) {

        val topic = "orderTopic"
        val brokers = "bigdata-senior02.ibeifeng.com:9092"
        val messagesPerSec = 10
        val wordsPerMessage = 5

        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)

        while (true) {
          (1 to messagesPerSec).foreach { messageNum =>
            val str = (1 to wordsPerMessage).map(x => scala.util.Random.nextInt(10).toString)
              .mkString(" ")

            val message = new ProducerRecord[String, String](topic, null, str)
            producer.send(message)
          }

          Thread.sleep(1000)
        }
      }

    }
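
    When debugging "no broker found" errors or messages that never show up on the consumer side, it helps to check the outcome of each send instead of firing and forgetting. A minimal sketch (reusing the producer and topic from the listing above; Callback, RecordMetadata and flush() are standard kafka-clients APIs):

    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

    // Send one record and log whether the broker accepted it.
    val record = new ProducerRecord[String, String](topic, null, "test message")
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) exception.printStackTrace()
        else println(s"sent to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
      }
    })
    producer.flush() // push any buffered records out before the JVM exits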

    Consuming the data
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._

    /**
    * Consumes messages from one or more topics in Kafka and does wordcount.
    * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
    * <zkQuorum> is a list of one or more zookeeper servers that make quorum
    * <group> is the name of kafka consumer group
    * <topics> is a list of one or more kafka topics to consume from
    * <numThreads> is the number of threads the kafka consumer should use
    *
    * Example:
    * `$ bin/run-example
    * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03
    * my-consumer-group topic1,topic2 1`
    */
    object KafkaWordCount {
      def main(args: Array[String]) {

        val zkQuorum = "bigdata-senior02.ibeifeng.com:2181"
        val group = "g1"
        val topics = "orderTopic"
        val numThreads = 2

        // Give the local master at least 2 cores; with only 1 the receiver occupies it and no computation runs
        val conf = new SparkConf().setAppName("StatelessWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2)) // one batch every two seconds

        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .foreachRDD(x => x.foreach(println))

        ssc.start()
        ssc.awaitTermination()
      }
    }
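
    For reference, the same word count can also read Kafka through the direct (receiver-less) API exposed by the same org.apache.spark.streaming.kafka package, where offsets are tracked by Spark rather than by a ZooKeeper-based receiver. A minimal sketch (not verified against the exact versions above; it would replace the createStream call inside the same main method, using the broker address and topic from this post):

    import kafka.serializer.StringDecoder

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "bigdata-senior02.ibeifeng.com:9092",
      "auto.offset.reset" -> "smallest")
    val topicsSet = Set("orderTopic")

    // Direct stream: one RDD partition per Kafka partition, no receiver thread needed
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)
      .map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreachRDD(rdd => rdd.foreach(println))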

    That is the hello world of consuming Kafka from Spark Streaming.
    ===============================================================================================================================
    Notes:
    1. Be sure to choose compatible versions; otherwise all kinds of strange problems appear.
    2. I was stuck here for almost a week, entirely because of the version incompatibilities above and mistakes made while importing packages.
    3. If the code itself looks fine and yet things misbehave at runtime, start from version compatibility; the problem is usually easier to find that way.
    The code is on my GitHub; leave a comment if you have questions.
    https://github.com/nulijiushimeili/spark01

