zoukankan      html  css  js  c++  java
  • spark streaming

    场景

    餐厅老板想要统计每位顾客在他店里累计消费了多少金额，我们可以使用 updateStateByKey 来实现。

    从 Kafka 接收用户消费的 JSON 数据，统计每分钟内各用户的消费金额，并统计从程序开始运行至今各用户的累计消费金额（使用 updateStateByKey 实现）。

    数据格式

    {"user":"zhangsan","payment":8}
    {"user":"wangwu","payment":7}
    ....

    向 Kafka 写入消息（Kafka producer）

    package producer
    
    import java.util.Properties
    
    import kafka.javaapi.producer.Producer
    import kafka.producer.{KeyedMessage, ProducerConfig}
    import org.codehaus.jettison.json.JSONObject
    import scala.util.Random
    
    /** Demo producer: writes one random user-payment JSON message to Kafka every 200 ms.
      *
      * Uses an explicit `main` instead of `extends App` so that `users` and `random` are
      * initialized when the object is initialized, not deferred to `main` via `DelayedInit`.
      * Otherwise `payMount()` / `getUserName()` would see null fields if called from elsewhere.
      */
    object KafkaProducer {

      // All simulated users.
      private val users = Array(
        "zhangsan", "lisi",
        "wangwu", "zhaoliu")

      private val random = new Random()

      /** Returns a random payment amount: an integer in [0, 9], widened to Double. */
      def payMount(): Double = {
        random.nextInt(10)
      }

      /** Returns a user name picked uniformly at random from `users`. */
      def getUserName(): String = {
        users(random.nextInt(users.length))
      }

      // Kafka settings.
      val topic = "user_payment"
      val brokers = "192.168.6.55:9092,192.168.6.56:9092"
      val props = new Properties()
      props.put("metadata.broker.list", brokers)
      props.put("serializer.class", "kafka.serializer.StringEncoder")

      val kafkaConfig = new ProducerConfig(props)

      // lazy: the producer is only built on first use (in `main`), not merely because the object was touched.
      lazy val producer = new Producer[String, String](kafkaConfig)

      /** Sends a `{"user": ..., "payment": ...}` message to `topic` every 200 ms, forever.
        *
        * The loop only ends via an exception (e.g. send failure or interruption). In that case
        * the producer is closed before the exception propagates.
        */
      def main(args: Array[String]): Unit = {
        try {
          while (true) {
            // Build the JSON payload.
            val event = new JSONObject()
              .put("user", getUserName())
              .put("payment", payMount())

            // Send it to Kafka.
            producer.send(new KeyedMessage[String, String](topic, event.toString))
            println(s"Message sent: $event")

            // Throttle to one message every 200 ms.
            Thread.sleep(200)
          }
        } finally {
          producer.close()
        }
      }
    }
    

    使用 Spark Streaming 处理数据

    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{StreamingContext, Seconds}
    import org.apache.spark.{SparkContext, SparkConf}
    import net.liftweb.json._
    
    /** Spark Streaming demo: per-minute and all-time payment totals per user, read from Kafka.
      *
      * Each 60-second batch prints the sum of payments per user within that batch. It then
      * prints the running total per user since the job started, maintained with `updateStateByKey`.
      */
    object UpdateStateByKeyTest {

      /** Checkpoint directory, shared by `ssc.checkpoint` and `StreamingContext.getOrCreate`.
        * Local relative path for the demo; use a fault-tolerant store such as HDFS in production.
        */
      private val CheckpointDir = "checkPoint"

      def main(args: Array[String]): Unit = {

        /** Builds a fresh StreamingContext with the whole DStream graph.
          * Only invoked by `getOrCreate` when no checkpoint exists in `CheckpointDir`.
          */
        def functionToCreateContext(): StreamingContext = {
          // 60-second batches: each batch corresponds to one "minute" of data.
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val ssc = new StreamingContext(conf, Seconds(60))

          // Required by updateStateByKey: state and metadata are persisted here.
          ssc.checkpoint(CheckpointDir)

          val zkQuorum = "192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181"
          val consumerGroupName = "user_payment"
          val kafkaTopic = "user_payment"
          val kafkaThreadNum = 1

          // topic -> number of receiver threads for that topic.
          val topicMap = kafkaTopic.split(",").map((_, kafkaThreadNum)).toMap

          // Read (key, message) pairs from Kafka and parse each message body as JSON.
          val userPayment = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupName, topicMap)
            .map { case (_, message) => parse(message) }

          // Total payment per user within the current one-minute batch.
          val paymentSum = userPayment.map { json =>
            implicit val formats = DefaultFormats
            val user = (json \ "user").extract[String]
            // The producer writes the amount as a JSON number; accept an integer, a decimal,
            // or a numeric string.
            val payment = (json \ "payment") match {
              case JInt(v)    => v.toDouble
              case JDouble(v) => v
              case JString(s) => s.toDouble
              case other      => throw new IllegalArgumentException(s"unexpected payment value: $other")
            }
            (user, payment)
          }.reduceByKey(_ + _)

          // Print this minute's per-user totals.
          paymentSum.print()

          // Running total: add this batch's values to the previously stored per-user state.
          val addFunction: (Seq[Double], Option[Double]) => Option[Double] =
            (currValues, previousState) => Some(currValues.sum + previousState.getOrElse(0.0))

          val totalPayment = paymentSum.updateStateByKey[Double](addFunction)

          // Print the all-time per-user totals.
          totalPayment.print()

          ssc
        }

        // If CheckpointDir holds a previous checkpoint, restore the StreamingContext (and its state)
        // from it; otherwise build a new one. Note: a restored context keeps the checkpointed
        // DStream graph, so code changes inside functionToCreateContext are not picked up on recovery.
        val context = StreamingContext.getOrCreate(CheckpointDir, functionToCreateContext _)

        context.start()
        context.awaitTermination()
      }
    }
    

    运行结果节选

    //-----------第n分钟的结果------------------
    
    //1分钟结果
    -------------------
    (zhangsan,23.0)
    (lisi,37.0)
    (wangwu,31.0)
    (zhaoliu,34.0)
    -------------------
    
    //总和结果 
    (zhangsan,101.0)
    (lisi,83.0)
    (wangwu,80.0)
    (zhaoliu,130.0)
    
    //-----------第n+1分钟的结果------------------
    
    //1分钟结果
    -------------------
    (zhangsan,43.0)
    (lisi,16.0)
    (wangwu,21.0)
    (zhaoliu,54.0)
    -------------------
    //总和结果 
    -------------------
    (zhangsan,144.0)
    (lisi,99.0)
    (wangwu,101.0)
    (zhaoliu,184.0)
    -------------------

    后记

    下一篇文章将统计不同时间段内用户的平均消费金额、消费次数、消费总额等指标。
    点击这里

  • 相关阅读:
    Android(java)学习笔记68:使用proguard混淆android代码
    SGU 194 Reactor Cooling
    关于流量有上下界的网络流问题的求解
    关于最小割的求解方法
    HDU 5311 Hidden String
    POJ 3548 Restoring the digits
    POJ 2062 HDU 1528 ZOJ 2223 Card Game Cheater
    ZOJ 1967 POJ 2570 Fiber Network
    HDU 1969 Pie
    HDU 1956 POJ 1637 Sightseeing tour
  • 原文地址:https://www.cnblogs.com/zhangyunlin/p/6168170.html
Copyright © 2011-2022 走看看