zoukankan      html  css  js  c++  java
  • 寒假学习第十四天

    .使用 Flume 作为 Spark Streaming 数据源
    Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume
    Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集
    到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写
    的 Spark Streaming 应用程序对消息进行处理。

    配置Flume

        通过以下的配置文件可以将数据发送到Avro Sink。

    agent.sinks = avroSink
    agent.sinks.avroSink.type = avro
    agent.sinks.avroSink.channel = memoryChannel
    agent.sinks.avroSink.hostname = <所选机器的IP>
    agent.sinks.avroSink.port = <所选机器的端口>
    3、配置Spark Streaming应用程序

    A.添加依赖

    <dependency>

        <groupId>org.apache.spark </groupId>

        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>${spark.version}</version>

    </dependency>

    B.在Streaming应用程序的代码中,导入一个FlumeUtils类并创建input DStream。

    import org.apache.spark.streaming.flume._

    val flumeStream = FlumeUtils.createStream(streamingContext, [所选机器ip], [所选机器端口])
    4、测试

    A.直接运行代码

    package com.ruozedata.streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FlumePushApp {

      def main(args: Array[String]) {

        val Array(hostname, port) = args

        val sparkConf = new SparkConf()
          .setAppName("FlumePushApp")
          .setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(10))

        val lines = FlumeUtils.createStream(ssc,hostname,port.toInt) 
    /*由于createStream返回的DStream类型为SparkFlumeEvent,而不是String,故此时split方法无法使用
    *为了能够使用split,我们执行了以下的map操作
    */
        lines.map(x => new String(x.event.getBody.array()).trim)
          .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

  • 相关阅读:
    Mysql删除数据库中所有表
    MySQL出现2059错误
    .NetCore笔记
    PLSql中文乱码
    Oracle误删除数据恢复。Oracle删除后恢复数据
    ora-28000:the account is locked,Oracle修改密码有效期,Oracle设置密码不过期
    Linux 常用命令
    Razor
    ORA-01578: ORACLE 数据块损坏 (文件号 13, 块号 2415081) ORA-01110: 数据文件XXXXXX
    ORA-01033:ORACLE initialization or shutdown
  • 原文地址:https://www.cnblogs.com/lishengming00/p/12315420.html
Copyright © 2011-2022 走看看