zoukankan      html  css  js  c++  java
  • Spark Streaming 采用直接读取 Kafka 的方式(Direct 方式)获取数据

    package com.xing.stream
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Placeholder companion class for the `StreamingFirst` object; it declares
      * no members of its own.
      *
      * Created by DengNi on 2016/12/16.
      */
    class StreamingFirst
    
    /**
      * Word-count demo: reads a Kafka topic through the Spark Streaming direct
      * stream API (`KafkaUtils.createDirectStream`) and prints the counts of
      * every batch.
      */
    object StreamingFirst {

      /**
        * Builds the streaming job, starts it and blocks until it terminates.
        *
        * Broker addresses, topic name, master URL and batch interval are
        * hard-coded below.
        *
        * @param args command-line arguments; not used
        */
      def main(args: Array[String]): Unit = {

        // Comma-separated host:port pairs. Keep the list free of whitespace: the
        // Kafka 0.8 integration splits it on "," and a blank after a comma would
        // become part of the next host name.
        val brokers = "192.168.184.188:9092,192.168.184.178:9092,192.168.184.168:9092"
        // Comma-separated list of topics to subscribe to.
        val topics = "meinv"

        val sparkConf = new SparkConf().setAppName("kafkastreaming").setMaster("local[2]")
        // One micro-batch every 6 seconds.
        val ssc = new StreamingContext(sparkConf, Seconds(6))

        ssc.checkpoint("w_checkpoints") // Windows path

        // Tolerate blanks around the commas and ignore empty entries.
        val topicSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

        // Sample message from the topic:
        //{"@timestamp":"2016-12-14T16:26:21.746Z","beat":{"hostname":"root","name":"root","version":"5.1.1"},"metricset":{"module":"system","name":"process","rtt":28025},"system":{"process":{"cmdline":""C:\WINDOWS\system32\SearchFilterHost.exe" 0 624 628 644 8192 632 ","cpu":{"start_time":"2016-12-14T16:24:15.240Z","total":{"pct":0.000000}},"memory":{"rss":{"bytes":7495680,"pct":0.000400},"share":0,"size":1806336},"name":"SearchFilterHost.exe","pgid":0,"pid":8776,"ppid":2524,"state":"running","username":"NT AUTHORITY\SYSTEM"}},"type":"metricsets"}
        val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicSet)

        // Each record is a (key, value) pair. lines.map(_._1) was observed to be
        // empty (null); the data written into Kafka is in the second element.
        val message = lines.map(_._2)
        // Tokenise every message on ':'.
        val words = message.flatMap(_.split(":"))

        // Count the occurrences of each token within the batch.
        val wordCounts = words.map(word => (word, 1L)).reduceByKey(_ + _)
        wordCounts.print()
        // message.print() was used to check the raw payload.

        ssc.start()
        ssc.awaitTermination()
      }

    }
    



    向 Kafka 打入数据的程序,请参考 http://blog.csdn.net/haohaixingyun/article/details/53647963

  • 相关阅读:
    在光驱按钮不好使用时如何关闭光驱门
    将24位及32位图像数据保存到位图中
    VIM代码自动完成
    加载TLBB场景(一)
    创建异形窗口
    Windows快速关机与重启
    Direct3D9应用CEGUI
    利用GDI+加载图片
    ubuntu MP3乱码解决
    asp.net动态生成控件及访问控件
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501301.html
Copyright © 2011-2022 走看看