zoukankan      html  css  js  c++  java
  • SparkStreaming整合kafka编程

    1、下载spark-streaming-kafka插件包

    由于Linux集群环境我使用spark是spark-2.1.1-bin-hadoop2.7,kafka是kafka_2.11-0.8.2.1,所以我下载的是spark-streaming-kafka-0-8_2.11-2.1.1.jar。

    官网下载地址:http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.1

    百度云下载地址:链接:http://pan.baidu.com/s/1o83DOHO 密码:2dgx

    2、整合spark和kafka的jar包

    2.1添加spark-streaming-kafka插件包

    新建一个lib目录,首先把1步骤下载的spark-streaming-kafka-0-8_2.11-2.1.1.jar放进去

    如图:

    2.2添加spark依赖包

    找到spark-2.1.1-bin-hadoop2.7/jars目录下所有的jar包,如图:

    把spark-2.1.1-bin-hadoop2.7/jars目录下所有的jar包复制到上述新建的lib目录下,如图:

    2.3添加kafka依赖包

    找到kafka_2.11-0.8.2.1/libs目录下所有的jar包,如图:

    把kafka_2.11-0.8.2.1/libs目录下所有的jar包复制到上述新建的lib目录下,如图:

    3、新建测试工程

    新建scala project,引用上述lib目录下的所有jar包;新建一个KafkaWordCount.scala用于测试:

    
    
    1. import org.apache.spark.streaming.StreamingContext
    2. import org.apache.spark.SparkConf
    3. import org.apache.spark.streaming.kafka.KafkaUtils
    4. import org.apache.spark.streaming.Seconds
    5. import org.apache.spark.streaming.Minutes
    6. import org.apache.spark.SparkContext
    7. import kafka.serializer.StringDecoder
    8.  
    9. object KafkaWordCount {
    10.   def main(args: Array[String]) {
    11.     val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    12.     sparkConf.set("spark.port.maxRetries","128")
    13.     val ssc = new StreamingContext(sparkConf, Seconds(2))
    14.     ssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint")
    15.     val zkQuorum = "192.168.168.200:2181"
    16.     val group = "test-group"
    17.     val topics = "test"
    18.     val numThreads = 1
    19.     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    20.     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    21.     val words = lines.flatMap(_.split(" "))
    22.     val wordCounts = words.map(=> (x, 1L))
    23.       .reduceByKeyAndWindow(+ _, _ - _, Minutes(10), Seconds(2), 2)
    24.     wordCounts.print()
    25.     ssc.start()
    26.     ssc.awaitTermination()          
    27.   }
    28. }

    如图:

    启动spark集群和kafka集群,默认已经开启,默认kafka有test主题,这是默认要会的,在这里不在详述。

    运行成功,如图:

    
    
    1. SLF4J: Class path contains multiple SLF4J bindings.
    2. SLF4J: Found binding in [jar:file:/I:/001sourceCode/020SparkStreaming/%e5%a4%a7%e6%95%b0%e6%8d%ae%e5%bc%80%e5%8f%91%e6%96%b9%e6%a1%88%e8%b5%84%e6%96%99%ef%bc%88%e5%a4%a9%e7%bb%b4%e5%b0%94%ef%bc%89/%e5%bc%80%e5%8f%91%e6%89%80%e9%9c%80jar%e5%8c%85/lib/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    3. SLF4J: Found binding in [jar:file:/I:/001sourceCode/020SparkStreaming/%e5%a4%a7%e6%95%b0%e6%8d%ae%e5%bc%80%e5%8f%91%e6%96%b9%e6%a1%88%e8%b5%84%e6%96%99%ef%bc%88%e5%a4%a9%e7%bb%b4%e5%b0%94%ef%bc%89/%e5%bc%80%e5%8f%91%e6%89%80%e9%9c%80jar%e5%8c%85/lib/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    4. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    5. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    6. -------------------------------------------
    7. Time: 1499667652000 ms
    8. -------------------------------------------
    9.  
    10. -------------------------------------------
    11. Time: 1499667654000 ms
    12. -------------------------------------------
    13.  
    14. -------------------------------------------
    15. Time: 1499667656000 ms
    16. -------------------------------------------

    4、接收kafka的主题消息

    启动一个kafka的生产者客户端:

    
    
    1. [root@master ~]# kafka-console-producer.sh --broker-list 192.168.168.200:9092 --topic test
    2. test success
    3. spark
    4. kafka

    运行日志如下:

    
    
    1. -------------------------------------------
    2. Time: 1499667830000 ms
    3. -------------------------------------------
    4.  
    5. -------------------------------------------
    6. Time: 1499667832000 ms
    7. -------------------------------------------
    8. (test,1)
    9. (success,1)
    10.  
    11. -------------------------------------------
    12. Time: 1499667834000 ms
    13. -------------------------------------------
    14. (test,1)
    15. (success,1)
    16.  
    17. -------------------------------------------
    18. Time: 1499667836000 ms
    19. -------------------------------------------
    20. (test,1)
    21. (spark,1)
    22. (success,1)
    23.  
    24. -------------------------------------------
    25. Time: 1499667838000 ms
    26. -------------------------------------------
    27. (kafka,1)
    28. (test,1)
    29. (spark,1)
    30. (success,1)

    5、sparkStreaming收不到kafka主题消息

    如果出现kakfa的消费者客户端可以收到消息,而spark的消费者客户端收不到消息,后台也没有报错,那么要仔细检查kafka_home/conf目录下的server.properties,有没有配置:

    
    
    1. ############################# Socket Server Settings #############################
    2. # The port the socket server listens on
    3. port=9092
    4. # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    5. host.name=192.168.168.200
    一定要配置host.name,否则只能在kafk消费客户端收到消息,不能在sparkStreaming创建的topic消息客户端收到。

    6、sbtassembly打包代码并上传到spark运行

    可参考以下资料:

    使用SBT构建Scala项目

    本地开发spark代码上传spark集群服务并运行

  • 相关阅读:
    laravel中get方式表单提交后, 地址栏数据重复的问题
    laravel中firstOrCreate的使用
    表单使用clone方法后, 原有select无法生效
    浏览器提示内容编码错误, 不支持此压缩格式
    web端访问文件没有权限的问题
    $(this)在ajax里面不生效的探究
    如何在github上提交pr
    IDEA配置文件的配置文件配置
    quartz定时任务时间设置
    通过 EXPLAIN 分析低效 SQL 的执行计划
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723896.html
Copyright © 2011-2022 走看看