Producer
package zx.zx.sparkkafka

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.util.Random

/**
  * Created by 166 on 2017/9/6.
  */
object Producer {
  val topic = "myWordCount1"
  val buffer: StringBuilder = new StringBuilder
  val message: Array[String] = Array("hadoop", "scala", "spark", "kafka", "java", "storm", "redis", "hello", "world")

  // Build one message: 11 randomly chosen words separated by spaces
  def getMessage: String = {
    buffer.clear()
    for (info <- 0 to 10) {
      if (info != 10)
        buffer.append(message(Random.nextInt(message.length)).concat(" "))
      else
        buffer.append(message(Random.nextInt(message.length)))
    }
    buffer.toString()
  }

  def main(args: Array[String]) {
    // Properties holds the producer configuration
    val properties = new Properties
    // metadata.broker.list: addresses and ports of the Kafka brokers; several brokers may be listed
    properties.put("metadata.broker.list", "192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092")
    // Serializer used when writing data to Kafka
    properties.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(properties))
    // Send one random message every 500 ms
    for (i <- 0 until Integer.MAX_VALUE) {
      Thread.sleep(500)
      val message: KeyedMessage[String, String] = KeyedMessage[String, String](topic, "", null, getMessage)
      producer.send(message)
    }
  }
}
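kafka.producer.Producer is the legacy Scala client shipped with Kafka 0.8. For reference only, a minimal sketch of the same send loop written against the newer org.apache.kafka.clients.producer.KafkaProducer client might look like the following; the broker list is reused from the configuration above, the object name is made up, and this is not part of the original demo:

package zx.zx.sparkkafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object NewApiProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same brokers as above; the new client uses bootstrap.servers instead of metadata.broker.list
    props.put("bootstrap.servers", "192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      for (_ <- 0 until 100) {
        Thread.sleep(500)
        // Reuses Producer.getMessage from the object above to build the payload
        producer.send(new ProducerRecord[String, String]("myWordCount1", Producer.getMessage))
      }
    } finally {
      producer.close()
    }
  }
}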
SparkStreamingDemo
Note: the checkpoint directory must be set.
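In the demo below the checkpoint directory is a local Windows path, which is only convenient for testing; on a cluster it would normally be a directory on HDFS. A hypothetical example (namenode host, port and path are assumptions, not from the original):

// Hypothetical HDFS checkpoint location; adjust the namenode address and path to your cluster
ssc.checkpoint("hdfs://srv01:9000/spark/ck/myWordCount1")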
package zx.zx.sparkkafka

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by 166 on 2017/9/6.
  */
object SparkStreamingDemo {

  /**
    * Iterator[(String, Seq[Int], Option[Int])]
    * _1: the key (the word)
    * _2: the counts of this word in the current batch
    * _3: the initial value, or the value accumulated in previous batches
    */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
  }

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // Create the SparkConf and set the application name
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")
    // Create the StreamingContext with a 2-second batch interval
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
    // The checkpoint directory is required when accumulating state across batches;
    // in production it is usually placed on HDFS
    ssc.checkpoint("C:\\Users\\166\\Desktop\\Data\\ck")

    // Pull data from Kafka (these settings could instead be taken from the command-line arguments)
    val zkQuorum = "srv01:2181,srv02:2181,srv03:2181"
    val groupId = "g1" // groupId = UUID.randomUUID().toString
    // When there are several topics, split them, e.g. topics = "t1,t2,t3":
    // val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val topic = Map("myWordCount1" -> 3)
    val topicAndLine: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
    // (key, message) ---> map(_._2) ===> message
    val lines: DStream[String] = topicAndLine.map(_._2) // each record may contain several words

    // Split each line into words, e.g.
    // "redis spark scala hadoop hello scala java java hadoop scala world"
    // becomes (redis,1), (spark,1), ...
    val words: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))
    // Count the words, keeping state across batches
    val result: DStream[(String, Int)] = words.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Print the result to the console
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
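For comparison, the same topic can also be consumed without a receiver through the direct API of the same spark-streaming-kafka (0.8) integration, which reads from the brokers directly instead of going through ZooKeeper. This is only a sketch, not part of the original demo; the broker list is taken from the producer's configuration and ssc is the StreamingContext created above:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct (receiver-less) stream: offsets are tracked by Spark rather than by a ZooKeeper-based receiver
val kafkaParams = Map("metadata.broker.list" -> "192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092")
val topics = Set("myWordCount1")
val directLines: DStream[String] = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  .map(_._2)

The rest of the word-count pipeline (flatMap, map, updateStateByKey) stays the same.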