zoukankan      html  css  js  c++  java
  • Spark Streaming Kafka example

    // scalastyle:off println
    package org.apache.spark.examples.streaming
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.streaming.scheduler.StreamingListener
    import scala.util.parsing.json.JSON
    
    /**
     * Consumes log messages from one or more Kafka topics and analyses them over a
     * sliding time window (30 second window, evaluated every 10 seconds).
     *
     * For every window it prints:
     *  - the top 3 `(msg, count)` pairs as ranked by `OrderingUtils.SecondValueOrdering`
     *    (presumably ordering by the count; defined outside this file - confirm);
     *  - up to 10 `host + app` keys for which more than 5 messages contained "A".
     *
     * Usage: LogAnalysisB <brokers> <topics>
     *   <brokers> is a list of one or more Kafka brokers
     *   <topics> is a comma-separated list of one or more Kafka topics to consume from
     */
    object LogAnalysisB {

      /**
       * Entry point: builds the streaming job and blocks until it terminates.
       *
       * @param args `args(0)` is the broker list, `args(1)` the comma-separated topic list;
       *             any further arguments are ignored
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println(s"""
            |Usage: LogAnalysisB <brokers> <topics>
            |  <brokers> is a list of one or more Kafka brokers
            |  <topics> is a list of one or more kafka topics to consume from
            |
            """.stripMargin)
          System.exit(1)
        }

        val windowLength = new Duration(30 * 1000)
        val slideInterval = new Duration(10 * 1000)
        StreamingExamples.setStreamingLogLevels()

        // The guard above only requires *at least* two arguments, so tolerate extras
        // here instead of failing with a MatchError.
        val Array(brokers, topics, _*) = args

        val sparkConf = new SparkConf().setAppName("ELK Log Analysis windows Threhold")
        // The batch interval equals the slide interval, so each batch yields one window result.
        val ssc = new StreamingContext(sparkConf, slideInterval)
        ssc.addStreamingListener(new RuleFileListenerB())

        // Create direct kafka stream with brokers and topics
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)

        // Keep only the message payload, parse each line into a log record and
        // group the records into sliding windows.
        val lines = messages.map(_._2).map(HostAppLog.parseLogLine)
        val windowDStream = lines.window(windowLength, slideInterval)

        windowDStream.foreachRDD { logs =>
          // Most frequent messages within the current window.
          val topMessages = logs
            .map(log => (log.msg, 1))
            .reduceByKey(_ + _)
            .top(3)(OrderingUtils.SecondValueOrdering)
          println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
          println(s"""Top Endpoints: ${topMessages.mkString("[", ",", "]")}""")

          // Threshold check: per host+app key, count the messages containing "A"
          // and report the keys that exceed 5 occurrences in this window.
          val overThreshold = logs
            .map(log => (log.host + log.app, if (log.msg.contains("A")) 1 else 0))
            .reduceByKey(_ + _)
            .filter(_._2 > 5)
            .take(10)
          println(s"""A > 5 times: ${overThreshold.mkString("[", ",", "]")}""")
        }

        // Start the computation
        ssc.start()
        ssc.awaitTermination()
      }

      /**
       * Prints the given record when its "message" entry contains the letter "A".
       *
       * @param ssc streaming context; not used by this method, kept for caller compatibility
       * @param map parsed record; only the value stored under the key "message" is inspected
       */
      def wc(ssc: StreamingContext, map: Map[Any, Any]): Unit = {
        // Inspect the stored value itself rather than the string form of the Option wrapper.
        if (map.get("message").exists(value => String.valueOf(value).contains("A"))) {
          println("find A in message:" + map.toString())
        }
      }

    }
    
    /**
     * Streaming listener that prints a banner to stdout whenever a batch starts.
     *
     * The banner announces a check of the rule file's modification date; the check
     * itself is not implemented here, only the message is printed.
     */
    class RuleFileListenerB extends StreamingListener {

      /**
       * Prints the banner framed by two separator lines.
       *
       * @param batchStarted the batch-started event; its contents are not read
       */
      override def onBatchStarted(
          batchStarted: org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted): Unit = {
        // Disabled sketch of reading the configuration file, kept for reference:
        //   val source = scala.io.Source.fromFile("D:/code/scala/test")
        //   val lines = try source.mkString finally source.close()
        //   println(lines)
        Seq(
          "---------------------------------------------------------------------------------------------------------------------------------------------",
          "check whether the file's modified date is change, if change then reload the configuration file",
          "---------------------------------------------------------------------------------------------------------------------------------------------"
        ).foreach(println)
      }

    }
    // scalastyle:on println
  • 相关阅读:
    在非MFC的win 32程序里面能够使用CString类
    一 VC2008环境中ICE的配置
    二 示例程序一
    三 ICE开发初级研究
    VC断点不可用的问题
    Ice笔记-利用Ice::Application类简化Ice应用
    GetCurrentDirectory、SetCurrentDirectory和GetModuleFileName
    Xcopy参数介绍
    COM组件开发实践(八)---多线程ActiveX控件和自动调整ActiveX控件大小(下)
    JackSon fasterxml学习
  • 原文地址:https://www.cnblogs.com/huaxiaoyao/p/5236150.html
Copyright © 2011-2022 走看看