  • Spark and Flume integration
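
    The examples below cover both modes from the Spark 1.6.x integration guide linked at the end: push, where a Flume avro sink sends events to a Spark receiver, and pull, where Spark polls a special Flume sink. Either way the job needs the spark-streaming-flume artifact on its classpath; a minimal sbt sketch (versions are assumptions, match them to your Spark/Scala build):

    // build.sbt (sketch; versions are assumptions)
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "1.6.3" % "provided",
      "org.apache.spark" %% "spark-streaming-flume" % "1.6.3"
    )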

    Spark Streaming and Flume integration: push mode

    package cn.my.sparkStream
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    
    
    /**
      * Push mode: the Flume agent pushes events to this job's Avro receiver.
      */
    object SparkFlumePush {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println(
            "Usage: FlumeEventCount <host> <port>")
          System.exit(1)
        }
        // LogLevel is a helper object in this package (in the spirit of Spark's
        // StreamingExamples) that trims the console logging
        LogLevel.setStreamingLogLevels()
        val Array(host, port) = args
        val batchInterval = Milliseconds(2000)
        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, batchInterval)
        // Create a flume stream
        val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)
        // Print out the count of events received from this server in each batch
        stream.count().map(cnt => "Received " + cnt + " flume events.").print()
        // Each SparkFlumeEvent wraps a Flume event; the event body carries the actual message payload
        stream.flatMap(t => new String(t.event.getBody.array()).split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
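
    In push mode the agent's avro sink connects out to the Spark receiver, so start the Spark job first so something is listening on <host>:<port>. A minimal agent sketch (the netcat source, agent/channel names, and ports are assumptions; the avro sink pointing at the receiver is the essential part):

    # flume-push.conf (sketch; source side and names are assumptions)
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sources.r1.channels = c1

    a1.channels.c1.type = memory

    # avro sink pushes events to the Spark Streaming receiver
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = <spark-receiver-host>
    a1.sinks.k1.port = <spark-receiver-port>
    a1.sinks.k1.channel = c1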
    
    
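    With a terminal feeding the agent's source, a run sketch for the push job (the assembly jar path is hypothetical; the host/port must match the agent's avro sink):

    spark-submit --class cn.my.sparkStream.SparkFlumePush \
      target/scala-2.10/sparkstream-assembly.jar mini1 8888
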
    Spark Streaming and Flume integration: pull mode

    package cn.my.sparkStream

    import java.net.InetSocketAddress

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._


    /**
      * Pull mode: Flume buffers events in a SparkSink and this job polls them.
      */
    object SparkFlumePull {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println(
            "Usage: SparkFlumePull <host> <port>")
          System.exit(1)
        }
        // LogLevel is a helper object in this package (in the spirit of Spark's
        // StreamingExamples) that trims the console logging
        LogLevel.setStreamingLogLevels()
        val Array(host, port) = args
        val batchInterval = Milliseconds(2000)
        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, batchInterval)

        // Create a polling flume stream. The relevant overload is:
        //   def createPollingStream(
        //       ssc: StreamingContext,
        //       addresses: Seq[InetSocketAddress],
        //       storageLevel: StorageLevel
        //   ): ReceiverInputDStream[SparkFlumeEvent]
        // When there are several SparkSinks, list all of their addresses here
        // and the stream will poll each of them.
        val flumeSinkList = Array(new InetSocketAddress(host, port.toInt))
        val flumeStream = FlumeUtils.createPollingStream(ssc, flumeSinkList, StorageLevel.MEMORY_ONLY_2)

        // Print out the count of events received from this server in each batch
        flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()
        // Each SparkFlumeEvent wraps a Flume event; the event body carries the actual message payload
        flumeStream.flatMap(t => new String(t.event.getBody.array()).split(" "))
          .map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
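
    In pull mode the roles reverse: the agent buffers events in a custom SparkSink and the Spark job polls it, so the agent can be started before the job. Per the integration guide, the spark-streaming-flume-sink jar (plus scala-library and commons-lang3) must be on Flume's classpath. A sketch of the sink side, assuming the job is launched with host mini1 and port 8888:

    # flume-pull.conf (sketch; source side and names are assumptions)
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sources.r1.channels = c1

    a1.channels.c1.type = memory

    # custom sink from spark-streaming-flume-sink; the Spark job polls events from it
    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.hostname = mini1
    a1.sinks.k1.port = 8888
    a1.sinks.k1.channel = c1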
     

    http://spark.apache.org/docs/1.6.3/streaming-flume-integration.html
