  • <Spark Streaming><Flume><Integration>

    Overview

    • Flume: a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
    • We set up a Flume + Spark Streaming pipeline to receive data from Flume and process it.
    • There are two ways to do this: the Flume-style push-based approach, or a pull-based approach using a custom sink.

    Approach 1: Flume-style Push-based Approach

    • Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts like an Avro agent for Flume, to which Flume can push the data.

    General Requirements

    • When your Flume + Spark Streaming application is launched, one of the Spark workers must run on the chosen machine.
    • Flume can then push data to a port on that machine.
    • Because of this push mechanism, the streaming application must have a receiver scheduled and listening on the chosen port.

    Configuring Flume

    • Configure the Flume agent to send data to an Avro sink pointing at the chosen machine and port:
    • agent.sinks = avroSink
      agent.sinks.avroSink.type = avro
      agent.sinks.avroSink.channel = memoryChannel
      agent.sinks.avroSink.hostname = <chosen machine's hostname>
      agent.sinks.avroSink.port = <chosen port on the machine>
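
    • For context, a sink alone is not a runnable agent; a complete single-agent configuration also declares a source and a channel. A minimal sketch (the exec source, the log file path, and the concrete hostname/port are illustrative assumptions, not from the original post):

      agent.sources = logSource
      agent.channels = memoryChannel
      agent.sinks = avroSink

      # illustrative source: tail an application log
      agent.sources.logSource.type = exec
      agent.sources.logSource.command = tail -F /var/log/app.log
      agent.sources.logSource.channels = memoryChannel

      agent.channels.memoryChannel.type = memory
      agent.channels.memoryChannel.capacity = 10000

      agent.sinks.avroSink.type = avro
      agent.sinks.avroSink.channel = memoryChannel
      agent.sinks.avroSink.hostname = spark-worker-1
      agent.sinks.avroSink.port = 44444

    • The agent can then be started with `bin/flume-ng agent --conf conf --conf-file avro-push.conf --name agent`.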

    Configuring Spark Streaming Application

    1. Linking: add the dependency to your Maven project
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
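
    If the project builds with sbt instead of Maven, the equivalent dependency (assuming a Scala 2.10 build to match the artifact above) would be:

      libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "2.1.0"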

      2. Programming: import FlumeUtils and create an input DStream

     import org.apache.spark.streaming.flume._
    
     val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
    • Note: the hostname should be the same one used by the resource manager in the cluster, so that resource allocation can match the names and launch the receiver on the correct machine.
    • A minimal Spark Streaming demo that counts Flume events:
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Milliseconds, StreamingContext}
      import org.apache.spark.streaming.flume._

      object FlumeEventCount {
        def main(args: Array[String]) {
          if (args.length < 2) {
            System.err.println("Usage: FlumeEventCount <host> <port>")
            System.exit(1)
          }

          // Plain argument parsing (the original used the IntParam and
          // StreamingExamples helpers from the Spark examples package)
          val host = args(0)
          val port = args(1).toInt

          val batchInterval = Milliseconds(2000)

          // Create the context and set the batch interval
          val sparkConf = new SparkConf().setAppName("FlumeEventCount")
          val ssc = new StreamingContext(sparkConf, batchInterval)

          // Create a flume stream
          val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

          // Print out the count of events received from this server in each batch
          stream.count().map(cnt => "Received " + cnt + " flume events.").print()

          ssc.start()
          ssc.awaitTermination()
        }
      }
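
    • To actually run the example, package it and submit it with spark-submit, pulling in the Flume connector through --packages; a sketch, where the jar name and host/port values are assumptions:

      spark-submit \
        --class FlumeEventCount \
        --packages org.apache.spark:spark-streaming-flume_2.10:2.1.0 \
        flume-event-count.jar spark-worker-1 44444

    • The Flume agent's Avro sink must point at the same host and port, and the streaming application should be started first so the receiver is already listening when Flume begins pushing events.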
      
  • Original post: https://www.cnblogs.com/wttttt/p/6841864.html