  • [Spark] Integrating Spark Streaming with Kafka


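    Integrating Spark Streaming with Kafka

    Overview

    The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are two separate corresponding Spark Streaming packages. Choose the package that matches your brokers and the features you need; note that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 0.10 integration is not compatible with earlier brokers.
    The paragraph above is taken from the official Spark documentation.
    For more details, see the official documentation: http://spark.apache.org/docs/2.2.0/streaming-kafka-integration.html
    For more background on Kafka, see these articles:

    [Kafka] Message queue fundamentals
    [Kafka] Setting up a Kafka cluster environment
    [Kafka] Basic Kafka cluster operations: a must for beginners
    [Kafka] A brief introduction to Kafka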

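    In practice, Spark Streaming is integrated with Kafka far more often than with Flume, because Kafka buffers the data and lets the consumer decide how fast to pull it, which makes it possible to rate-limit the stream.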
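On the Spark side, this throttling is usually configured through a few settings. The sketch below is not from the original article. It lists the setting names as I understand them from the Spark configuration documentation (`spark.streaming.backpressure.enabled`, and `spark.streaming.kafka.maxRatePerPartition` for direct streams), with purely illustrative values, and computes the resulting upper bound on records per batch for a direct stream. `RateLimitSketch` and `maxRecordsPerBatch` are hypothetical names.

```scala
object RateLimitSketch {
  // Setting names are taken from the Spark configuration docs;
  // the values are illustrative assumptions, not recommendations.
  val settings: Map[String, String] = Map(
    "spark.streaming.backpressure.enabled" -> "true",
    // Maximum records per second read from each Kafka partition (direct streams)
    "spark.streaming.kafka.maxRatePerPartition" -> "1000"
  )

  /** Upper bound on the records a direct stream reads in one batch. */
  def maxRecordsPerBatch(maxRatePerPartition: Long, partitions: Int, batchSeconds: Long): Long =
    maxRatePerPartition * partitions * batchSeconds
}
```

With the 3-partition topic and 5-second batches used later in this article, a limit of 1000 records per second per partition would cap each batch at 15000 records. The settings could be applied with `sparkConf.set(key, value)` before the StreamingContext is created. Receiver-based streams use a different setting, `spark.streaming.receiver.maxRate`.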
    According to the official documentation, the Spark Streaming and Kafka integration comes in two major versions: 0.8 and 0.10.

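    The 0.8 integration offers two ways to receive data:
    Receiver DStream: consumes through the high-level consumer API and periodically saves offsets to ZooKeeper. The delivery mode is at least once (provided the write-ahead log is enabled; without it, received but unprocessed data can be lost on failure), so records may be consumed more than once.
    Direct DStream: consumes through the low-level (simple) consumer API. Spark Streaming tracks the offsets itself, in its checkpoints if checkpointing is enabled, instead of saving them to ZooKeeper. With no checkpoint or stored offsets, as in the example in this article, every start begins from the latest offset by default, so the behavior is effectively at most once and data may be lost.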

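    The 0.10 integration has only one way to receive data:
    Direct DStream: consumes through the new Kafka consumer API. Offsets are committed manually after each batch has been processed, which avoids data loss. The goal is exactly-once processing, but a batch can still be replayed if the job fails before the commit, so the output must also be idempotent or transactional to avoid duplicates.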
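The difference between these delivery modes comes down to whether the offset is committed before or after a record is processed when a failure happens in between. The following is a minimal pure-Scala simulation of that idea, with no Spark or Kafka involved. All names (`DeliverySemanticsSketch`, `consume`, `crashAt`) are hypothetical, and the single simulated crash is an assumption made for illustration.

```scala
object DeliverySemanticsSketch {
  sealed trait Mode
  case object CommitBeforeProcess extends Mode // behaves as at most once
  case object ProcessBeforeCommit extends Mode // behaves as at least once

  /**
   * Consumes `log` from offset 0. The consumer crashes once, at offset `crashAt`,
   * between the two steps (commit and process), then restarts from the last
   * committed offset. Returns the (offset, message) pairs that were processed.
   */
  def consume(log: Vector[String], mode: Mode, crashAt: Int): Vector[(Int, String)] = {
    var committed = 0
    var crashed = false
    val processed = Vector.newBuilder[(Int, String)]
    while (committed < log.length) {
      var pos = committed // a (re)start resumes from the committed offset
      var running = true
      while (running && pos < log.length) {
        val crashNow = !crashed && pos == crashAt
        mode match {
          case CommitBeforeProcess =>
            committed = pos + 1
            if (crashNow) { crashed = true; running = false } // record is never processed
            else { processed += (pos -> log(pos)); pos += 1 }
          case ProcessBeforeCommit =>
            processed += (pos -> log(pos))
            if (crashNow) { crashed = true; running = false } // offset is never committed
            else { committed = pos + 1; pos += 1 }
        }
      }
    }
    processed.result()
  }
}
```

Committing first loses the record at the crash point, while processing first replays it after the restart. Writing the replayed output into a sink keyed by offset (here simply `.toMap`) removes the duplicate, which is the idempotent-output idea behind exactly-once results.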


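    Consuming data with the Receiver DStream (0.8 integration)

    Steps

    1. Start the Kafka cluster

    The JDK and ZooKeeper must be installed, and the ZooKeeper service must be running.
    Start the Kafka service in the background on all three machines: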

    cd /export/servers/kafka_2.11-1.0.0/
    
    bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
    
    2. Create a Maven project and add the dependencies
    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    3. Create a Kafka topic
    bin/kafka-topics.sh  --create --partitions 3 --replication-factor 2 --topic sparkafka --zookeeper node01:2181,node02:2181,node03:2181
    
    4. Start a Kafka console producer
    bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic sparkafka
    
    5. Write the code
    package cn.itcast.sparkstreaming.demo5_08
    
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    import scala.collection.immutable
    
    object SparkKafkaReceiverDStream {
      def main(args: Array[String]): Unit = {
    
        // Build the SparkConf; local[6] leaves spare cores, since each receiver below occupies one core
        val sparkConf: SparkConf = new SparkConf().setAppName("SparkKafkaReceiverDStream").setMaster("local[6]").set("spark.driver.host", "localhost")
        // Create the SparkContext
        val sparkContext = new SparkContext(sparkConf)
        // Set the log level
        sparkContext.setLogLevel("WARN")
        // Create the StreamingContext with a 5-second batch interval
        val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    
        // ZooKeeper quorum used by the high-level consumer
        val zkQuorum = "node01:2181,node02:2181,node03:2181"
        // Topic name -> number of consumer threads per receiver
        val topics = Map("sparkafka" -> 3)
    
        /**
         * Use 1 to 3 to create three receivers (input DStreams) that read the topic in parallel.
         * The IndexedSeq holds one input DStream per receiver.
         */
        val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
          /**
           * Create an input stream that pulls messages from Kafka Brokers.
           * ssc       StreamingContext object
           * zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
           * groupId   The group id for this consumer
           * topics    Map of (topic_name to numPartitions) to consume. Each partition is consumed
           * in its own thread
           * Returns a DStream of (Kafka message key, Kafka message value)
           */
          val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext, zkQuorum, "ReceiverDStreamGroup", topics)
          stream
        })
        // Union the streams from all receivers into a single DStream
        val union: DStream[(String, String)] = streamingContext.union(receiverDStream)
        // Records arrive as (key, value) pairs; the value carries the payload we need
        val value: DStream[String] = union.map(x => x._2)
        // Print the received data
        value.print()
    
        streamingContext.start()
        streamingContext.awaitTermination()
    
        
      }
    }
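The example above runs with `local[6]` and three receivers. As far as I know from the Spark Streaming programming guide, each receiver permanently occupies one core, so a local master needs more cores than receivers or no batch is ever processed. The check below is a small sketch of that rule in plain Scala; `ReceiverCoreCheck` and its methods are hypothetical helper names, not part of Spark.

```scala
object ReceiverCoreCheck {
  /** Cores left for batch processing once every receiver has taken one core. */
  def coresLeftForProcessing(localCores: Int, receivers: Int): Int =
    localCores - receivers

  /** True if at least one core remains to process the received batches. */
  def canProcess(localCores: Int, receivers: Int): Boolean =
    coresLeftForProcessing(localCores, receivers) > 0
}
```

For the configuration in this article, `ReceiverCoreCheck.coresLeftForProcessing(6, 3)` leaves three cores for processing, whereas `local[3]` with the same three receivers would leave none.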
    

    Consuming data with the Direct DStream (0.8 integration)

    Code

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    object SparkKafkaDirectDStream {
      def main(args: Array[String]): Unit = {
        // Build the SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("SparkKafkaDirectDStream").setMaster("local[6]").set("spark.driver.host", "localhost")
        // Create the SparkContext
        val sparkContext = new SparkContext(sparkConf)
        // Set the log level
        sparkContext.setLogLevel("WARN")
        // Create the StreamingContext with a 5-second batch interval
        val streamingContext = new StreamingContext(sparkContext, Seconds(5))
        // Kafka parameters: the direct stream talks to the brokers, not to ZooKeeper
        val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "Kafka_Direct")
    
        // Topics to consume
        val topics = Set("sparkafka")
    
        /**
         * Type parameters and arguments of createDirectStream:
         * [K: ClassTag,    type of Kafka message key
         * V: ClassTag,     type of Kafka message value
         * KD <: Decoder[K]: ClassTag,      type of Kafka message key decoder
         * VD <: Decoder[V]: ClassTag] (    type of Kafka message value decoder
         * ssc: StreamingContext,     StreamingContext object
         * kafkaParams: Map[String, String],
         * topics: Set[String])     Names of the topics to consume
         */
        val resultDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
        // The key is null when the producer sends no key; the value carries the payload we need
        val value: DStream[String] = resultDStream.map(x => x._2)
        // Print the values
        value.print()
    
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    
    }
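Because this direct stream starts from the latest offset by default, messages that were produced before the job started are skipped. As I read the 0.8 integration guide, setting `auto.offset.reset` to `smallest` in the Kafka parameters makes it start from the earliest available offset instead. The sketch below only builds the parameter map, reusing the broker list and group id from the example above; `DirectStreamParamsSketch` is a hypothetical name.

```scala
object DirectStreamParamsSketch {
  // Same broker list and group id as in the example above
  val base: Map[String, String] = Map(
    "metadata.broker.list" -> "node01:9092,node02:9092,node03:9092",
    "group.id" -> "Kafka_Direct"
  )

  // "smallest" requests the earliest available offset instead of the
  // default "largest" (latest); these are the value names of the 0.8 consumer.
  val fromEarliest: Map[String, String] =
    base + ("auto.offset.reset" -> "smallest")
}
```

Passing `DirectStreamParamsSketch.fromEarliest` as `kafkaParams` to `createDirectStream` would be the intended use. Note that the newer consumer used by the 0.10 integration names these values `earliest` and `latest`.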
    

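    Consuming data with the Direct DStream (0.10 integration)

    Notes

    Comment out the 0.8 integration dependency in pom.xml, and comment out or delete the two objects above from the project, because the two integration packages pull in different Kafka client versions.

    Steps

    1. Add the dependency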
    <dependency>
    	<groupId>org.apache.spark</groupId>
    	<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    	<version>2.2.0</version>
    </dependency>
    
    2. Write the code
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, ConsumerStrategy, HasOffsetRanges, KafkaUtils, LocationStrategies, LocationStrategy, OffsetRange}
    
    object NewSparkKafkaDirectDStream {
      def main(args: Array[String]): Unit = {
        // Build the SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("NewSparkKafkaDirectDStream").setMaster("local[6]").set("spark.driver.host", "localhost")
        // Create the SparkContext
        val sparkContext = new SparkContext(sparkConf)
        // Set the log level
        sparkContext.setLogLevel("WARN")
        // Create the StreamingContext with a 5-second batch interval
        val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    
        /*
        sealed abstract class LocationStrategy
        LocationStrategy is sealed, so instances are obtained from the LocationStrategies object
         */
        // PreferConsistent: Use this in most cases, it will consistently distribute partitions across all executors.
        val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
        // A ConsumerStrategy is required as well; build it with ConsumerStrategies.Subscribe, which takes:
        /*
        topics: Iterable[jl.String],
        kafkaParams: collection.Map[String, Object]
         */
        // Topics to subscribe to
        val topics: Array[String] = Array("sparkafka")
        // Kafka broker list
        val brokers = "node01:9092,node02:9092,node03:9092"
        // Consumer group id
        val group = "sparkafkaGroup"
        // Consumer configuration
        val kafkaParam = Map[String, Object](
          "bootstrap.servers" -> brokers, // Addresses used for the initial connection to the cluster
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          // Identifies the consumer group this consumer belongs to
          "group.id" -> group,
          // Applies when there is no initial offset or the current offset no longer exists on the server;
          // "latest" resets the offset to the most recent one
          "auto.offset.reset" -> "latest",
          // If true, offsets are committed automatically in the background; disabled here so they can be committed manually
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe[String, String](topics, kafkaParam)
    
        /**
         * Required arguments:
         * ssc: StreamingContext,
         * locationStrategy: LocationStrategy,
         * consumerStrategy: ConsumerStrategy[K, V]
         */
        val resultStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext, locationStrategy, consumerStrategy)
    
    
        // Iterate over the RDD of each batch; an RDD has several partitions, each holding many records
        resultStream.foreachRDD(rdd => {
          // Skip empty batches
          if (!rdd.isEmpty()) {
            // Process every record in the RDD (this closure runs on the executors)
            rdd.foreach(line => {
              // The key is null here; the value carries the payload we need
              val value: String = line.value()
              // Print the record
              println(value)
            })
            /*
            After each batch has been processed, commit its offsets manually.
            The RDD produced directly by the stream can be cast to HasOffsetRanges to read them.
             */
            val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            // Cast resultStream to CanCommitOffsets and commit the offsets asynchronously
            resultStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
          }
        })
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
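The `ranges` array committed above holds one entry per Kafka partition in the batch. To make its contents concrete, here is a plain-Scala stand-in that mirrors the fields Spark's `OffsetRange` exposes (`topic`, `partition`, `fromOffset`, `untilOffset`). `OffsetRangeSketch`, `BatchRange`, `describe` and `totalRecords` are hypothetical names used only for illustration, and the sample offsets are made up.

```scala
object OffsetRangeSketch {
  // Hypothetical stand-in mirroring the fields of Spark's OffsetRange
  final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    // untilOffset is exclusive, so the difference is the number of records in the batch
    def count: Long = untilOffset - fromOffset
  }

  /** One log line per partition, e.g. for auditing what a batch covered. */
  def describe(r: BatchRange): String =
    s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset}) ${r.count} records"

  /** Total number of records covered by all partitions of a batch. */
  def totalRecords(ranges: Seq[BatchRange]): Long =
    ranges.map(_.count).sum
}
```

In the real job, the same information could be logged from the `ranges` array before calling `commitAsync`, which helps when checking that no offsets were skipped or replayed.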
    
  • Original article: https://www.cnblogs.com/zzzsw0412/p/12772382.html