zoukankan      html  css  js  c++  java
  • sparkstreaming+kafka

    生产者

    import java.util.HashMap
    import org.apache.kafka.clients.producer._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    /**
     * Kafka producer feeding the word-count demo: roughly once per second it
     * publishes one message made of random single-digit "words" joined by spaces.
     *
     * Broker list, topic and words-per-message are fixed inside `main`;
     * command-line arguments are not read.
     */
    object spark_kafka_wordcount_producer {
        /**
         * Builds a String/String Kafka producer and sends messages in an endless loop.
         *
         * @param args command-line arguments (unused)
         */
        def main(args: Array[String]): Unit = {
            val brokers = "localhost:9092"
            val topic = "sun_first"
            val wordsPerMessage = "3"
            // Parse once instead of on every loop iteration.
            val wordCount = wordsPerMessage.toInt

            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer")

            val producer = new KafkaProducer[String, String](props)

            // The loop can only be left through an exception (e.g. the thread being
            // interrupted while sleeping); close the producer on that path so the
            // client is released rather than leaked.
            try {
                while (true) {
                    // Each word is a random digit 0-9.
                    val str = Seq.fill(wordCount)(scala.util.Random.nextInt(10).toString).mkString(" ")
                    // Two-argument constructor: record without a key (same as passing a null key).
                    val message = new ProducerRecord[String, String](topic, str)
                    producer.send(message)

                    Thread.sleep(1000)
                }
            } finally {
                producer.close()
            }
        }
    }
    

     消费者

    import java.util.Properties
    import kafka.producer._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    
    /**
     * Spark Streaming job that reads messages from a Kafka topic through the
     * ZooKeeper-based receiver, splits each message on spaces and prints the
     * per-batch count of every word.
     *
     * ZooKeeper quorum, consumer group and topic list are fixed inside `main`;
     * command-line arguments are not read.
     */
    object spark_kafka_wordcount_customer {
        /**
         * Sets up a local streaming context with one-second batches, wires the
         * Kafka stream into a word-count pipeline, then runs until terminated.
         *
         * @param args command-line arguments (unused)
         */
        def main(args: Array[String]): Unit = {
            val zkQuorum = "localhost:2181"
            val group = "1"
            val topics = "sun_first"

            val conf = new SparkConf()
            conf.setAppName("KafkaWordCount")
            conf.setMaster("local[2]")

            val streamingContext = new StreamingContext(conf, Seconds(1))
            streamingContext.checkpoint("checkpoint")

            // Comma-separated topic names, each mapped to the value 2 that is
            // handed to createStream as its per-topic setting.
            val perTopic = topics.split(",").map(name => name -> 2).toMap

            val counts = KafkaUtils
                .createStream(streamingContext, zkQuorum, group, perTopic)
                .map { case (_, payload) => payload } // keep only the message value
                .flatMap(payload => payload.split(" "))
                .map(token => (token, 1))
                .reduceByKey((left, right) => left + right)
            counts.print()

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }
    
  • 相关阅读:
    Flash请求不能传Cookie的PHP解决方案
    免费与不免费
    js编码之 encodeURIComponent
    锁定老帖子 主题:你应当了解的几个CSS3新技术
    清除浮动mini版
    iPad网页开发教程及规则
    手持设备meta设置
    使用 jQuery 在新窗口打开外部链接
    jQuery GET POST AJAX與php异步加载
    游戏英文词汇命名——备用
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6398687.html
Copyright © 2011-2022 走看看