4.使用 Flume 作为 Spark Streaming 数据源
Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume
Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集
到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写
的 Spark Streaming 应用程序对消息进行处理
实验内容:
1:配置Flume数据源flume-to-spark.conf
2.下载相关jar包:
spark-streaming-flume_2.11-2.4.4.jar,其中2.11表示对应的Scala版本号,2.4.4表示Spark
版本号,这里需要根据自己系统scala和spark版本号来进行下载相关jar包,并复制到spark/jars/flume目录下
http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.4.4
3.将Flume/lib下的所有jar包复制到spark/jars/flume目录下
4.编写spark程序使用flume数据源
package org.apache.spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._ import org.apache.spark.util.IntParam object FlumeEventCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: FlumeEventCount <host> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() val Array(host, IntParam(port)) = args val batchInterval = Milliseconds(2000) // Create the context and set the batch size val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() ssc.start() ssc.awaitTermination() } }
编写用于控制日志输出格式代码
package org.apache.spark.examples.streaming import org.apache.log4j.{Level, Logger} import org.apache.spark.internal.Logging object StreamingExamples extends Logging { /** Set reasonable logging levels for streaming if the user has not configured log4 j. */ def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { // We first log something to initialize Spark's default logging, then we overri de the // logging level. logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") Logger.getRootLogger.setLevel(Level.WARN) } } }
在进行打包的时候出现如下错误,未解决。
