  • Repost: Spark in Practice, Part 2: Building a Real-Time Data Processing System with Kafka and Spark Streaming

    Reposted from: https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/

    Producer:

    package sparkStreaming
    
    import scala.util.Random
    import java.util.HashMap
    import org.apache.kafka.clients.producer._
    
    object UserBehaviorProducerClient {
       private val PAGE_NUM = 100
       private val MAX_MSG_NUM = 3
       private val MAX_CLICK_TIME = 5
       private val MAX_STAY_TIME = 10
       private val LIKE_OR_NOT = Array[Int](1, 0, -1)
       
       def main(args: Array[String]) {
        val (brokers, topic) = ("localhost:9092", "sun_test_topic")
           val props = new HashMap[String, Object]()
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
     
          val producer = new KafkaProducer[String, String](props)
     
          val rand = new Random()
           while (true) {
               //how many user behavior messages will be produced
               val msgNum = rand.nextInt(MAX_MSG_NUM) + 1
               try {
                   //generate the message with format like page1|2|7.123|1
                for (i <- 1 to msgNum) {
                     val msg = new StringBuilder()
                        msg.append("page" + (rand.nextInt(PAGE_NUM) + 1))
                        msg.append("|")
                        msg.append(rand.nextInt(MAX_CLICK_TIME) + 1)
                        msg.append("|")
                     msg.append(rand.nextInt(MAX_STAY_TIME) + rand.nextFloat())
                        msg.append("|")
                        msg.append(LIKE_OR_NOT(rand.nextInt(3)))
                        println(msg.toString())
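                         //a null key lets Kafka pick the partition for each record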
                        val message = new ProducerRecord[String, String](topic, null, msg.toString())
                        producer.send(message)
                    }
                    println("%d user behavior messages produced.".format(msgNum+1))
                } catch {
                    case e: Exception => println(e)
                }
                try {
                     //sleep for 5 seconds after sending a micro batch of messages
                    Thread.sleep(5000)
                } catch {
                    case e: Exception => println(e)
                }
           }
       }
    }
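
    The producer emits messages of the form page1|2|7.123|1 (page ID, click
    count, stay time, like flag). Below is a minimal, Kafka-free sketch of the
    parsing step (MessageFormatCheck is a hypothetical helper, not part of the
    original article) that is handy for checking the format offline; note that
    "|" is a regex metacharacter, so it must be escaped when splitting.

    package sparkStreaming

    //hypothetical helper, not from the original article: verifies that a
    //generated message parses back into the four fields the consumer expects
    object MessageFormatCheck {
      def main(args: Array[String]): Unit = {
        val sample = "page1|2|7.123|1"
        //a bare "|" means regex alternation, so it must be escaped in split
        val fields = sample.split("\\|")
        require(fields.length == 4, s"expected 4 fields, got ${fields.length}")
        println(s"pageID=${fields(0)}, clicks=${fields(1).toInt}, " +
          s"stayTime=${fields(2).toFloat}, like=${fields(3).toInt}")
      }
    }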
    

    Consumer:

    package sparkStreaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.Duration
    
    object WebPagePopularityValueCalculator {
    
      def main(args: Array[String]) {
    
        val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
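        //updateStateByKey below requires a checkpoint directory; a relative local
        //path works for this demo, while a production job would use HDFS or similar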
        ssc.checkpoint("checkpoint")
    
        //each topic is read by 2 receiver threads
        val topicMap = topics.split(",").map((_, 2)).toMap

        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val popularityData = lines.map { msgLine =>
          {
            val dataArr: Array[String] = msgLine.split("\\|")
            val pageID = dataArr(0)
            val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
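            //e.g. "page1|2|7.123|1" gives popValue = 2 * 0.8 + 7.123 * 0.8 + 1 * 1 = 8.2984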
            (pageID, popValue)
          }
        }
        //sum the previous popularity value and current value
        val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
          iterator.flatMap(t => {
            val newValue: Double = t._2.sum
            val stateValue: Double = t._3.getOrElse(0.0)
            Some(newValue + stateValue)
          }.map(summedValue => (t._1, summedValue)))
        }
        val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
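        //updateStateByKey keeps a running popularity value per pageID across
        //batches; initialRDD seeds the state and the HashPartitioner controls
        //how the state is partitioned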
        val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
          new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
        //checkpoint every 8 batch intervals (8 x 2 s = 16 s) to avoid checkpointing
        //too frequently, which may significantly reduce operation throughput
        stateDstream.checkpoint(Duration(8 * 2 * 1000))
        //after calculation, we need to sort the result and only show the top 10 hot pages
        stateDstream.foreachRDD { rdd =>
          {
            val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
            val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
            topKData.foreach(println)
          }
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
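
    To see what updateStateByKey does without a running cluster, the per-key
    update can be exercised as a plain function. The sketch below
    (StateUpdateSketch, an illustration rather than part of the original
    article) shows the state transition applied on each batch: the batch's new
    popularity values are summed and added to the previously accumulated state.

    package sparkStreaming

    //illustrative sketch, not from the original article: the state transition
    //that updateStateByKey applies per pageID on every batch
    object StateUpdateSketch {
      def updateState(newValues: Seq[Double], state: Option[Double]): Double =
        newValues.sum + state.getOrElse(0.0)

      def main(args: Array[String]): Unit = {
        //batch 1: two events for a page, no prior state
        val afterBatch1 = updateState(Seq(8.2984, 3.5), None)      //11.7984
        //batch 2: one more event, prior state carried forward
        val afterBatch2 = updateState(Seq(2.0), Some(afterBatch1)) //13.7984
        println(s"after batch 1: $afterBatch1, after batch 2: $afterBatch2")
      }
    }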
    
  • Original post: https://www.cnblogs.com/sunyaxue/p/6542870.html