  • Spark-Streaming Kafka count example

    Spark Streaming counts data coming from Kafka, and the Kafka data itself is fed in from Flume, so this makes a small end-to-end example.

    1. Start Kafka

    (See the companion post "Spark-Streaming hdfs count 案例".)
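
    If that post is not to hand, here is a minimal sketch of the startup commands, assuming a standard Kafka install under $KAFKA_HOME with ZooKeeper on master:2181 (both locations are assumptions, not from the original post):

    # start ZooKeeper and the Kafka broker in the background
    $KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
    $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

    # create the topic used throughout this example
    # (partition and replication counts are illustrative)
    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic kaka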
    

    2. Start Flume

    flume-ng agent -c conf -f conf/kafka_test.conf -n a1 -Dflume.root.logger=INFO,console
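
      Here -c points at Flume's configuration directory, -f at the agent definition file, and -n names the agent to run, which must match the a1 prefix used in the configuration; -Dflume.root.logger=INFO,console simply echoes the agent's log to the console.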
    

      The Flume configuration file is as follows:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /root/code/flume_exec_test.txt
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.brokerList=master:9092
    a1.sinks.k1.topic=kaka
    a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

      Flume's source here is a file: any data appended to that file is picked up by Flume, so for testing you only need to write lines into it.
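
    As a quick sanity check you can append a line by hand and watch it arrive downstream; the sample record here is made up:

    echo "order_001,pid_001,2019-07-29" >> /root/code/flume_exec_test.txt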

    3. Start a Kafka console consumer to watch the data

    kafka-console-consumer.sh --bootstrap-server master:9092 --topic kaka
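
    If the consumer is started only after data has already been written, add --from-beginning to replay the topic from the start:

    kafka-console-consumer.sh --bootstrap-server master:9092 --topic kaka --from-beginning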
    

    4. The Streaming count code is shown below

    package com.hw.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object KafkaWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
    
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
    
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        // take the second comma-separated field of each line as the "word"
        // (the original flatMap here would have split that field into characters)
        val words = lines.map(_.split(",")(1))
        // 10-second window sliding every 2 seconds; the window duration
        // must be a multiple of the slide duration
        val wordCounts = words.map((_, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(10), Seconds(2))
        wordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
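
      Note that KafkaUtils.createStream is the receiver-based API from the spark-streaming-kafka (Kafka 0.8) artifact, so that dependency must be on the classpath; here it ships inside the jar-with-dependencies built for step 5.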
    

    5. Run the submit script

    # kafka count submit script
    $SPARK_HOME/bin/spark-submit \
            --class com.hw.streaming.KafkaWordCount \
            --master yarn-cluster \
            --executor-memory 1G \
            --total-executor-cores 2 \
            --files $HIVE_HOME/conf/hive-site.xml \
            --jars $HIVE_HOME/lib/mysql-connector-java-5.1.25-bin.jar,$SPARK_HOME/jars/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/jars/datanucleus-core-3.2.10.jar,$SPARK_HOME/jars/datanucleus-rdbms-3.2.9.jar,$SPARK_HOME/jars/guava-14.0.1.jar \
            ./SparkPro-1.0-SNAPSHOT-jar-with-dependencies.jar \
            master:2181 group_id_1 kaka 1
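
    The four trailing arguments map to the <zkQuorum> <group> <topics> <numThreads> parameters from the Usage line above: the ZooKeeper quorum master:2181, the consumer group group_id_1, the topic kaka, and one receiver thread per topic.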
    

    6. Write data to the file that Flume is monitoring

    import random
    import time

    readFileName = "/root/orders.csv"
    writeFileName = "/root/code/flume_exec_test.txt"

    # replay the source file line by line, sleeping a random
    # sub-second interval so the data trickles in over time
    with open(writeFileName, 'a+') as wf:
        with open(readFileName, 'r') as f:
            for line in f:
                ss = line.strip()
                if len(ss) < 1:
                    continue
                wf.write(ss + '\n')
                wf.flush()  # make each line visible to Flume's tail -f immediately
                time.sleep(random.random())
    

    7. Check that the consumer is receiving data. Running the job turned up two errors: one about the window duration and one about a missing checkpoint.

    If the window duration is set incorrectly, the job fails with the following error:

    User class threw exception: java.lang.IllegalArgumentException: requirement failed: The window duration of ReducedWindowedDStream (3000 ms) must be multiple of the slide duration of parent DStream (10000 ms)
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.streaming.dstream.ReducedWindowedDStream.<init>(ReducedWindowedDStream.scala:39)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$6.apply(PairDStreamFunctions.scala:348)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$6.apply(PairDStreamFunctions.scala:343)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:693)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:343)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$5.apply(PairDStreamFunctions.scala:311)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$5.apply(PairDStreamFunctions.scala:311)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:693)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:310)
    at com.badou.streaming.KafkaWordCount$.main(KafkaWordCount.scala:22)
    at com.badou.streaming.KafkaWordCount.main(KafkaWordCount.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
    

    The fix is to make the window duration a multiple of the slide duration. The code above already includes this fix, so if you follow the steps as written you will not hit this error.
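
    For example, a 10-second window sliding every 2 seconds is valid (10 = 5 × 2), while the failed run in the trace above asked for a 3000 ms window over a 10000 ms parent slide, which is not a multiple.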

    If no checkpoint is configured, the job also fails, with:

    requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
    

    Setting a checkpoint directory fixes it.

    # Add the following statement to the counting code, right after
    # val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("/root/checkpoint")
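
    As an aside, the checkpoint is only mandatory because the variant of reduceByKeyAndWindow used above takes an inverse-reduce function. A minimal sketch of the same count with the overload that has no inverse function, which recomputes each full window and needs no checkpoint directory:

    // recompute the whole 10-second window every 2 seconds; simpler,
    // but does more work per batch than the incremental variant
    val wordCounts = words.map((_, 1L))
      .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(10), Seconds(2))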
    

    Once all of the above is running, you can view the logs in a browser and see the corresponding counts.

    # open 192.168.56.122:8080 in a browser
    # and check the corresponding log output
    

    To wrap up: during testing, I hit one more error when starting Flume:

    [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] 
    Error while fetching metadata     partition 4     leader: none    replicas:       isr
    :    isUnderReplicated: false for topic partition [default-flume-topic,4]: 
    [class kafka.common.LeaderNotAvailableException]

    The root cause was a mistake in the Kafka sink section of the Flume configuration: the topic being written to should have been kaka, yet Flume was publishing to the default default-flume-topic. On closer inspection it came down to a careless typo, writing sink instead of sinks in a property name, so the topic setting was silently ignored. Mind the details, and learn to read the logs.
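
    For illustration (the post does not show the exact broken line, so this reconstructs the kind of typo involved):

    # typo: "sink" is not a valid component prefix, so the topic setting
    # is ignored and Flume falls back to default-flume-topic
    a1.sink.k1.topic=kaka
    # correct: the sink publishes to the intended topic
    a1.sinks.k1.topic=kaka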
