zoukankan      html  css  js  c++  java
  • Spark Streaming + Kafka word-count example (producer and consumer)

    生产者

    import java.util.HashMap
    import org.apache.kafka.clients.producer._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    object spark_kafka_wordcount_producer {
        /** Continuously publishes messages of random single-digit "words" to a
          * Kafka topic, one message per second.
          *
          * Positional command-line args (all optional): broker list, topic name,
          * words per message. When fewer than three args are given, the original
          * local-demo defaults are used.
          */
        def main(args: Array[String]) {
           // Use supplied args when present; otherwise keep the demo defaults so
           // the object stays runnable with no arguments (backward compatible).
           val Array(brokers, topic, wordsPerMessage) =
             if (args.length >= 3) args.take(3)
             else Array("localhost:9092", "sun_first", "3")

           val props = new HashMap[String, Object]()
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")

          val producer = new KafkaProducer[String, String](props)

          // The send loop below never exits normally, so the only clean exit is
          // JVM shutdown (e.g. Ctrl-C). Flush buffered records and release the
          // producer's network resources then; previously it was never closed.
          sys.addShutdownHook {
            producer.close()
          }

          // Parse once instead of on every loop iteration.
          val wordsPerMsg = wordsPerMessage.toInt

          while(true) {
              // Space-separated random digits, e.g. "4 0 7".
              val str = (1 to wordsPerMsg).map(x => scala.util.Random.nextInt(10).toString)
                    .mkString(" ")
              // Null key: Kafka spreads records across partitions round-robin.
              val message = new ProducerRecord[String, String](topic, null, str)
              producer.send(message)

              Thread.sleep(1000)
          }
        }
    }
    

     消费者

    import java.util.Properties
    import kafka.producer._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    
    object spark_kafka_wordcount_customer {
       /** Receiver-based Spark Streaming word count over a Kafka topic.
         *
         * Connects through a local ZooKeeper quorum, consumes the "sun_first"
         * topic as consumer group "1", and prints per-batch word counts once
         * per second.
         */
       def main(args: Array[String]) {
            val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_first")

            // local[2]: one thread for the Kafka receiver, one for processing.
            val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
            val streamingContext = new StreamingContext(conf, Seconds(1))
            streamingContext.checkpoint("checkpoint")

            // Map each topic name to two receiver threads.
            val topicThreadCounts = topics.split(",").map(topic => (topic, 2)).toMap

            // The stream yields (key, message) pairs; keep only the message text.
            val messages = KafkaUtils
              .createStream(streamingContext, zkQuorum, group, topicThreadCounts)
              .map { case (_, value) => value }

            // Tokenise each message on spaces and count words within the batch.
            val batchWordCounts = messages
              .flatMap(line => line.split(" "))
              .map(word => (word, 1))
              .reduceByKey((a, b) => a + b)
            batchWordCounts.print()

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }
    
  • 相关阅读:
    PyQt4信号与槽
    Amazon Redshift数据库
    NoSQL数据库的认识
    如何划分子网
    VPC见解
    Linux之添加交换分区
    MySQL基础之 标准模式通配符
    MySQL基础之 LIKE操作符
    MySQL基础之 AND和OR运算符
    MySQL基础之 索引
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6398687.html
Copyright © 2011-2022 走看看