zoukankan      html  css  js  c++  java
  • Spark Streaming + Kafka

    生产者

    import java.util.HashMap
    import org.apache.kafka.clients.producer._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    /**
     * Demo Kafka producer for the word-count example.
     *
     * Once per second it publishes one message to the `sun_first` topic on the
     * broker at `localhost:9092`. Each message is `wordsPerMessage` (3) random
     * single digits (0-9) joined by spaces. Command-line arguments are ignored;
     * all settings are hard-coded below.
     */
    object spark_kafka_wordcount_producer {
        /**
         * Entry point. Loops until the process is killed or an exception is thrown.
         *
         * @param args unused
         */
        def main(args: Array[String]): Unit = {
           val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_first", "3")
           // Parse once up front instead of on every loop iteration.
           val wordCount = wordsPerMessage.toInt

           val props = new HashMap[String, Object]()
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")

           val producer = new KafkaProducer[String, String](props)

           // The loop never terminates normally; the finally block makes sure the
           // producer is closed when an exception (e.g. an interrupt during sleep)
           // unwinds out of it.
           try {
               while (true) {
                   val str = Seq.fill(wordCount)(scala.util.Random.nextInt(10).toString)
                         .mkString(" ")
                   // Two-argument constructor: the record is sent without a key.
                   val message = new ProducerRecord[String, String](topic, str)
                   producer.send(message)

                   Thread.sleep(1000)
               }
           } finally {
               producer.close()
           }
        }
    }
    

     消费者

    import java.util.Properties
    import kafka.producer._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    
    /**
     * Demo Spark Streaming consumer for the word-count example.
     *
     * Reads the `sun_first` topic through the ZooKeeper quorum at
     * `localhost:2181` as consumer group `1`, splits every message on single
     * spaces, and prints the count of each word for every 1-second batch.
     * Command-line arguments are ignored; all settings are hard-coded below.
     */
    object spark_kafka_wordcount_customer {
       /**
        * Entry point. Starts the streaming context and blocks until it terminates.
        *
        * @param args unused
        */
       def main(args: Array[String]): Unit = {
            val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_first")
            val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
            val ssc =  new StreamingContext(sparkConf, Seconds(1))
            ssc.checkpoint("checkpoint")

            // Comma-separated topic list -> Map(topic -> 2). The value is handed to
            // KafkaUtils.createStream as the per-topic count (consumer threads,
            // according to the Spark Kafka integration guide).
            val topicMap = topics.split(",").map(topic => topic -> 2).toMap
            // Keep only the second element of each received pair
            // (presumably the message payload; the first would be the Kafka key).
            val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
            val words = lines.flatMap(_.split(" "))
            val pairs = words.map(word => (word, 1))
            val wordCounts = pairs.reduceByKey(_ + _)
            wordCounts.print()
            ssc.start()
            ssc.awaitTermination()
        }
    }
    
  • 相关阅读:
    【Python】错误、调试和测试
    【c++ primer, 5e】函数指针
    【英语学习】【17/4/1】
    【c++ primer, 5e】函数匹配
    FIRST GAME.
    【Thinking in Java, 4e】访问权限控制
    【c++ primer, 5e】特殊用途语言特性
    Top-Down笔记 #01# 计算机网络概述
    NHibernate之映射文件配置说明
    Web Service 部署到IIS服务器
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6398687.html
Copyright © 2011-2022 走看看