zoukankan      html  css  js  c++  java
  • Spark Streaming + Kafka

    生产者

    import java.util.HashMap
    import org.apache.kafka.clients.producer._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    /**
     * Kafka producer side of the word-count demo.
     *
     * Once per second, sends one message to the `sun_first` topic on the broker
     * at `localhost:9092`. Each message consists of `wordsPerMessage` random
     * digits (0-9) separated by single spaces. Keys and values are serialized
     * with Kafka's `StringSerializer`.
     */
    object spark_kafka_wordcount_producer {

      /**
       * Entry point. Runs until the process is killed or an exception is thrown.
       *
       * @param args command-line arguments (unused; connection settings are fixed below)
       */
      def main(args: Array[String]): Unit = {
        val brokers = "localhost:9092"
        val topic = "sun_first"
        val wordsPerMessage = 3

        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        try {
          while (true) {
            // Build a message such as "3 7 1": wordsPerMessage random digits joined by spaces.
            val str = Seq.fill(wordsPerMessage)(scala.util.Random.nextInt(10)).mkString(" ")
            // Two-argument constructor: the record is sent without a key.
            producer.send(new ProducerRecord[String, String](topic, str))

            Thread.sleep(1000)
          }
        } finally {
          // Release the producer if the loop is left via an exception
          // (e.g. a failed send or an interrupted sleep).
          producer.close()
        }
      }
    }
    

     消费者

    import java.util.Properties
    import kafka.producer._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    
    /**
     * Spark Streaming side of the word-count demo.
     *
     * Reads messages from the `sun_first` Kafka topic through the ZooKeeper
     * quorum at `localhost:2181`, splits each message on single spaces, counts
     * the occurrences of each word with `reduceByKey`, and prints the counts.
     */
    object spark_kafka_wordcount_customer {

      /**
       * Entry point. Starts the streaming context and blocks until it terminates.
       *
       * @param args command-line arguments (unused; connection settings are fixed below)
       */
      def main(args: Array[String]): Unit = {
        val zkQuorum = "localhost:2181"
        val group = "1"
        val topics = "sun_first"

        val conf = new SparkConf()
          .setAppName("KafkaWordCount")
          .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint("checkpoint")

        // Comma-separated topic list -> Map(topic -> 2), the per-topic value passed to createStream.
        val topicMap = topics.split(",").map(name => name -> 2).toMap

        // Each received element is a pair; only its second component is used.
        val received = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
        val lines = received.map(_._2)

        val wordCounts = lines
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
    
  • 相关阅读:
    Net基础篇_学习笔记_第十二天_面向对象继承(命名空间 、值类型和引用类型)
    Net基础篇_学习笔记_第十一天_面向对象(练习)
    js判断客户端是pc还是移动端
    swoole_table
    Master Reactor Manager Worker TaskWorker(Task)
    阻塞,非阻塞,同步,异步
    进程,线程与协程
    swoole 安装与简单应用
    laravel 简单应用 redis
    ubuntu 设置固定IP
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6398687.html
Copyright © 2011-2022 走看看