zoukankan      html  css  js  c++  java
  • 寒假学习第十四天

    .使用 Flume 作为 Spark Streaming 数据源
    Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume
    Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集
    到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写
    的 Spark Streaming 应用程序对消息进行处理。

    配置Flume

        通过以下的配置文件可以将数据发送到Avro Sink。

    agent.sinks = avroSink
    agent.sinks.avroSink.type = avro
    agent.sinks.avroSink.channel = memoryChannel
    agent.sinks.avroSink.hostname = <所选机器的IP>
    agent.sinks.avroSink.port = <所选机器的端口>
    3、配置Spark Streaming应用程序

    A.添加依赖

    <dependency>

        <groupId>org.apache.spark </groupId>

        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>${spark.version}</version>

    </dependency>

    B.在Streaming应用程序的代码中,导入一个FlumeUtils类并创建input DStream。

    import org.apache.spark.streaming.flume._

    val flumeStream = FlumeUtils.createStream(streamingContext, [所选机器ip], [所选机器端口])
    4、测试

    A.直接运行代码

    package com.ruozedata.streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FlumePushApp {

      def main(args: Array[String]) {

        val Array(hostname, port) = args

        val sparkConf = new SparkConf()
          .setAppName("FlumePushApp")
          .setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(10))

        val lines = FlumeUtils.createStream(ssc,hostname,port.toInt) 
    /*由于createStream返回的DStream类型为SparkFlumeEvent,而不是String,故此时split方法无法使用
    *为了能够使用split,我们执行了以下的map操作
    */
        lines.map(x => new String(x.event.getBody.array()).trim)
          .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

  • 相关阅读:
    linux systemctl 命令详解
    nginx 与 php-fpm 通信配置
    yum Install PHP 7 on CentOS 7 (英文-转载)
    解决在idea中创建spring boot项目start.spring.io初始化失败的问题
    MongoDB学习5:模型设计和设计模式
    MongoDB学习4:MongoDB复制集机制和原理,搭建复制集
    `curl -L` 解决 GitHub 的 raw.githubusercontent.com 无法连接问题
    用 Hugo 快速搭建博客
    针对多系统时间不一致
    kali 下的邮件发送工具 swaks
  • 原文地址:https://www.cnblogs.com/lishengming00/p/12315420.html
Copyright © 2011-2022 走看看