zoukankan      html  css  js  c++  java
  • 学习进度笔记7

    今天完成spark实验6:Spark Streaming编程初级实践。

    1、安装Flume

    Flume 是 Cloudera 提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量 日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。Flume 的 核心是把数据从数据源收集过来,再送到目的地。请到 Flume 官网下载 Flume1.7.0 安装文 件,下载地址如下: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz 或者也可以直接到本教程官网的 “ 下 载 专 区 ” 中 的 “ 软 件 ” 目 录 中 下 载 apache-flume-1.7.0-bin.tar.gz。 下载后,把 Flume1.7.0 安装到 Linux 系统的“/usr/local/flume”目录下,具体安装和使用方法可以参考教程官网的“实验指南”栏目中的“日志采集工具 Flume 的安装与使用方法”。

    2、使用Avro数据源测试Flume

    Avro 可以发送一个给定的文件给 Flume,Avro 源使用 AVRO RPC 机制。请对 Flume 的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件 helloworld.txt(里面包含一行文本“Hello World),在另外一个终端中启动 Flume 以后, 可以把 helloworld.txt 中的文本内容显示出来。

    ⑴agent 配置文件

     然后,我们在 avro.conf 写入以下内容

     上面 Avro Source 参数说明如下: Avro Source 的别名是 avro,也可以使用完整类别名 称 org.apache.flume.source.AvroSource,因此,上面有一行设置是 a1.sources.r1.type = avro,表示数据源的类型是 avro。bind 绑定的 ip 地址或主机名,使用 0.0.0.0 表示绑定机 器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。port 表示绑定 的端口。a1.sources.r1.port = 4141,表示绑定的端口是 4141。a1.sinks.k1.type = logger, 表示 sinks 的类型是 logger。

    ⑵启动 flume agent a1

     ⑶创建指定文件 

    先打开另外一个终端,在/usr/local/flume 下写入一个文件 log.00,内容为 hello,world:

    启动日志控制台

     我们再打开另外一个终端,执行:

    此时我们可以看到第一个终端(agent 窗口)下的显示,也就是在日志控制台,就会把 log.00 文件的内容打印出来:

    avro source 执行成功!

    3、使用netcat数据源测试Flume

    请对 Flume 的相关配置文件进行设置,从而可以实现如下功能:在一个 Linux 终端(这里称为“Flume 终端”)中,启动 Flume,在另一个终端(这里称为“Telnet 终端”)中, 输入命令“telnet localhost 44444”,然后,在 Telnet 终端中输入任何字符,让这些字符可以 顺利地在 Flume 终端中显示出来。

    ⑴创建 agent 配置文件

     在 example.conf 里写入以下内容:

    1.#example.conf: A single-node Flume configuration
    2.# Name the components on this agent
    3.a1.sources = r1
    4.a1.sinks = k1
    5.a1.channels = c1
    6.# Describe/configure the source
    7.a1.sources.r1.type = netcat
    8.a1.sources.r1.bind = localhost
    9.a1.sources.r1.port = 44444
    10.#同上,记住该端口名
    11.# Describe the sink
    12.a1.sinks.k1.type = logger
    13.# Use a channel which buffers events in memory
    14.a1.channels.c1.type = memory
    15.a1.channels.c1.capacity = 1000
    16.a1.channels.c1.transactionCapacity = 100
    17.# Bind the source and sink to the channel
    18.a1.sources.r1.channels = c1
    19.a1.sinks.k1.channel = c1 

    ⑵启动 flume agent (即打开日志控制台):

    再打开一个终端,输入命令:telnet localhost 44444

    然后我们可以在终端下输入任何字符,第一个终端的日志控制台也会有相应的显示,如 我们输入”hello,world”,得出

     第一个终端的日志控制台显示:

    netcatsource 运行成功!

    这里补充一点,flume 只能传递英文和字符,不能用中文.

    4、使用Flume作为Spark Streaming数据源

    Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集 到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写的Spark Streaming 应用程序对消息进行处理。

    ⑴配置 Flume 数据源

    请登录 Linux 系统,打开一个终端,执行如下命令新建一个 Flume 配置文件 flume-to-spark.conf:

    flume-to-spark.conf 文件中写入如下内容:

    #flume-to-spark.conf: A single-node Flume configuration
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 33333
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port =44444
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 1000000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    在上面的配置文件中,我们把 Flume Source 类别设置为 netcat,绑定到 localhost 的 33333 端口,这样,我们后面就可以通过“telnet localhost 33333”命令向 Flume Source 发 送消息。

    同时,我们把 Flume Sink 类别设置为 avro,绑定到 localhost 的 44444 端口,这样, Flume Source 把采集到的消息汇集到 Flume Sink 以后,Sink 会把消息推送给 localhost 的 44444 端口,而我们编写的 Spark Streaming 程序一直在监听 localhost 的 44444 端口,一 旦有消息到达,就会被 Spark Streaming 应用程序取走进行处理。

    特别要强调的是,上述配置文件完成以后,暂时“不要”启动 Flume Agent,如果这个时 候使用“flume-ng agent”命令启动 agent,就会出现错误提示“localhost:44444 拒绝连接”,也 就是 Flume Sink 要发送消息给 localhost 的 44444 端口,但是,无法连接上 localhost 的 44444 端口。

    为什么会出现这个错误呢?因为,这个时候我们还没有启动 Spark Streaming 应用程序,也就没有启动 localhost 的44444 端口,所以,Sink 是无法向这个端口发送消息 的。

     ⑵Spark 的准备工作

    Kafka 和 Flume 等高级输入源,需要依赖独立的库(jar 文件)。按照我们前面安装好 的 Spark 版本,这些 jar 包都不在里面,为了证明这一点,我们现在可以测试一下。请打开 一个新的终端,然后启动 spark-shell:

    启动成功后,在 spark-shell 中执行下面 import 语句:

    你可以看到,马上会报错,因为找不到相关的 jar 包。所以,现在我们就需要下载 spark-streaming-flume_2.11-2.1.0.jar,其中2.11表示对应的Scala版本号,2.1.0表示Spark 版本号。

    现在请在 Linux 系统中,打开一个火狐浏览器,打开下方的网址http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.1.0,里面有提供 spark-streaming-flume_2.11-2.1.0.jar 文件的下载。

    下载后的文件会被默认保存在当前 Linux 登录用户的下载目录下,本教程统一使用 hadoop 用户名登录 Linux 系统,所以,文件下载后会被保存到“/home/hadoop/下载”目录下面。现在,我们在“/usr/local/spark/jars”目录下新建一个“flume”目录,就把这个文件复制到 Spark 目录的“/usr/local/spark/jars/flume”目录下。请新打开一个终端,输入下面命令:

     我们就成功地把 spark-streaming-flume_2.11-2.1.0.jar 文件拷贝到了 “/usr/local/spark/jars/flume”目录下。

    下面还要继续把 Flume 安装目录的 lib 目录下的所有 jar 文件复制到 “/usr/local/spark/jars/flume”目录下,请在终端中执行下面命令:

     这样,我们就已经准备好了 Spark 环境,它可以支持 Flume 相关编程了。

    ⑶编写 Spark 程序使用 Flume 数据源

     下面,我们就可以进行程序编写了。请新打开一个终端,然后,执行命令创建代码目录:

     请在 FlumeEventCount.scala 代码文件中输入以下代码

    package org.apache.spark.examples.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.util.IntParam
    object FlumeEventCount {
     def main(args: Array[String]) {
     if (args.length < 2) {
     System.err.println(
     "Usage: FlumeEventCount <host> <port>")
     System.exit(1)
     }
     StreamingExamples.setStreamingLogLevels()
     val Array(host, IntParam(port)) = args
     val batchInterval = Milliseconds(2000)
     // Create the context and set the batch size
     val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local
    [2]")
     val ssc = new StreamingContext(sparkConf, batchInterval)
     // Create a flume stream
     val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_S
    ER_2)
     // Print out the count of events received from this server in each batch
     stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
     ssc.start()
     ssc.awaitTermination()
     }
    }

    保存 FlumeEventCount.scala 文件并退出 vim 编辑器。FlumeEventCount.scala 程序在 编译后运行时,需要我们提供 host 和 port 两个参数,程序会对指定的 host 和指定的 port 进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定 的端口中获取由 Flume Sink 发给该端口的消息,然后进行处理,对消息进行统计,打印出 “Received 0 flume events.”这样的信息。

    然后再使用 vim 编辑器新建 StreamingExamples.scala 文件,输入如下代码,用于控制日志输出格式:

    package org.apache.spark.examples.streaming
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.internal.Logging
    object StreamingExamples extends Logging {
     /** Set reasonable logging levels for streaming if the user has not configured log4
    j. */
     def setStreamingLogLevels() {
     val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
     if (!log4jInitialized) {
     // We first log something to initialize Spark's default logging, then we overri
    de the
     // logging level.
     logInfo("Setting log level to [WARN] for streaming example." +
     " To override add a custom log4j.properties to the classpath.")
     Logger.getRootLogger.setLevel(Level.WARN)
     }
     }
    }

    保存 StreamingExamples.scala 文件并退出 vim 编辑器。

    这样,我们在“/usr/local/spark/mycode/flume/src/main/scala”目录下,就有了如下两个 代码文件:

    FlumeEventCount.scala

    StreamingExamples.scala

    然后,新建一个 simple.sbt 文件:

    在 simple.sbt 中输入以下代码:

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
    libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0" 

    保存文件退出 vim 编辑器。然后执行下面命令,进行打包编译:

    打包成功后,就可以执行程序测试效果了。

    ⑷测试程序效果

    关闭之前打开的所有终端。首先,请新建第 1 个 Linux 终端,启动 Spark Streaming 应 用程序,命令如下:

    1.cd /usr/local/spark

    2. ./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark/mycode/flume/target/scala-2.11/simpleproject_2.11-1.0.jar localhost 44444

    通过上面命令,我们为应用程序提供 host 和 port 两个参数的值分别为 localhost 和 44444,程序会对 localhost 的 44444 端口进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定的端口中获取由 Flume Sink 发给该端口的消息, 然后进行处理,对消息进行统计,打印出“Received 0 flume events.”这样的信息。

    执行该命令后,屏幕上会显示程序运行的相关信息,并会每隔 2 秒钟刷新一次信息, 大量信息中会包含如下重要信息:

    -------------------------------------------

    Time: 1488029430000 ms

    -------------------------------------------

    Received 0 flume events.

    因为目前 Flume 还没有启动,没有给 FlumeEventCount 发送任何消息,所以 Flume Events 的数量是 0。第 1 个终端不要关闭,让它一直处于监听状态。

    现在,我们可以再另外新建第 2 个终端,在这个新的终端中启动 Flume Agent,命令如 下:

    1.cd /usr/local/flume

    2.bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

    启动 agent 以后,该 agent 就会一直监听 localhost 的 33333 端口,这样,我们下面就 可以通过“telnet localhost 33333”命令向 Flume Source 发送消息。第 2 个终端也不要关闭, 让它一直处于监听状态。

    请另外新建第 3 个终端,执行如下命令:

    1.telnet localhost 33333

    执行该命令以后,就可以在这个窗口里面随便敲入若干个字符和若干个回车,这些消息都会被 Flume 监听到,Flume 把消息采集到以后汇集到 Sink,然后由 Sink 发送给 Spark 的 FlumeEventCount 程序进行处理。然后,你就可以在运行 FlumeEventCount 的前面那个终端窗口内看到类似如下的统计结果:

    -------------------------------------------

    Time: 1488029430000 ms

    -------------------------------------------

    Received 0 flume events.

    #这里省略了其他屏幕信息

    -------------------------------------------

    Time: 1488029432000 ms

    -------------------------------------------

    Received 8 flume events.

    #这里省略了其他屏幕信息

    -------------------------------------------

    Time: 1488029434000 ms

    -------------------------------------------

    Received 21 flume events.

    从屏幕信息中可以看出,我们在 telnet 那个终端内发送的消息,都被成功发送到 Spark 进行处理了。

    至此,本实验顺利完成。

    实验结束后,要关闭各个终端,只要切换到该终端窗口,然后 按键盘的 Ctrl+C 组合键,就可以结束程序运行。

  • 相关阅读:
    iOS 6 Tutorials
    iPhone:文本设计的注意事项
    Beginning Storyboards in iOS 5 Part 2
    代友招聘一名网站开发人员
    利用反射自己写的一个ModelHelper类
    SQL中 对逻辑值取反赋值的语句
    记点uml的表现方式
    我的2008年
    Framework 4.0 新关键字dynamic 之我见(一)
    我的采集小程序配置篇
  • 原文地址:https://www.cnblogs.com/songxinai/p/14240225.html
Copyright © 2011-2022 走看看