  • 2020 Winter Break (12)

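    4. Using Flume as a Spark Streaming data source
    Flume is a very popular log collection system and can serve as an advanced data source for
    Spark Streaming. Set the Flume Source type to netcat and keep sending messages to it from a
    terminal. Flume collects the messages into the Sink, whose type is set to avro here, and the
    Sink pushes them to Spark Streaming, where a Spark Streaming application you write yourself
    processes them.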
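    A netcat-type Flume source listens on a TCP port and turns each newline-terminated line it
    receives into one event, which is why a terminal tool such as telnet or nc is enough to feed it.
    As a minimal sketch of that client side, the Scala object below (NetcatSender, a hypothetical
    helper, not from the original post) opens a socket and writes one line per message. The host
    and port are assumptions and must match the bind and port settings of the netcat source in
    your Flume configuration.

```scala
import java.io.{BufferedWriter, OutputStreamWriter, PrintWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Hypothetical helper: plays the role of "typing into telnet/nc" against a
// netcat-type Flume source. Each message is sent as one "\n"-terminated line,
// and the netcat source turns each line into one Flume event.
object NetcatSender {
  // host and port are assumptions: they must match the netcat source's
  // bind and port settings in the Flume configuration.
  def send(host: String, port: Int, messages: Seq[String]): Unit = {
    val socket = new Socket(host, port)
    try {
      socket.setSoTimeout(10000) // do not wait forever for the source to close
      val out = new PrintWriter(
        new BufferedWriter(new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8)))
      // Write "\n" explicitly instead of println, whose separator is platform-dependent.
      messages.foreach(m => out.print(m + "\n"))
      out.flush()
      // Half-close so the source sees end-of-input, then drain whatever it sends back
      // (by default a netcat source answers "OK" per line) until it closes its side.
      socket.shutdownOutput()
      val in = socket.getInputStream
      while (in.read() != -1) {}
    } finally {
      socket.close()
    }
  }
}
```

    For example, NetcatSender.send("localhost", 33333, Seq("hello spark", "hello flume")) would
    produce two events, assuming the netcat source listens on localhost:33333.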
    Experiment steps:
    1. Configure the Flume data source in flume-to-spark.conf.
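    The post does not list the contents of flume-to-spark.conf. The sketch below writes one
    plausible minimal configuration for the pipeline described above: a netcat source, a memory
    channel and an avro sink. It is generated from Scala to keep all examples in one language.
    The agent name a1, the component names r1/c1/k1, the ports 33333 and 44444 and the channel
    sizes are assumptions, not values from the original. The avro sink's hostname and port must be
    the same host and port later passed to FlumeEventCount.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical generator for flume-to-spark.conf: netcat source -> memory channel -> avro sink.
// Agent/component names, ports and channel sizes are assumptions, not taken from the post.
object FlumeToSparkConf {
  def render(netcatPort: Int = 33333, sinkHost: String = "localhost", sinkPort: Int = 44444): String =
    s"""|a1.sources = r1
        |a1.sinks = k1
        |a1.channels = c1
        |
        |# netcat source: each line received on this port becomes one event
        |a1.sources.r1.type = netcat
        |a1.sources.r1.bind = localhost
        |a1.sources.r1.port = $netcatPort
        |
        |# avro sink: pushes events to the receiver started by FlumeEventCount,
        |# so hostname/port must equal the <host> <port> arguments of that program
        |a1.sinks.k1.type = avro
        |a1.sinks.k1.hostname = $sinkHost
        |a1.sinks.k1.port = $sinkPort
        |
        |# in-memory channel buffering events between source and sink
        |a1.channels.c1.type = memory
        |a1.channels.c1.capacity = 1000
        |a1.channels.c1.transactionCapacity = 100
        |
        |# wire the source and sink to the channel
        |a1.sources.r1.channels = c1
        |a1.sinks.k1.channel = c1
        |""".stripMargin

  // Writes the rendered configuration to `path` (e.g. <flume>/conf/flume-to-spark.conf).
  def write(path: Path): Path =
    Files.write(path, render().getBytes(StandardCharsets.UTF_8))
}
```

    From the Flume installation directory, an agent with this file could then be started with
    flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console.
    The avro sink pushes events to the receiver that FlumeEventCount starts, so that Spark program
    is usually launched first.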
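    2. Download the required jar, spark-streaming-flume_2.11-2.4.4.jar, where 2.11 is the Scala
    version and 2.4.4 is the Spark version. Download the jar that matches the Scala and Spark
    versions of your own installation, and copy it into the spark/jars/flume directory:
    http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.4.4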
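    The jar name follows the pattern spark-streaming-flume_<Scala binary version>-<Spark version>.jar.
    The Scala part must be the version Spark itself was built with (spark-shell prints it in its
    startup banner), not necessarily the Scala installed separately on the machine. The helper
    below (FlumeJar, hypothetical) assembles the file name and its Maven Central download URL from
    the two versions. There is no Spark 3.x release of this artifact, because Flume support was
    removed in Spark 3.0.

```scala
// Hypothetical helper: derives the jar name and its Maven Central URL.
object FlumeJar {
  // Maven Central layout: <groupId as path>/<artifactId>/<version>/<artifactId>-<version>.jar
  private val Central = "https://repo1.maven.org/maven2"

  // scalaBinary is the Scala binary version Spark was built with, e.g. "2.11";
  // sparkVersion is the full Spark version, e.g. "2.4.4".
  def artifactId(scalaBinary: String): String = s"spark-streaming-flume_$scalaBinary"

  def jarName(scalaBinary: String, sparkVersion: String): String =
    s"${artifactId(scalaBinary)}-$sparkVersion.jar"

  def downloadUrl(scalaBinary: String, sparkVersion: String): String =
    s"$Central/org/apache/spark/${artifactId(scalaBinary)}/$sparkVersion/${jarName(scalaBinary, sparkVersion)}"

  // Turns a full Scala version such as "2.11.12" into its binary version "2.11".
  def binaryVersion(fullScalaVersion: String): String =
    fullScalaVersion.split('.').take(2).mkString(".")
}
```

    For Spark 2.4.4 built with Scala 2.11.12, FlumeJar.downloadUrl(FlumeJar.binaryVersion("2.11.12"), "2.4.4")
    points at the same jar as the mvnrepository page above.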
    3. Copy all the jar files under Flume/lib into the spark/jars/flume directory.
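    Step 3 amounts to copying every jar from one directory into another. A minimal sketch in
    Scala using only java.nio follows. CopyJars is a hypothetical helper, and the directories in
    the usage line are placeholders for your own Flume and Spark install locations.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper for step 3: copy every *.jar directly under `from`
// (e.g. Flume's lib directory) into `to` (e.g. spark/jars/flume).
object CopyJars {
  def copyJars(from: Path, to: Path): Seq[String] = {
    Files.createDirectories(to)
    val jars = Files.newDirectoryStream(from, "*.jar") // glob filter on file names
    val copied = List.newBuilder[String]
    try {
      val it = jars.iterator()
      while (it.hasNext) {
        val jar = it.next()
        if (Files.isRegularFile(jar)) {
          // Overwrite older copies so re-running the step is harmless.
          Files.copy(jar, to.resolve(jar.getFileName), StandardCopyOption.REPLACE_EXISTING)
          copied += jar.getFileName.toString
        }
      }
    } finally {
      jars.close()
    }
    copied.result().sorted // names of the copied jars, for logging or checks
  }
}
```

    For example: CopyJars.copyJars(Paths.get("/usr/local/flume/lib"), Paths.get("/usr/local/spark/jars/flume")).
    Both paths are assumptions; adjust them to your installation.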
    4. Write a Spark program that uses Flume as its data source:
    package org.apache.spark.examples.streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.util.IntParam

    object FlumeEventCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("Usage: FlumeEventCount <host> <port>")
          System.exit(1)
        }
        StreamingExamples.setStreamingLogLevels()
        val Array(host, IntParam(port)) = args
        val batchInterval = Milliseconds(2000)
        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, batchInterval)
        // Create a flume stream
        val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
        // Print out the count of events received from this server in each batch
        stream.count().map(cnt => "Received " + cnt + " flume events.").print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
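    Two details of FlumeEventCount are worth unpacking. First, the line
    val Array(host, IntParam(port)) = args uses IntParam as an extractor that only matches strings
    that parse as an Int. Spark declares IntParam as private[spark], so only code inside the
    org.apache.spark package, as this example is, can use it. The sketch below reproduces the
    pattern with a hypothetical PortParam extractor and returns a usage message instead of throwing
    a MatchError. Second, it mimics stream.count().map(...) on plain collections so the per-batch
    message format can be checked without a cluster.

```scala
import scala.util.Try

// Hypothetical stand-in for Spark's IntParam: an extractor that only matches
// strings that parse as an Int.
object PortParam {
  def unapply(s: String): Option[Int] = Try(s.toInt).toOption
}

object FlumeEventCountSketch {
  // Same shape as `val Array(host, IntParam(port)) = args`, but returns a
  // usage message instead of throwing a MatchError on bad input.
  def parseArgs(args: Array[String]): Either[String, (String, Int)] = args match {
    case Array(host, PortParam(port)) => Right((host, port))
    case _                            => Left("Usage: FlumeEventCount <host> <port>")
  }

  // Plain-collection analogue of stream.count().map(...).print():
  // one report line per batch of received events.
  def describeBatches(batches: Seq[Seq[String]]): Seq[String] =
    batches.map(batch => "Received " + batch.size + " flume events.")
}
```

    The program sets the master to local[2] because the Flume receiver permanently occupies one
    thread. With local[1], no thread would be left to process the received data. Spark's
    documentation recommends local[n] with n greater than the number of receivers when running
    locally.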
    

    Write the helper that sets the logging level:

    package org.apache.spark.examples.streaming

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.internal.Logging

    object StreamingExamples extends Logging {
      /** Set reasonable logging levels for streaming if the user has not configured log4j. */
      def setStreamingLogLevels(): Unit = {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
          // We first log something to initialize Spark's default logging, then we override the
          // logging level.
          logInfo("Setting log level to [WARN] for streaming example." +
            " To override add a custom log4j.properties to the classpath.")
          Logger.getRootLogger.setLevel(Level.WARN)
        }
      }
    }
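    setStreamingLogLevels only lowers the log level when no log4j appender has been configured,
    so a user-supplied log4j.properties still takes precedence. The sketch below applies the same
    "respect the user's config, otherwise default to WARN" idea with java.util.logging swapped in
    for log4j, so it runs without Spark on the classpath. JulLogLevels is a hypothetical name.
    Checking the java.util.logging.config.file and java.util.logging.config.class system
    properties is this sketch's stand-in for log4j's "has appenders" test.

```scala
import java.util.logging.{Level, Logger}

// Hypothetical analogue of StreamingExamples.setStreamingLogLevels using
// java.util.logging instead of log4j (a swapped-in library, not the post's).
object JulLogLevels {
  // Returns true if the root level was lowered to WARNING.
  def setDefaultLevelIfUnconfigured(): Boolean = {
    // Treat either standard JUL configuration property as "user has configured logging",
    // playing the role of log4j's "root logger already has appenders" check.
    val userConfigured =
      System.getProperty("java.util.logging.config.file") != null ||
        System.getProperty("java.util.logging.config.class") != null
    if (!userConfigured) Logger.getLogger("").setLevel(Level.WARNING)
    !userConfigured
  }
}
```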
    

      Packaging the project produced an error, which is still unresolved.

  • Original article: https://www.cnblogs.com/zjl-0217/p/12324147.html