Spark Streaming and Kafka Integration
Overview
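The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are two separate corresponding Spark Streaming packages. Choose the correct package for your brokers and the features you need. Note that the 0.8 integration is compatible with the later 0.9 and 0.10 brokers, but the 0.10 integration is not compatible with earlier brokers.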
The paragraph above is the introduction from the official Spark documentation.
For more details, see the official documentation: http://spark.apache.org/docs/2.2.0/streaming-kafka-integration.html
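The compatibility rule can be made concrete with a small, hypothetical helper in plain Scala (no Spark needed). It lists which Spark 2.2 integration packages a given broker version can use. The minimum broker versions are taken from the broker-version table in the Spark 2.2.0 integration guide: 0.8.2.1 for `spark-streaming-kafka-0-8` and 0.10.0 for `spark-streaming-kafka-0-10`. The object and function names are made up for illustration.

```scala
object KafkaIntegrationChooser {
  // Minimum broker version per integration package (as listed in the Spark 2.2.0 guide)
  private val minimumBroker: Seq[(String, List[Int])] = Seq(
    "spark-streaming-kafka-0-8"  -> List(0, 8, 2, 1),
    "spark-streaming-kafka-0-10" -> List(0, 10, 0)
  )

  // Parse a dotted version such as "1.0.0" into List(1, 0, 0)
  def parse(version: String): List[Int] =
    version.split('.').toList.map(_.toInt)

  // Compare component by component, padding the shorter version with zeros
  def atLeast(actual: List[Int], required: List[Int]): Boolean = {
    val n = math.max(actual.length, required.length)
    actual.padTo(n, 0).zip(required.padTo(n, 0)).find { case (a, r) => a != r } match {
      case Some((a, r)) => a > r
      case None         => true // the versions are equal
    }
  }

  // Packages whose minimum broker version is satisfied by the given broker
  def compatiblePackages(brokerVersion: String): Seq[String] = {
    val broker = parse(brokerVersion)
    minimumBroker.collect { case (pkg, min) if atLeast(broker, min) => pkg }
  }

  def main(args: Array[String]): Unit = {
    println(compatiblePackages("0.9.0.1")) // only the 0-8 package
    println(compatiblePackages("1.0.0"))   // both packages
  }
}
```

The brokers in this post run kafka_2.11-1.0.0. That version meets both minimums, which is why both integrations are demonstrated against the same cluster.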
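For more on Kafka, see these articles:
【Kafka】Message Queue Fundamentals
【Kafka】Setting Up a Kafka Cluster
【Kafka】Basic Kafka Cluster Operations: A Must-Read for Beginners
【Kafka】A Brief Introduction to Kafka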
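In practice, Spark Streaming is integrated with Kafka far more often than with Flume. The main reason is that Kafka stores messages durably and lets the consumer pull them at its own pace, which makes it possible to throttle the ingest rate.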
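On the Spark side, the pull rate can be capped explicitly. For the direct approach, `spark.streaming.kafka.maxRatePerPartition` limits how many records per second are read from each Kafka partition. Receivers use `spark.streaming.receiver.maxRate` instead. One batch therefore holds at most rate × partitions × batch seconds records. The sketch below only does that arithmetic. The rate of 1000 is an assumed example value, and the object name is hypothetical.

```scala
object BatchSizeCap {
  /** Upper bound on the records in one batch of a direct Kafka stream,
    * assuming spark.streaming.kafka.maxRatePerPartition = maxRatePerPartition. */
  def maxRecordsPerBatch(maxRatePerPartition: Long, partitions: Int, batchSeconds: Long): Long = {
    require(maxRatePerPartition > 0 && partitions > 0 && batchSeconds > 0, "all inputs must be positive")
    maxRatePerPartition * partitions * batchSeconds
  }

  def main(args: Array[String]): Unit = {
    // Assumed rate of 1000 records/s per partition; 3 partitions and 5-second
    // batches match the topic and StreamingContext used later in this post.
    println(maxRecordsPerBatch(1000L, 3, 5L)) // 15000
  }
}
```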
The official documentation shows that the Spark Streaming integration for Kafka comes in two major versions: 0.8 and 0.10.
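Version 0.8 offers two ways to receive data:
Receiver DStream: consumes through Kafka's high-level consumer API. A receiver pulls the data, and the consumer periodically commits offsets to ZooKeeper. Records processed after the last commit are consumed again after a failure, so this mode is at least once and can produce duplicates. The official documentation adds that, under the default configuration, data buffered in the receiver can also be lost on failure unless the write-ahead log (spark.streaming.receiver.writeAheadLog.enable) is enabled.
Direct DStream: consumes through Kafka's low-level (simple consumer) API, without receivers. Offsets are tracked by Spark Streaming itself in its checkpoint, not in ZooKeeper or Kafka. A job started without a checkpoint reads from the latest offset by default, so messages produced while the job was down are skipped. Used this way, it behaves as at most once and may lose data.
Version 0.10 offers only one way to receive data:
Direct DStream: also receiver-less, but built on Kafka's new consumer API. Offsets are committed manually back to Kafka after each batch has been processed, which avoids the data loss described above. The official documentation describes this as at least once. Exactly-once results still require idempotent output, or storing the offsets atomically together with the results.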
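The difference between these guarantees comes down to whether an offset is recorded before or after its record is processed. The toy model below is plain Scala, and every name in it is hypothetical. An in-memory vector stands in for a Kafka partition. The consumer crashes once between the two steps, then restarts from the last recorded offset.

```scala
object DeliverySemantics {
  /** Simulates one consumer over an in-memory partition `log`.
    * The first run crashes at offset `crashAt`, between "process the record"
    * and "commit its offset"; the restart resumes from the last committed
    * offset and runs to the end. Returns every record processed, in order. */
  def simulate(log: Vector[String], commitBeforeProcessing: Boolean, crashAt: Int): Vector[String] = {
    val processed = Vector.newBuilder[String]
    var committed = 0 // next offset to read after a restart

    // First run: stops at the crash point
    var offset = 0
    var crashed = false
    while (offset < log.length && !crashed) {
      if (commitBeforeProcessing) {
        committed = offset + 1                          // step 1: commit
        if (offset == crashAt) crashed = true           // crash before step 2
        else { processed += log(offset) }               // step 2: process
      } else {
        processed += log(offset)                        // step 1: process
        if (offset == crashAt) crashed = true           // crash before step 2
        else committed = offset + 1                     // step 2: commit
      }
      offset += 1
    }

    // Restart: resume from the committed offset, no further crashes
    (committed until log.length).foreach(o => processed += log(o))
    processed.result()
  }

  def main(args: Array[String]): Unit = {
    val log = Vector("a", "b", "c")
    println(simulate(log, commitBeforeProcessing = true, crashAt = 1))  // Vector(a, c)
    println(simulate(log, commitBeforeProcessing = false, crashAt = 1)) // Vector(a, b, b, c)
  }
}
```

If the offset is committed before processing, the record at the crash point is lost (at most once). If it is committed after processing, that record is processed twice (at least once).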
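Consuming data with the 0.8 Receiver DStream
Steps
1. Start the Kafka cluster
The JDK and ZooKeeper must be installed, and the ZooKeeper service must be running.
Start the Kafka service in the background on all three machines: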
cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
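Before moving on, it can help to confirm that each broker is actually listening. Running `jps` on each machine and looking for a `Kafka` process is one way. Another is to open a TCP connection to each broker, as in the minimal sketch below. It assumes the brokers use port 9092, as in the producer command later in this post, and `BrokerPortCheck` is a hypothetical name.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object BrokerPortCheck {
  /** True if a TCP connection to host:port succeeds within timeoutMs. */
  def isListening(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Connection refused, timeout and unknown host are all IOExceptions
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Broker addresses assumed from the commands in this post
    val brokers = Seq("node01" -> 9092, "node02" -> 9092, "node03" -> 9092)
    brokers.foreach { case (host, port) =>
      println(s"$host:$port listening = ${isListening(host, port)}")
    }
  }
}
```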
2. Create a Maven project and add the dependencies
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
3. Create a Kafka topic
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic sparkafka --zookeeper node01:2181,node02:2181,node03:2181
4. Start a Kafka console producer
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic sparkafka
5. Write the code
package cn.itcast.sparkstreaming.demo5_08

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.immutable

object SparkKafkaReceiverDStream {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaReceiverDStream")
      .setMaster("local[6]")
      .set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    // ZooKeeper quorum used by the high-level consumer
    val zkQuorum = "node01:2181,node02:2181,node03:2181"
    // topic -> number of consumer threads inside each receiver
    val topics = Map("sparkafka" -> 3)
    /**
     * (1 to 3) creates three receivers in the same consumer group so that the
     * topic's three partitions can be read in parallel.
     * The IndexedSeq holds the three receiver input streams.
     */
    val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(_ => {
      /**
       * KafkaUtils.createStream creates an input stream that pulls messages from Kafka brokers.
       * ssc       the StreamingContext
       * zkQuorum  ZooKeeper quorum (hostname:port,hostname:port,...)
       * groupId   the group id of this consumer
       * topics    Map(topic_name -> numThreads); each partition is consumed in its own thread
       * Returns a DStream of (Kafka message key, Kafka message value)
       */
      val stream: ReceiverInputDStream[(String, String)] =
        KafkaUtils.createStream(streamingContext, zkQuorum, "ReceiverDStreamGroup", topics)
      stream
    })
    // Merge the streams of all receivers into one DStream
    val union: DStream[(String, String)] = streamingContext.union(receiverDStream)
    // Records are (key, value) pairs; the value is the data we need
    val value: DStream[String] = union.map(x => x._2)
    // Print the received data
    value.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
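Why `local[6]`? Each receiver created by `KafkaUtils.createStream` runs as a long-lived task that occupies one core. The Spark Streaming programming guide warns that with `local[n]`, n must be greater than the number of receivers, otherwise no core is left to process the data. The hypothetical helper below does that bookkeeping for the program above. It also counts consumer threads, because each receiver starts as many threads as the value in the `topics` map. The high-level consumer assigns each partition to at most one thread in the group, so any threads beyond the partition count receive no data.

```scala
object ReceiverCoreBudget {
  final case class Budget(receiverCores: Int, processingCores: Int, consumerThreads: Int, idleThreads: Int)

  /** localCores: n in local[n]; receivers: number of createStream calls;
    * threadsPerReceiver: value in the topics map; partitions: topic partitions. */
  def plan(localCores: Int, receivers: Int, threadsPerReceiver: Int, partitions: Int): Budget = {
    require(localCores > receivers, s"local[$localCores] leaves no core to process data from $receivers receivers")
    val threads = receivers * threadsPerReceiver
    Budget(receivers, localCores - receivers, threads, math.max(0, threads - partitions))
  }

  def main(args: Array[String]): Unit =
    println(plan(localCores = 6, receivers = 3, threadsPerReceiver = 3, partitions = 3)) // Budget(3,3,9,6)
}
```

For the program above, this gives 3 receiver cores, 3 processing cores, and 9 consumer threads. Six of those threads get no partition, so `Map("sparkafka" -> 1)` would likely read the same three partitions with fewer idle threads.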
Consuming data with the 0.8 Direct DStream
Write the code
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkKafkaDirectDStream {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaDirectDStream")
      .setMaster("local[6]")
      .set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    // Kafka parameters: the direct approach connects to the brokers, not to ZooKeeper
    val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "Kafka_Direct")
    val topics = Set("sparkafka")
    /**
     * Type parameters and arguments of createDirectStream:
     * K: ClassTag                       type of the Kafka message key
     * V: ClassTag                       type of the Kafka message value
     * KD <: Decoder[K]: ClassTag        decoder for the message key
     * VD <: Decoder[V]: ClassTag        decoder for the message value
     * ssc: StreamingContext             the StreamingContext
     * kafkaParams: Map[String, String]  Kafka configuration
     * topics: Set[String]               names of the topics to consume
     */
    val resultDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
    // The console producer sends no key, so the key is null; the value is the data we need
    val value: DStream[String] = resultDStream.map(x => x._2)
    // Print the values
    value.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
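Unlike the receiver approach, the direct approach has no receivers. For every batch, the driver looks up the latest offset of each partition and builds an RDD whose partitions map one-to-one to Kafka partitions. Each RDD partition covers the offsets [fromOffset, untilOffset). The plain-Scala model below mimics that planning step. `OffsetRangeModel` is a hypothetical stand-in that mirrors the fields of Spark's `OffsetRange`. The optional per-partition cap plays the role of `spark.streaming.kafka.maxRatePerPartition` × batch seconds.

```scala
object DirectBatchPlanner {
  // Hypothetical stand-in mirroring the fields of Spark's OffsetRange
  final case class OffsetRangeModel(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  /** Plans one batch: each partition reads from its current position up to the
    * latest offset, optionally capped at maxPerPartition records. Returns the
    * ranges (ordered by partition) and the positions for the next batch. */
  def nextBatch(topic: String,
                current: Map[Int, Long],
                latest: Map[Int, Long],
                maxPerPartition: Option[Long]): (Seq[OffsetRangeModel], Map[Int, Long]) = {
    val ranges = current.toSeq.sortBy(_._1).map { case (partition, from) =>
      val end = latest(partition)
      val until = maxPerPartition.fold(end)(cap => math.min(end, from + cap))
      OffsetRangeModel(topic, partition, from, until)
    }
    (ranges, ranges.map(r => r.partition -> r.untilOffset).toMap)
  }

  def main(args: Array[String]): Unit = {
    val start  = Map(0 -> 0L, 1 -> 10L, 2 -> 5L) // made-up current positions
    val latest = Map(0 -> 4L, 1 -> 10L, 2 -> 9L) // made-up latest offsets
    val (ranges, next) = nextBatch("sparkafka", start, latest, maxPerPartition = None)
    ranges.foreach(r => println(s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset}) -> ${r.count} records"))
    println(next)
  }
}
```

A partition with no new data (partition 1 above) still produces an empty range, so every batch has the same number of partitions as the topic.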
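Consuming data with the 0.10 Direct DStream
Notes
Comment out the 0.8 dependency in pom.xml, and comment out or delete the two objects above. The 0.8 and 0.10 integration packages pull in different Kafka versions, which conflict on the classpath.
Steps
1. Add the dependency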
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
2. Write the code
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, ConsumerStrategy, HasOffsetRanges, KafkaUtils, LocationStrategies, LocationStrategy, OffsetRange}

object NewSparkKafkaDirectDStream {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("NewSparkKafkaDirectDStream")
      .setMaster("local[6]")
      .set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    /*
     LocationStrategy is declared as "sealed abstract class LocationStrategy",
     so instances are obtained from the LocationStrategies companion object.
     PreferConsistent: use this in most cases; it consistently distributes
     partitions across all executors.
    */
    val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
    /*
     The ConsumerStrategy comes from ConsumerStrategies.Subscribe, which takes
       topics: Iterable[jl.String],
       kafkaParams: collection.Map[String, Object]
    */
    val topics: Array[String] = Array("sparkafka")
    // Broker list
    val brokers = "node01:9092,node02:9092,node03:9092"
    // Consumer group
    val group = "sparkafkaGroup"
    // Consumer configuration
    val kafkaParam: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> brokers, // initial addresses used to connect to the cluster
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // Identifies the consumer group this consumer belongs to
      "group.id" -> group,
      // Applies when there is no initial offset, or the current offset no longer
      // exists on the server; "latest" resets to the newest offset
      "auto.offset.reset" -> "latest",
      // If true, offsets are committed automatically in the background;
      // false here because offsets are committed manually below
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe[String, String](topics, kafkaParam)
    /**
     * createDirectStream takes:
     * ssc: StreamingContext,
     * locationStrategy: LocationStrategy,
     * consumerStrategy: ConsumerStrategy[K, V]
     */
    val resultStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(streamingContext, locationStrategy, consumerStrategy)
    // Each batch is one RDD; each RDD partition holds the records of one Kafka partition
    resultStream.foreachRDD(rdd => {
      // Skip empty batches (isEmpty is cheaper than count() > 0)
      if (!rdd.isEmpty()) {
        // Process every record of the batch (this runs on the executors)
        rdd.foreach(record => {
          // The key is null; the value is the data we need
          val value: String = record.value()
          println(value)
        })
        /*
         Once the batch has been processed, commit its offsets manually.
         The cast to HasOffsetRanges only works on the RDD produced directly
         by createDirectStream, before any transformation.
        */
        val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Cast the stream to CanCommitOffsets and commit asynchronously
        resultStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
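The `auto.offset.reset` setting deserves a closer look, because with manual commits it only matters on a fresh start. With `enable.auto.commit` set to false and `commitAsync` in place, the group's offsets are stored in Kafka, so a restarted job resumes from the last committed offset. `auto.offset.reset` applies only when the group has no committed offset, or when the committed offset is no longer in the log. The plain-Scala model below encodes that rule. Its names are hypothetical, and the real consumer throws its own exception type for `none`.

```scala
object StartingOffset {
  /** Where a consumer group starts reading one partition (a simplified model).
    * committed: offset committed for the group, None if never committed;
    * logStart / logEnd: earliest and latest offsets currently in the partition. */
  def resolve(committed: Option[Long], logStart: Long, logEnd: Long, autoOffsetReset: String): Long =
    committed.filter(o => o >= logStart && o <= logEnd) match {
      case Some(offset) => offset // a valid committed offset always wins
      case None => autoOffsetReset match {
        case "latest"   => logEnd
        case "earliest" => logStart
        case "none"     => throw new IllegalStateException("no valid committed offset and auto.offset.reset=none")
        case other      => throw new IllegalArgumentException(s"unknown auto.offset.reset value: $other")
      }
    }

  def main(args: Array[String]): Unit = {
    println(resolve(Some(42L), 0L, 100L, "latest")) // 42: resumes from the commit
    println(resolve(None, 0L, 100L, "latest"))      // 100: fresh group starts at the end
    println(resolve(Some(5L), 10L, 100L, "latest")) // 100: commit was deleted by retention
  }
}
```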