package com.xing.stream
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by DengNi on 2016/12/16.
*/
class StreamingFirst {
}
object StreamingFirst {
def main(args: Array[String]) {
val brokers = "192.168.184.188:9092, 192.168.184.178:9092, 192.168.184.168:9092"
val topics = "meinv"
val sparkconf = new SparkConf().setAppName("kafkastreaming").setMaster("local[2]")
val ssc = new StreamingContext(sparkconf,Seconds(6))
ssc.checkpoint("w_checkpoints") //windows 路径
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
//{"@timestamp":"2016-12-14T16:26:21.746Z","beat":{"hostname":"root","name":"root","version":"5.1.1"},"metricset":{"module":"system","name":"process","rtt":28025},"system":{"process":{"cmdline":""C:\WINDOWS\system32\SearchFilterHost.exe" 0 624 628 644 8192 632 ","cpu":{"start_time":"2016-12-14T16:24:15.240Z","total":{"pct":0.000000}},"memory":{"rss":{"bytes":7495680,"pct":0.000400},"share":0,"size":1806336},"name":"SearchFilterHost.exe","pgid":0,"pid":8776,"ppid":2524,"state":"running","username":"NT AUTHORITY\SYSTEM"}},"type":"metricsets"}
val lines = KafkaUtils.createDirectStream[String, String,StringDecoder, StringDecoder](ssc,kafkaParams,topicSet)
//val message = lines.map(_._1) map(_._1) 数据是空的 null
val message = lines.map(_._2) //map(_._2) 才是Kafka里面打入的数据
val words = message.flatMap(_.split(":"))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
//message.print() checked
ssc.start()
ssc.awaitTermination()
}
}
向kafka 大数据的程序 参考
http://blog.csdn.net/haohaixingyun/article/details/53647963