一、Streaming与Flume的联调
Spark 2.2.0 对应于 Flume 1.6.0
两种模式:
1. Flume-style push-based approach:
Flume推送数据給Streaming
Streaming的receiver作为Flume的Avro agent
Spark workers应该跑在Flume这台机器上
Streaming先启动,receiver监听Flume push data的端口
实现:
写flume配置文件:
netcat source -> memory channel -> avro sink
IDEA开发:
添加Spark-flume依赖
对应的API是FlumeUtils
开发代码:
import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /* * Spark Streaming整合Flume的第一种方式 * */ object FlumePushWordCount { def main(args: Array[String]): Unit = { //外部传入参数 if (args.length != 2) { System.out.println("Usage: FlumePushWordCount <hostname> <port>") System.exit(1) } val Array(hostname, port) = args //外部args数组 val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) //选择输入ssc的createStream方法,生成一个InputDStream val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt) //由于flume的内容有head有body, 需要先把内容拿出来, 并去掉空值 flumeStream.map(x => new String(x.event.getBody.array()).trim) .flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }
注意:为了不hard-core,选择外部传入hostname和port
在IDEA测试时,可以在
里面的program argument输入运行参数
在本地测试时:
先启动Streaming作业,然后启动flume agent,最后通过telnet输入数据,观察IDEA的控制台输出
在服务器测试时:
submit时一定要把maven依赖中在--packages加上,自动会在网络上下载依赖
当不能下载时,需要--jars才能把预先下载好的jar包加上
2. Pull-based approach using a custom sink:
Streaming拉数据
Flume推送的数据先放到sink缓冲区
Streaming使用一个reliable flume receiver,确保了数据的接收和备份
可靠性更高,支持容错,生产上面常用
一台机器运行Flume agent,Spark集群其他机器可访问这台机器的custom sink
实现:
Flume配置:
使用相关jars包,配置依赖:(参考Spark官网)
sink是一个独特的type
IDEA开发:
对应上面Flume的依赖,使用的是createPollStream,区别于第一种模式
其他地方都一样,体现了Spark代码的复用性
本地测试:
先启动flume!!后启动Streaming作业
二、Streaming与Kafka的联调
Spark2.2.0对应于Kafka 0.8.2.1或更新(本次使用的是0.9.0.0)
两种模式:
1. Receiver-based approach
使用Kafka高级用户API
为了确保零数据丢失,需要用到Write Ahead Logs(出现于Spark 1.2)
同步地保存接收到的数据到日志当中,出错时可以恢复(容错机制)
这是传统的方式,在ZK server中消费数据
用KafkaUtils和Streaming对接,一样需要加入kafka的各种依赖(见官网)
使用的API是createStream
注意:
- 此处的topic分区和RDD的分区不同概念
- 多个Kafka DStream可以并行接收
- 用write ahead logs时需要配置StorageLevel.MEMORY_AND_DISK_SER
准备工作:
启动ZK server
启动kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties
创建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
测试topic能否正确生产和消费
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic
IDEA代码:
import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /* * SparkStreaming对接Kafka其中的Receiver-based方式 * */ object KafkaReceiverWordCount { def main(args: Array[String]): Unit = { if (args.length != 4) { System.out.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) //createStream需要传入的其中一个参数是一个Map,就是topics对应的线程数 val topicsMap = topics.split(",").map((_, numThreads.toInt)).toMap val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicsMap) //一定要取Stream的第二位才是数据,可以print出来看看,在实际生产中只是更改这一行的业务逻辑!!! message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }
本地测试/服务器测试:
从IDEA中输入参数,即可看到结果
从服务器测试也是打包submit就行,看web UI的时候留意验证receiver是占有一个Job的,证实了前面的理论
2. Direct Approach
No receiver!!!
从Spark 1.3 版本开始有
没有了Receiver,而是周期性地检测Kafka的offset,用了kafka simple consumer API
优点:
- 简化了并行度,不需要创建多个input stream
- 性能更好,达到零数据丢失,且不需要保存副本于write ahead logs中
- 一次语义Exactly-once semantics
缺点:不能在zookeeper中更新offset,但可以自己设置让其更新
使用的API是createDirectStream
准备工作和上面一样。
IDEA代码:
import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /* * SparkStreaming对接Kafka其中的Direct方式 * */ object KafkaDirectWordCount { def main(args: Array[String]): Unit = { if (args.length != 4) { System.out.println("Usage: KafkaReceiverWordCount <brokers> <topics>") System.exit(1) } val Array(brokers, topics) = args val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) //createDirectStream需要传入kafkaParams和topicsSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val topicsSet = topics.split(",").toSet val message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet ) //一定要取Stream的第二位才是数据,可以print出来看看 message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }
注意:StringDecoder有可能因为前面写Kafka java API时的包冲突而导入失败
在IDEA运行时报错:
这是由于之前在Kafka基础学习中我设置的kafka的依赖是0.9.0.0,和我们IDEA冲突,所以要把这一个依赖注释掉才能执行
调优时就是配置createDirectStream的参数嘛!!
三、Flume + Kafka + Spark Streaming常用流处理架构
实现的需求:实时(到现在为止)的日志访问统计操作
由于本人缺乏日志采集来源,故使用python语言来实现一个日志生成器,模拟生产环境中服务器不断生成日志的过程
本生成器产生的日志内容包括ip、time、url、status、referer
根据前面的知识,我们在实现的过程中有以下步骤:
1. Flume的选型,在本例中设为exec-memory-kafka
2. 打开kafka一个消费者,再启动flume读取日志生成器中的log文件,可看到kafka中成功读取到日志产生器的实时数据
3. 让Kafka接收到的数据传输到Spark Streaming当中,这样就可以在Spark对实时接收到的数据进行操作了
由于与前面一、二的操作基本一致,此处不再重复列出详细操作过程
下面直接进入Spark中对实时数据的操作:
分为数据清洗过程、统计功能实现过程两个步骤!其中统计功能的实现基本上和Spark SQL中的操作一致,这又体现了Spark的代码复用性,即能通用于多个框架中