zoukankan      html  css  js  c++  java
  • SparkStreaming整合kafka编程

    1、下载spark-streaming-kafka插件包

    由于Linux集群环境我使用spark是spark-2.1.1-bin-hadoop2.7,kafka是kafka_2.11-0.8.2.1,所以我下载的是spark-streaming-kafka-0-8_2.11-2.1.1.jar。

    官网下载地址:http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.1

    百度云下载地址:链接:http://pan.baidu.com/s/1o83DOHO 密码:2dgx

    2、整合spark和kafka的jar包

    2.1添加spark-streaming-kafka插件包

    新建一个lib目录,首先把1步骤下载的spark-streaming-kafka-0-8_2.11-2.1.1.jar放进去

    如图:

    2.2添加spark依赖包

    找到spark-2.1.1-bin-hadoop2.7/jars目录下所有的jar包,如图:

    把spark-2.1.1-bin-hadoop2.7/jars目录下所有的jar包复制到上述新建的lib目录下,如图:

    2.3添加kafka依赖包

    找到kafka_2.11-0.8.2.1/libs目录下所有的jar包,如图:

    把kafka_2.11-0.8.2.1/libs目录下所有的jar包复制到上述新建的lib目录下,如图:

    3、新建测试工程

    新建scala project,引用上述lib目录下的所有jar包;新建一个KafkaWordCount.scala用于测试:

    
    
    1. import org.apache.spark.streaming.StreamingContext
    2. import org.apache.spark.SparkConf
    3. import org.apache.spark.streaming.kafka.KafkaUtils
    4. import org.apache.spark.streaming.Seconds
    5. import org.apache.spark.streaming.Minutes
    6. import org.apache.spark.SparkContext
    7. import kafka.serializer.StringDecoder
    8.  
    9. object KafkaWordCount {
    10.   def main(args: Array[String]) {
    11.     val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    12.     sparkConf.set("spark.port.maxRetries","128")
    13.     val ssc = new StreamingContext(sparkConf, Seconds(2))
    14.     ssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint")
    15.     val zkQuorum = "192.168.168.200:2181"
    16.     val group = "test-group"
    17.     val topics = "test"
    18.     val numThreads = 1
    19.     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    20.     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    21.     val words = lines.flatMap(_.split(" "))
    22.     val wordCounts = words.map(=> (x, 1L))
    23.       .reduceByKeyAndWindow(+ _, _ - _, Minutes(10), Seconds(2), 2)
    24.     wordCounts.print()
    25.     ssc.start()
    26.     ssc.awaitTermination()          
    27.   }
    28. }

    如图:

    启动spark集群和kafka集群,默认已经开启,默认kafka有test主题,这是默认要会的,在这里不在详述。

    运行成功,如图:

    
    
    1. SLF4J: Class path contains multiple SLF4J bindings.
    2. SLF4J: Found binding in [jar:file:/I:/001sourceCode/020SparkStreaming/%e5%a4%a7%e6%95%b0%e6%8d%ae%e5%bc%80%e5%8f%91%e6%96%b9%e6%a1%88%e8%b5%84%e6%96%99%ef%bc%88%e5%a4%a9%e7%bb%b4%e5%b0%94%ef%bc%89/%e5%bc%80%e5%8f%91%e6%89%80%e9%9c%80jar%e5%8c%85/lib/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    3. SLF4J: Found binding in [jar:file:/I:/001sourceCode/020SparkStreaming/%e5%a4%a7%e6%95%b0%e6%8d%ae%e5%bc%80%e5%8f%91%e6%96%b9%e6%a1%88%e8%b5%84%e6%96%99%ef%bc%88%e5%a4%a9%e7%bb%b4%e5%b0%94%ef%bc%89/%e5%bc%80%e5%8f%91%e6%89%80%e9%9c%80jar%e5%8c%85/lib/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    4. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    5. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    6. -------------------------------------------
    7. Time: 1499667652000 ms
    8. -------------------------------------------
    9.  
    10. -------------------------------------------
    11. Time: 1499667654000 ms
    12. -------------------------------------------
    13.  
    14. -------------------------------------------
    15. Time: 1499667656000 ms
    16. -------------------------------------------

    4、接收kafka的主题消息

    启动一个kafka的生产者客户端:

    
    
    1. [root@master ~]# kafka-console-producer.sh --broker-list 192.168.168.200:9092 --topic test
    2. test success
    3. spark
    4. kafka

    运行日志如下:

    
    
    1. -------------------------------------------
    2. Time: 1499667830000 ms
    3. -------------------------------------------
    4.  
    5. -------------------------------------------
    6. Time: 1499667832000 ms
    7. -------------------------------------------
    8. (test,1)
    9. (success,1)
    10.  
    11. -------------------------------------------
    12. Time: 1499667834000 ms
    13. -------------------------------------------
    14. (test,1)
    15. (success,1)
    16.  
    17. -------------------------------------------
    18. Time: 1499667836000 ms
    19. -------------------------------------------
    20. (test,1)
    21. (spark,1)
    22. (success,1)
    23.  
    24. -------------------------------------------
    25. Time: 1499667838000 ms
    26. -------------------------------------------
    27. (kafka,1)
    28. (test,1)
    29. (spark,1)
    30. (success,1)

    5、sparkStreaming收不到kafka主题消息

    如果出现kakfa的消费者客户端可以收到消息,而spark的消费者客户端收不到消息,后台也没有报错,那么要仔细检查kafka_home/conf目录下的server.properties,有没有配置:

    
    
    1. ############################# Socket Server Settings #############################
    2. # The port the socket server listens on
    3. port=9092
    4. # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    5. host.name=192.168.168.200
    一定要配置host.name,否则只能在kafk消费客户端收到消息,不能在sparkStreaming创建的topic消息客户端收到。

    6、sbtassembly打包代码并上传到spark运行

    可参考以下资料:

    使用SBT构建Scala项目

    本地开发spark代码上传spark集群服务并运行

  • 相关阅读:
    虚函数和纯虚函数
    MS CRM 2011中PartyList类型字段的实例化
    MS CRM 2011的自定义与开发(12)——表单脚本扩展开发(4)
    MS CRM 2011的自定义与开发(12)——表单脚本扩展开发(2)
    MS CRM 2011的自定义和开发(10)——CRM web服务介绍(第二部分)——IOrganizationService(二)
    MS CRM 2011 SDK 5.08已经发布
    MS CRM 2011 Q2的一些更新
    最近很忙
    Microsoft Dynamics CRM 2011最近的一些更新
    补一篇,Update Rollup 12 终于发布了
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723896.html
Copyright © 2011-2022 走看看