zoukankan      html  css  js  c++  java
  • Spark-Streaming kafka count 案例

    Streaming 统计来自 kafka 的数据,这里涉及到的比较,kafka 的数据是使用从 flume 获取到的,这里相当于一个小的案例。

    1. 启动 kafka

    Spark-Streaming hdfs count 案例
    

    2. 启动 flume

    flume-ng agent -c conf -f conf/kafka_test.conf -n a1 -Dflume.root.logger=INFO,console
    

      flume 配置文件如下

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /root/code/flume_exec_test.txt
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.brokerList=master:9092
    a1.sinks.k1.topic=kaka
    a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

      这里 flume 是的数据是来自一个文件,只要这个文件有数据进入,就会被flume监控到,测试的时候只需要往这个文件里写数据就可以了。

    3. 启动 kafka 消费者来观察

    kafka-console-consumer.sh --bootstrap-server master:9092 --topic kaka
    

    4. 下面就是 Streaming 的统计代码

    package com.hw.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
    
    object KafkaWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
    
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
    
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val words = lines.flatMap(_.split(",")(1))
    //    窗口大小10秒,滑动大小2秒,这里的窗口大小一定要是滑动大小的倍数关系才行
        val wordCounts = words.map((_, 1L)).reduceByKeyAndWindow(_ + _,_ - _,Seconds(10), Seconds(2))
        wordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
    

    5. 执行脚本

    # kafka count bash
    $SPARK_HOME/bin/spark-submit
            --class com.hw.streaming.KafkaWordCount
            --master yarn-cluster 
            --executor-memory 1G 
            --total-executor-cores 2 
            --files $HIVE_HOME/conf/hive-site.xml 
            --jars $HIVE_HOME/lib/mysql-connector-java-5.1.25-bin.jar,$SPARK_HOME/jars/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/jars/datanucleus-core-3.2.10.jar,$SPARK_HOME/jars/datanucleus-rdbms-3.2.9.jar,$SPARK_HOME/jars/guava-14.0.1.jar 
            ./SparkPro-1.0-SNAPSHOT-jar-with-dependencies.jar 
            master:2181 group_id_1 kaka 1
    

    6. 写数据,写到对应flume 监控的文件就行

    import random
    import time
    readFileName="/root/orders.csv"
    writeFileName="/root/code/flume_exec_test.txt"
    with open(writeFileName,'a+')as wf:
        with open(readFileName,'rb') as f:
            for line in f.readlines():
                for word in line.split(" "):
                    ss = line.strip()
                    if len(ss)<1:
                        continue
                    wf.write(ss+'
    ')
                rand_num = random.random()
                time.sleep(rand_num)
    

    7. 观察消费者是否消费到数据,在执行脚本的时候发现以下错误,一个是窗口时间的问题,一个是要设置 checkpoint。

    窗口时间设置不对,会报以下错误

    User class threw exception: java.lang.IllegalArgumentException: requirement failed: The window duration of ReducedWindowedDStream (3000 ms) must be multiple of the slide duration of parent DStream (10000 ms)
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.streaming.dstream.ReducedWindowedDStream.<init>(ReducedWindowedDStream.scala:39)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$6.apply(PairDStreamFunctions.scala:348)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$6.apply(PairDStreamFunctions.scala:343)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:693)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:343)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$5.apply(PairDStreamFunctions.scala:311)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$5.apply(PairDStreamFunctions.scala:311)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:693)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:310)
    at com.badou.streaming.KafkaWordCount$.main(KafkaWordCount.scala:22)
    at com.badou.streaming.KafkaWordCount.main(KafkaWordCount.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
    

    错误修改,需要将窗口时间设置成滑动时间的倍数。上面给出的脚本已经是修改过的,如果安装上面的步骤操作,就不会报这个错误了。

    如果没有增加 checkpoint,也会报错,报错如下:

    requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
    

    设置相应的 checkpoint 即可。

    # 在统计代码中加入下面这个语句
    # val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.setCheckPoint("/root/checkpoint")
    

    如果以上执行完成,可以在浏览器中查看日志,会看到对应的统计信息。 

    # 登录 192.168.56.122:8080
    # 查看对应的日志信息
    

    总结,在测试的时候,启动 flume 的时候遇到了一个错误,错误如下:

    [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] 
    Error while fetching metadata     partition 4     leader: none    replicas:       isr
    :    isUnderReplicated: false for topic partition [default-flume-topic,4]: 
    [class kafka.common.LeaderNotAvailableException]

    遇到这个错误的原因主要是 flume 配置文件中,设置的 kafka sink 不对导致的,可以看到本应该监听的 topic 是 kaka,但是这里监控的却是默认的 default-flume-topic,经过检查终于发现错误是由于不细心导致的,把 sinks 写成 sink 了,一定要注意细节,一定要学会看日志。

  • 相关阅读:
    2020年下半年学习进度04
    2020年下半年学习进度03
    2020年下半年学习进度02
    2020年下半年学习进度01
    数据爬取
    个人课程总结
    Syncnavigator V8.6.2在线说明书
    SyncNavigator V8.6.2企业版下载链接
    Syncnavigator V8.6.2帮助文档(说明书)下载
    SQL Server 自动同步到 MySQL
  • 原文地址:https://www.cnblogs.com/hanwen1014/p/11260456.html
Copyright © 2011-2022 走看看