zoukankan      html  css  js  c++  java
  • Spark Streaming整合Kafka

    Spark Streaming整合Kafka:

    0)摘要

      主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式。这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html),kafka介绍(https://www.cnblogs.com/truekai/p/11774847.html

    1)Kafka准备

    • 启动zookeeper
      ./zkServer.sh start
    • 启动kafka
      ./kafka-server-start.sh -daemon ../config/server.properties //后台启动
    • 创建topic
      ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic test
    • 通过控制台测试topic能否正常的生产和消费

           启动生产者脚本:
             ./kafka-console-producer.sh --broker-list hadoop:9092 --topic test

       启动消费者脚本:

        ./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic test --from-beginning

      准备工作已经就绪。

    2)Receiver-based方式整合

    注意:这种方式为了保证数据不会丢失,需要开启Write Ahead Logs机制,开启后,接收数据的正确性只有被预写到日志以后Receive才会确认,可以从日志中恢复数据,会增加额外的开销。如何开启?设置SparkConf的“Spark Streaming writeAheadLog.enable”属性为“true”,这种模式基本被淘汰

    1 添加kafka依赖

            <!--        kafka依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>

    2 本地代码编写

     1 package flume_streaming
     2 
     3 import org.apache.spark.SparkConf
     4 import org.apache.spark.streaming.kafka._
     5 import org.apache.spark.streaming.{Durations, StreamingContext}
     6 
     7 /**
     8  * @Author: SmallWild
     9  * @Date: 2019/10/30 10:00
    10  * @Desc:
    11  */
    12 
    13 object kafkaReceiveWordCount {
    14   def main(args: Array[String]): Unit = {
    15     if (args.length != 4) {
    16       System.err.println("错误参数")
    17       System.exit(1)
    18     }
    19     //接收参数
    20     //numPartitions 线程数
    21     val Array(zkQuorum, groupId, topics, numPartitions) = args
    22     //一定不能使用local[1]
    23     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafkaReceiveWordCount")
    24     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    25     //设置日志级别
    26     ssc.sparkContext.setLogLevel("WARN")
    27     //多个topic用,分开
    28     val topicMap = topics.split(",").map((_, numPartitions.toInt)).toMap
    29     //TODO 业务逻辑,简单进行wordcount,输出到控制台
    30     /**
    31      * * @param ssc       StreamingContext object
    32      * * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
    33      * * @param groupId   The group id for this consumer topic所在的组,可以设置为自己想要的名称
    34      * * @param topics    Map of (topic_name to numPartitions) to consume. Each partition is consumed
    35      * *                  in its own thread
    36      * * @param storageLevel  Storage level to use for storing the received objects
    37      * *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    38      */
    39     val lineMap = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
    40     lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    41     ssc.start()
    42     ssc.awaitTermination()
    43   }
    44 }

    3 提交到服器上运行

      如果生产中没有联网,需要使用  --jars 传入kafka的jar包

    • 把项目达成jar包
    • 使用local模式提交,提交的脚本:
    提交到服务器上运行
     ./spark-submit --master local[2] /
     --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /
     --class flume_streaming.kafkaReceiveWordCount /
     /smallwild/app/SparkStreaming-1.0.jar /
     hadoop:2181 1 sparkStreaming 1

    4 运行结果

      首先在控制台,启动kafka生产者,输入一些单词,然后,启动SparkStreaming程序。

      

    3)Direct方式整合

    使用的是:Simple Consumer API,自己管理offset,把kfka看成存储数据的地方,根据offset去读。没有使用zk管理消费者的offset,spark自己管理,默认的offset在内存中,如果设置了checkpoint,那么也也有一份,一般要设置。Direct模式生成的Dstream中的RDD的并行度与读取的topic中的partition一致(增加topic的partition个数)

    注意点:

    • 没有使用receive,直接查询的kafka偏移量

    1 添加kafka依赖

            <!--        kafka依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>

    2 代码编写

     1 package kafka_streaming
     2 
     3 import kafka.serializer.StringDecoder
     4 import org.apache.spark.SparkConf
     5 import org.apache.spark.streaming.{Durations, StreamingContext}
     6 import org.apache.spark.streaming.kafka.KafkaUtils
     7 
     8 /**
     9  * @Author: SmallWild
    10  * @Date: 2019/10/31 21:21
    11  * @Desc:
    12  */
    13 object kafkaDirectWordCount {
    14 
    15   def main(args: Array[String]): Unit = {
    16     if (args.length != 2) {
    17       System.err.println("错误参数")
    18       System.exit(1)
    19     }
    20     //接收参数
    21     //numPartitions 线程数
    22     val Array(brokers, topics) = args
    23     //一定不能使用local[1]
    24     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafkaDirectWordCount")
    25     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    26     //设置日志级别
    27     ssc.sparkContext.setLogLevel("WARN")
    28     //多个topic用,分开
    29     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers
    30     )
    31     val topicsa = topics.split(",").toSet
    32     /**
    33      *
    34      */
    35     val lineMap = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsa)
    36     //TODO 业务逻辑,简单进行wordcount,输出到控制台
    37     lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    38     ssc.start()
    39     ssc.awaitTermination()
    40   }
    41 
    42 }
    View Code

    3 提交到服务器上运行和第一种方式是上面一样

    4 自己管理offset

      使用spark自己管理offset方便,但是当业务逻辑改变的时候,恢复就难了,需要自己手动编写代码管理offset

    4)总结

      注意两种模式差别,receive模式几乎被淘汰,可以扩展的地方,1)使程序具备高可用的能力,挂掉之后,能否从上次的状态恢复过来,2)手动管理offset,改变了业务逻辑也能从上次的状态恢复过来

      

    我不喜欢这个世界,我喜欢你
  • 相关阅读:
    BZOJ2821 作诗(Poetize) 【分块】
    BZOJ2724 蒲公英 【分块】
    Codeforces 17E Palisection 【Manacher】
    BZOJ2565 最长双回文串 【Manacher】
    Codeforces 25E Test 【Hash】
    CODEVS3013 单词背诵 【Hash】【MAP】
    HDU2825 Wireless Password 【AC自动机】【状压DP】
    HDU2896 病毒侵袭 【AC自动机】
    HDU3065 病毒侵袭持续中【AC自动机】
    HDU2222 Keywords Search 【AC自动机】
  • 原文地址:https://www.cnblogs.com/truekai/p/11769705.html
Copyright © 2011-2022 走看看