zoukankan      html  css  js  c++  java
  • 寒假学习第十四天

    .使用 Flume 作为 Spark Streaming 数据源
    Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume
    Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集
    到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写
    的 Spark Streaming 应用程序对消息进行处理。

    配置Flume

        通过以下的配置文件可以将数据发送到Avro Sink。

    agent.sinks = avroSink
    agent.sinks.avroSink.type = avro
    agent.sinks.avroSink.channel = memoryChannel
    agent.sinks.avroSink.hostname = <所选机器的IP>
    agent.sinks.avroSink.port = <所选机器的端口>
    3、配置Spark Streaming应用程序

    A.添加依赖

    <dependency>

        <groupId>org.apache.spark </groupId>

        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>${spark.version}</version>

    </dependency>

    B.在Streaming应用程序的代码中,导入一个FlumeUtils类并创建input DStream。

    import org.apache.spark.streaming.flume._

    val flumeStream = FlumeUtils.createStream(streamingContext, [所选机器ip], [所选机器端口])
    4、测试

    A.直接运行代码

    package com.ruozedata.streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FlumePushApp {

      def main(args: Array[String]) {

        val Array(hostname, port) = args

        val sparkConf = new SparkConf()
          .setAppName("FlumePushApp")
          .setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(10))

        val lines = FlumeUtils.createStream(ssc,hostname,port.toInt) 
    /*由于createStream返回的DStream类型为SparkFlumeEvent,而不是String,故此时split方法无法使用
    *为了能够使用split,我们执行了以下的map操作
    */
        lines.map(x => new String(x.event.getBody.array()).trim)
          .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

  • 相关阅读:
    yum clean all大坑解决
    RHEL 7 “There are no enabled repos” 的解决方法
    exportfs命令 – 管理NFS服务器共享的文件系统
    Linux放大缩小字体的快捷键
    chcon命令详解
    通过配置hosts.allow和hosts.deny文件允许或禁止ssh或telnet操作
    安装RHEL7配置本地yum源 -- yum不能安装时,在本地安装,亲测成功
    块存储、文件存储、对象存储意义及差异
    在Windows Server 2012 R2域环境中禁用(取消)密码复杂策略
    bat脚本静默安装软件示例
  • 原文地址:https://www.cnblogs.com/lishengming00/p/12315420.html
Copyright © 2011-2022 走看看