zoukankan      html  css  js  c++  java
  • 使用streaming window函数统计用户不同时间段平均消费金额等指标

    场景

    现在餐厅老板已经不满足仅仅统计历史用户消费金额总数了,他想知道每个用户半年,每个月,每天,或者一小时消费的总额,来店消费的次数以及平均金额。

    给出的例子计算的是每5秒,每30秒,每1分钟的用户消费金额,消费次数,平均消费。

    数据格式

    {"user":"zhangsan","payment":8}
    {"user":"wangwu","payment":7}
    ....

    制作kafka输入数据

    与我上篇文章相同
    参考这里

    处理流程

    package StreamingTest
    
    /**
     * Created by liangshiwei on 15/9/9.
     */
    import net.liftweb.json._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object WindowsFunction {
    
      //利用用户消费金额总和计算结果以及用户消费次数统计计算结果计算平均消费金额
      def avgFunction(sum:DStream[(String,Double)],count:DStream[(String,Int)]): DStream[(String,Double)] = {
        val payment = sum.join(count).map(r => {
          val user = r._1
          val sum = r._2._1
          val count = r._2._2
          (user,sum/count)
        })
        payment
      }
    
      def main (args: Array[String]) {
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val ssc = new StreamingContext(conf, Seconds(5))
    
          val zkQuorum = "192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181"
          val consumerGroupName = "user_payment"
          val kafkaTopic = "user_payment"
          val kafkaThreadNum = 1
    
          val topicMap = kafkaTopic.split(",").map((_, kafkaThreadNum.toInt)).toMap
    
          val user_payment = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupName, topicMap).map(x=>{
            parse(x._2)
          })
    
          //计算每5s每个用户的消费总和
          val paymentSum = user_payment.map(jsonLine =>{
            implicit val formats = DefaultFormats
            val user = (jsonLine  "user").extract[String]
            val payment = (jsonLine  "payment").extract[String]
            (user,payment.toDouble)
          }).reduceByKey(_+_)
    
          //输出结果
          paymentSum.print()
    
          //计算每5s每个用户的消费次数
          val paymentCount = user_payment.map(jsonLine =>{
            implicit val formats = DefaultFormats
            val user = (jsonLine  "user").extract[String]
            (user,1)
          }).reduceByKey(_+_)
    
    //      paymentCount.print()
    
          //计算每5s每个用户平均的消费金额
          val paymentAvg = avgFunction(paymentSum,paymentCount)
    //      paymentAvg.print()
    
          //窗口操作,在其中计算不同时间段的结果,入库的话根据使用场景选择吧
          def windowsFunction()  {
            //每5秒计算最后30秒每个用户消费金额
            val windowSum_30 = paymentSum.reduceByKeyAndWindow((a: Double, b: Double) => (a + b),_-_, Seconds(30), Seconds(5))
    //        windowSum_30.print()
    
            //每5秒计算最后30秒每个用户消费次数
            val windowCount_30 = paymentCount.reduceByKeyAndWindow((a: Int, b: Int) => (a + b),_-_, Seconds(30), Seconds(5))
    //        windowCount_30.print()
    
            //每5秒计算最后30秒每个用户平均消费
            val windowAvg_30 = avgFunction(windowSum_30,windowCount_30)
    //        windowAvg_30.print()
    
            //每5秒计算最后60秒每个用户消费金额
            val windowSum_60 = windowSum_30.reduceByKeyAndWindow((a:Double,b:Double)=>(a+b),_-_,Seconds(10),Seconds(5))
    //       windowSum_60.print()
    
            //每5秒计算最后60秒每个用户消费次数
            val windowCount_60 = windowCount_30.reduceByKeyAndWindow((a:Int,b:Int) => (a+b),_-_,Seconds(10),Seconds(5))
    //        windowCount_60.print()
    
            //每5秒计算最后60秒每个用户平均消费
            val windowAvg_60 = avgFunction(windowSum_60,windowCount_60)
    //        windowAvg_60.print
          }
    
          windowsFunction()
    
          ssc
        }
    
        val context = StreamingContext.getOrCreate("checkPoint", functionToCreateContext _)
    
        context.start()
        context.awaitTermination()
      }
    }
    

    结果节选

    实际效果你们自己去体验吧

    //-----------消费总额-------
    (zhangsan,146.0)
    (lisi,79.0)
    (wangwu,99.0)
    (zhaoliu,115.0)
    
    //-----------消费次数-------
    (zhangsan,32)
    (lisi,18)
    (wangwu,30)
    (zhaoliu,24)
    
    //-----------平均消费-------
    (zhangsan,4.5625)
    (lisi,4.388888888888889)
    (wangwu,3.3)
    (zhaoliu,4.791666666666667)
  • 相关阅读:
    [enum]enum的用法
    gridview汇出EXCEL (ExportGridViewToExcel(dt, HttpContext.Current.Response);)
    c#用正则表达式判断字符串是否全是数字、小数点、正负号组成 Regex reg = new Regex(@"^(([0-9]+.[0-9]*[1-9][0-9]*)|([0-9]*[1-9][0-9]*.[0-9]+)|([0-9]*[1-9][0-9]*))$");
    GridView里的文本框改变事件
    转发 win7+iis7.5+asp.net下 CS0016: 未能写入输出文件“c:WindowsMicrosoft.NETFrameworkv2.0.50727Temporary ASP.NET Files 解决方案
    验证上转文件类型的正则表达式
    EXCEL设置选中单元格样式
    转发!HTML 复选框 checkbox 的 JavaScript 的全选和全反选
    dataGrid转换dataTable
    wince mobile环境下播放WAV声音
  • 原文地址:https://www.cnblogs.com/zhangyunlin/p/6168169.html
Copyright © 2011-2022 走看看